This repository has been archived on 2023-10-09. You can view files and clone it, but cannot push or open issues or pull requests.
Files
blender-archive/release/scripts/modules/bpkg/actions.py
2017-08-29 01:50:58 -07:00

159 lines
5.4 KiB
Python

from pathlib import Path
from . import exceptions
from . import utils
import shutil
import logging
def download(url: str, destination: Path, progress_callback=None) -> Path:
    """
    Downloads file at the given url, and if progress_callback is specified,
    repeatedly calls progress_callback with an argument between 0 and 1.
    The callback is always called with 0 before the request and with 1 when
    finished; if the server sends no usable content length, no intermediate
    values are reported.

    If `destination` is an existing directory the file name is derived from
    the url path (falling back to 'download.tmp'); otherwise `destination`
    itself is used as the file path.

    Raises DownloadException if the server answers with an HTTP error status,
    if the transfer fails while streaming, or if the file cannot be written.
    Errors raised by requests.get() before a response is obtained are not
    caught here.

    :returns: path to the downloaded file, or None if not modified
    """

    import requests

    log = logging.getLogger('%s.download' % __name__)

    if progress_callback is None:
        # assign to do-nothing function
        def progress_callback(x): return None

    progress_callback(0)

    log.info('Downloading %s ', url)
    resp = requests.get(url, stream=True, verify=True)

    # With stream=True the connection stays checked out until the body is
    # fully consumed or the response is closed, so always close it explicitly.
    try:
        try:
            resp.raise_for_status()
        except requests.HTTPError as err:
            raise exceptions.DownloadException(resp.status_code, str(err)) from err

        if resp.status_code == requests.codes.not_modified:
            log.info("Server responded 'Not Modified', not downloading")
            progress_callback(1)
            return None

        # determine destination filename from url, but only after we've determined it works as a real url
        # derive filename from url if given `destination` is an existing directory,
        # otherwise use `destination` directly
        if destination.is_dir():
            # TODO: get filename from Content-Disposition header, if available.
            from urllib.parse import urlsplit
            parsed_url = urlsplit(url)
            local_filename = Path(parsed_url.path).name or 'download.tmp'
            local_fpath = destination / local_filename
        else:
            local_fpath = destination

        try:
            content_length = int(resp.headers['content-length'])
        except (KeyError, ValueError):
            # Header missing or not an integer: progress cannot be computed.
            log.warning(
                'Server did not send content length, cannot report progress.')
            content_length = 0

        try:
            downloaded_length = 0
            with local_fpath.open('wb') as outfile:
                for chunk in resp.iter_content(chunk_size=1024 ** 2):
                    if not chunk:  # filter out keep-alive new chunks
                        continue

                    outfile.write(chunk)
                    downloaded_length += len(chunk)
                    if content_length > 0:
                        # Decoded (e.g. gzip) bodies can be longer than the
                        # advertised length; keep the value within 0..1.
                        progress_callback(
                            min(downloaded_length / content_length, 1.0))
        except PermissionError as err:
            # Must precede OSError: PermissionError is a subclass of it.
            raise exceptions.DownloadException("No permissions to write to '%s'" % local_fpath) from err
        except requests.RequestException as err:
            # requests' exceptions derive from IOError/OSError; handle them
            # separately so network failures are not reported as disk errors.
            raise exceptions.DownloadException("Encountered a network error while downloading '%s': %s" % (url, err)) from err
        except OSError as err:
            raise exceptions.DownloadException("Encountered an error while writing file to '%s', are you sure there's enough space?" % local_fpath) from err
    finally:
        resp.close()

    progress_callback(1)
    return local_fpath
def install(src_file: Path, dest_dir: Path):
    """Extracts/copies package at `src_file` to `dest_dir`.

    Zip archives (as detected by zipfile.is_zipfile) are extracted into
    `dest_dir`; any other file is copied into `dest_dir` under its own name.
    Existing files or directories that would be overwritten are wrapped in
    utils.InplaceBackup first; the backups are restored if the installation
    fails and removed once it succeeds.

    :raises exceptions.InstallException: if `src_file` is not a file,
        `dest_dir` is not a directory, the zip cannot be read, or the
        extraction/copy fails.
    """

    import zipfile

    log = logging.getLogger('%s.install' % __name__)
    log.debug("Starting installation")

    if not src_file.is_file():
        raise exceptions.InstallException("Package isn't a file")

    if not dest_dir.is_dir():
        raise exceptions.InstallException("Destination is not a directory")

    # TODO: check to make sure addon/package isn't already installed elsewhere

    def install_zip(src_zip, dest_dir):
        """Extract src_zip to dest_dir"""
        try:
            file_to_extract = zipfile.ZipFile(str(src_zip), 'r')
        except Exception as err:
            raise exceptions.InstallException(
                "Failed to read zip file: %s" % err) from err

        def root_files(filelist: list) -> list:
            """Return the unique top-level entry names of a zip namelist.

            Archives are not required to list their directories explicitly
            (a zip may contain only 'pkg/__init__.py'), so the root entry is
            taken from the first path component of every member.
            """
            rootlist = []
            seen = set()
            for f in filelist:
                # Skip the components zipfile itself drops on extraction
                # (empty from leading/double slashes, '.' and '..').
                parts = [p for p in f.split('/') if p not in ('', '.', '..')]
                if parts and parts[0] not in seen:
                    seen.add(parts[0])
                    rootlist.append(parts[0])
            return rootlist

        # Ensure the archive handle is closed on every exit path.
        with file_to_extract:
            conflicts = [
                dest_dir / f
                for f in root_files(file_to_extract.namelist())
                if (dest_dir / f).exists()
            ]
            backups = []
            for conflict in conflicts:
                log.debug("Creating backup of conflict %s", conflict)
                backups.append(utils.InplaceBackup(conflict))

            try:
                file_to_extract.extractall(str(dest_dir))
            except Exception as err:
                for backup in backups:
                    backup.restore()
                raise exceptions.InstallException(
                    "Failed to extract zip file to '%s': %s" % (dest_dir, err)) from err

            for backup in backups:
                backup.remove()

    def install_py(src_file, dest_dir):
        """Copy src_file into dest_dir, backing up any existing file"""
        dest_file = dest_dir / src_file.name
        backup = None

        if dest_file.exists():
            backup = utils.InplaceBackup(dest_file)

        try:
            shutil.copyfile(str(src_file), str(dest_file))
        except Exception as err:
            # Only restore when something was actually backed up; otherwise
            # the original copy error would be masked by an AttributeError.
            if backup is not None:
                backup.restore()
            raise exceptions.InstallException(
                "Failed to copy file to '%s': %s" % (dest_dir, err)) from err

        if backup is not None:
            backup.remove()

    if zipfile.is_zipfile(str(src_file)):
        install_zip(src_file, dest_dir)
    else:
        install_py(src_file, dest_dir)

    log.debug("Installation succeeded")