Cleanup: rename package manager addon from bpkg -> package_manager

As I see it, the *package manager* is called "bpkg", while the *package
manager addon* is called "package_manager".

The *package manager* contains the actual package management code, and
the *package manager addon* provides the interface to use it.
This commit is contained in:
Ellwood Zwovic
2017-07-20 23:05:13 -07:00
parent 57f730b04c
commit a250777d15
4 changed files with 38 additions and 38 deletions

897
package_manager/__init__.py Normal file
View File

@@ -0,0 +1,897 @@
"""
Blender Package manager
"""
bl_info = {
'name': 'Package Manager',
'author': 'Sybren A. Stüvel',
'version': (0, 1, 0),
'blender': (2, 79, 0),
'location': 'Addon Preferences panel',
'description': 'Add-on package manager.',
'category': 'System',
}
import logging
if 'bpy' in locals():
import importlib
subproc = importlib.reload(subproc)
Package = subproc.Package
else:
from . import subproc
from .subproc import Package
import bpy
from collections import OrderedDict
class SubprocMixin:
"""Mix-in class for things that need to be run in a subprocess."""
log = logging.getLogger(__name__ + '.SubprocMixin')
_state = 'INITIALIZING'
_abort_timeout = 0 # time at which we stop waiting for an abort response and just terminate the process
# Mapping from message type (see bpkg_manager.subproc) to handler function.
# Should be constructed before modal() gets called.
msg_handlers = {}
def execute(self, context):
return self.invoke(context, None)
def quit(self):
"""Signals the state machine to stop this operator from running."""
self._state = 'QUIT'
def invoke(self, context, event):
import multiprocessing
self.log.info('Starting')
self.pipe_blender, self.pipe_subproc = multiprocessing.Pipe()
# The subprocess should just be terminated when Blender quits. Without this,
# Blender would hang while closing, until the subprocess terminates itself.
self.process = self.create_subprocess()
self.process.daemon = True
self.process.start()
self._state = 'RUNNING'
wm = context.window_manager
wm.modal_handler_add(self)
self.timer = wm.event_timer_add(0.1, context.window)
return {'RUNNING_MODAL'}
def modal(self, context, event):
import time
if event.type == 'ESC':
self.log.warning('Escape pressed, sending abort signal to subprocess')
self.abort()
return {'PASS_THROUGH'}
if event.type != 'TIMER':
return {'PASS_THROUGH'}
if self._state == 'ABORTING' and time.time() > self._abort_timeout:
self.log.error('No response from subprocess to abort request, terminating it.')
self.report({'ERROR'}, 'No response from subprocess to abort request, terminating it.')
self.process.terminate()
self._finish(context)
return {'CANCELLED'}
while self.pipe_blender.poll():
self.handle_received_data()
if self._state == 'QUIT':
self._finish(context)
return {'FINISHED'}
if not self.process.is_alive():
self.report_process_died()
self._finish(context)
return {'CANCELLED'}
return {'RUNNING_MODAL'}
def abort(self):
import time
# Allow the subprocess 10 seconds to repsond to our abort message.
self._abort_timeout = time.time() + 10
self._state = 'ABORTING'
self.pipe_blender.send(subproc.Abort())
def _finish(self, context):
import multiprocessing
global bpkg_operation_running
context.window_manager.event_timer_remove(self.timer)
bpkg_operation_running = False
if self.process and self.process.is_alive():
self.log.debug('Waiting for subprocess to quit')
try:
self.process.join(timeout=10)
except multiprocessing.TimeoutError:
self.log.warning('Subprocess is hanging, terminating it forcefully.')
self.process.terminate()
else:
self.log.debug('Subprocess stopped with exit code %i', self.process.exitcode)
def handle_received_data(self):
recvd = self.pipe_blender.recv()
self.log.debug('Received message from subprocess: %s', recvd)
try:
handler = self.msg_handlers[type(recvd)]
except KeyError:
self.log.error('Unable to handle received message %s', recvd)
# Maybe we shouldn't show this to the user?
self.report({'WARNING'}, 'Unable to handle received message %s' % recvd)
return
handler(recvd)
def create_subprocess(self):
"""Implement this in a subclass.
:rtype: multiprocessing.Process
"""
raise NotImplementedError()
def report_process_died(self):
"""Provides the user with sensible information when the process has died.
Implement this in a subclass.
"""
raise NotImplementedError()
class PACKAGE_OT_install(SubprocMixin, bpy.types.Operator):
bl_idname = 'package.install'
bl_label = 'Install package'
bl_description = 'Downloads and installs a Blender add-on package'
bl_options = {'REGISTER'}
package_url = bpy.props.StringProperty(name='package_url', description='The URL of the file to download')
log = logging.getLogger(__name__ + '.PACKAGE_OT_install')
def invoke(self, context, event):
if not self.package_url:
self.report({'ERROR'}, 'Package URL not given')
return {'CANCELLED'}
return super().invoke(context, event)
def create_subprocess(self):
"""Starts the download process.
Also registers the message handlers.
:rtype: multiprocessing.Process
"""
import multiprocessing
self.msg_handlers = {
subproc.Progress: self._subproc_progress,
subproc.DownloadError: self._subproc_download_error,
subproc.InstallError: self._subproc_install_error,
subproc.FileConflictError: self._subproc_conflict_error,
subproc.Success: self._subproc_success,
subproc.Aborted: self._subproc_aborted,
}
import pathlib
# TODO: We need other paths besides this one on subprocess end, so it might be better to pass them all at once.
# For now, just pass this one.
install_path = pathlib.Path(bpy.utils.user_resource('SCRIPTS', 'addons', create=True))
self.log.debug("Using %s as install path", install_path)
import addon_utils
proc = multiprocessing.Process(target=subproc.download_and_install,
args=(self.pipe_subproc, self.package_url, install_path, addon_utils.paths()))
return proc
def _subproc_progress(self, progress: subproc.Progress):
self.log.info('Task progress at %i%%', progress.progress * 100)
def _subproc_download_error(self, error: subproc.DownloadError):
self.report({'ERROR'}, 'Unable to download package: %s' % error.description)
self.quit()
def _subproc_install_error(self, error: subproc.InstallError):
self.report({'ERROR'}, 'Unable to install package: %s' % error.message)
self.quit()
def _subproc_conflict_error(self, error: subproc.FileConflictError):
self.report({'ERROR'}, 'Unable to install package: %s' % error.message)
self.quit()
def _subproc_success(self, success: subproc.Success):
self.report({'INFO'}, 'Package installed successfully')
getattr(bpy.ops, __package__).refresh_packages()
self.quit()
def _subproc_aborted(self, aborted: subproc.Aborted):
self.report({'ERROR'}, 'Package installation aborted per your request')
self.quit()
def report_process_died(self):
if self.process.exitcode:
self.log.error('Process died without telling us! Exit code was %i', self.process.exitcode)
self.report({'ERROR'}, 'Error downloading package, exit code %i' % self.process.exitcode)
else:
self.log.error('Process died without telling us! Exit code was 0 though')
self.report({'WARNING'}, 'Error downloading package, but process finished OK. This is weird.')
class PACKAGE_OT_uninstall(SubprocMixin, bpy.types.Operator):
bl_idname = 'package.uninstall'
bl_label = 'Install package'
bl_description = 'Downloads and installs a Blender add-on package'
bl_options = {'REGISTER'}
package_name = bpy.props.StringProperty(name='package_name', description='The name of the package to uninstall')
log = logging.getLogger(__name__ + '.PACKAGE_OT_uninstall')
def invoke(self, context, event):
if not self.package_name:
self.report({'ERROR'}, 'Package name not given')
return {'CANCELLED'}
return super().invoke(context, event)
def create_subprocess(self):
"""Starts the uninstall process and registers the message handlers.
:rtype: multiprocessing.Process
"""
import multiprocessing
self.msg_handlers = {
subproc.UninstallError: self._subproc_uninstall_error,
subproc.Success: self._subproc_success,
}
import pathlib
install_path = pathlib.Path(bpy.utils.user_resource('SCRIPTS', 'addons', create=True))
# TODO: only drawing-related package data should be stored on the panel. Maybe move this to a global..?
package = USERPREF_PT_packages.all_packages[self.package_name].get_latest_version()
proc = multiprocessing.Process(target=subproc.uninstall,
args=(self.pipe_subproc, package, install_path))
return proc
def _subproc_uninstall_error(self, error: subproc.InstallError):
self.report({'ERROR'}, 'Unable to install package: %s' % error.message)
self.quit()
def _subproc_success(self, success: subproc.Success):
self.report({'INFO'}, 'Package uninstalled successfully')
getattr(bpy.ops, __package__).refresh_packages()
self.quit()
def report_process_died(self):
if self.process.exitcode:
self.log.error('Process died without telling us! Exit code was %i', self.process.exitcode)
self.report({'ERROR'}, 'Error downloading package, exit code %i' % self.process.exitcode)
else:
self.log.error('Process died without telling us! Exit code was 0 though')
self.report({'WARNING'}, 'Error downloading package, but process finished OK. This is weird.')
def get_packages_from_disk(refresh=False) -> list:
"""Get list of packages installed on disk"""
import addon_utils
return [Package.from_module(mod) for mod in addon_utils.modules(refresh=refresh)]
def get_packages_from_repo() -> list:
"""Get list of packages from cached repository lists (does not refresh them from server)"""
import pathlib
storage_path = pathlib.Path(bpy.utils.user_resource('CONFIG', 'packages', create=True))
repo = subproc._load_repo(storage_path)
for pkg in repo.packages:
pkg.repository = repo.name
return repo.packages
class PACKAGE_OT_refresh_packages(bpy.types.Operator):
bl_idname = "package.refresh_packages"
bl_label = "Refresh Packages"
bl_description = "Scan for packages on disk"
log = logging.getLogger(__name__ + ".PACKAGE_OT_refresh_packages")
def execute(self, context):
installed_packages = get_packages_from_disk(refresh=True)
available_packages = get_packages_from_repo()
USERPREF_PT_packages.all_packages = build_composite_packagelist(installed_packages, available_packages)
return {'FINISHED'}
class PACKAGE_OT_refresh_repositories(SubprocMixin, bpy.types.Operator):
bl_idname = "package.refresh_repositories"
bl_label = "Refresh Repositories"
bl_description = 'Check repositories for new and updated packages'
bl_options = {'REGISTER'}
log = logging.getLogger(__name__ + ".PACKAGE_OT_refresh")
def create_subprocess(self):
"""Starts the download process.
Also registers the message handlers.
:rtype: multiprocessing.Process
"""
import multiprocessing
#TODO: make sure all possible messages are handled
self.msg_handlers = {
subproc.Progress: self._subproc_progress,
subproc.SubprocError: self._subproc_error,
subproc.DownloadError: self._subproc_download_error,
subproc.Success: self._subproc_success,
subproc.RepositoryResult: self._subproc_repository_result,
subproc.Aborted: self._subproc_aborted,
}
import pathlib
storage_path = pathlib.Path(bpy.utils.user_resource('CONFIG', 'packages', create=True))
repository_url = bpy.context.user_preferences.addons[__package__].preferences.repository_url
from urllib.parse import urlsplit, urlunsplit
parsed_url = urlsplit(repository_url)
if not parsed_url.path.endswith("repo.json"):
if parsed_url.path.endswith('/'):
new_path = parsed_url.path + "repo.json"
else:
new_path = parsed_url.path + "/repo.json"
repository_url = urlunsplit((parsed_url.scheme, parsed_url.netloc, new_path, parsed_url.query, parsed_url.fragment))
proc = multiprocessing.Process(target=subproc.refresh,
args=(self.pipe_subproc, storage_path, repository_url))
return proc
def _subproc_progress(self, progress: subproc.Progress):
self.log.info('Task progress at %i%%', progress.progress * 100)
def _subproc_error(self, error: subproc.SubprocError):
self.report({'ERROR'}, 'Unable to refresh package list: %s' % error.message)
self.quit()
def _subproc_download_error(self, error: subproc.DownloadError):
self.report({'ERROR'}, 'Unable to download package list: %s' % error.description)
self.quit()
def _subproc_success(self, success: subproc.Success):
self.quit()
def _subproc_repository_result(self, result: subproc.RepositoryResult):
available_packages = result.repository.packages
installed_packages = get_packages_from_disk(refresh=False)
# TODO: deduplicate creation of view-packages..
for pkg in available_packages:
pkg.repository = result.repository.name
USERPREF_PT_packages.all_packages = build_composite_packagelist(installed_packages, available_packages)
USERPREF_PT_packages.available_packages = available_packages
self.report({'INFO'}, 'Package list retrieved successfully')
def _subproc_aborted(self, aborted: subproc.Aborted):
self.report({'ERROR'}, 'Package list retrieval aborted per your request')
self.quit()
def report_process_died(self):
if self.process.exitcode:
self.log.error('Process died without telling us! Exit code was %i', self.process.exitcode)
self.report({'ERROR'}, 'Error refreshing package lists, exit code %i' % self.process.exitcode)
else:
self.log.error('Process died without telling us! Exit code was 0 though')
self.report({'WARNING'}, 'Error refreshing package lists, but process finished OK. This is weird.')
#TODO:
# monkey patch refresh_repositories and add refresh_packages in the success callback
# this way refresh_packages is always called after repositories have been refreshed
class PACKAGE_OT_refresh(bpy.types.Operator):
bl_idname = "package.refresh"
bl_label = "Refresh"
bl_description = "Check for new and updated packages"
def execute(self, context):
getattr(bpy.ops, __package__).refresh_repositories()
# getattr(bpy.ops, __package__).refresh_packages()
return {'FINISHED'}
class PACKAGE_OT_hang(SubprocMixin, bpy.types.Operator):
bl_idname = 'package.hang'
bl_label = 'Hang (debug)'
bl_description = 'Starts a process that hangs for an hour, for debugging purposes'
bl_options = {'REGISTER'}
log = logging.getLogger(__name__ + '.PACKAGE_OT_install')
def create_subprocess(self):
"""Starts the download process.
Also registers the message handlers.
:rtype: multiprocessing.Process
"""
import multiprocessing
proc = multiprocessing.Process(target=subproc.debug_hang)
return proc
def report_process_died(self):
self.report({'ERROR'}, 'Process died, exit code %s' % self.process.exitcode)
class PACKAGE_OT_load_repositories(SubprocMixin, bpy.types.Operator):
bl_idname = 'package.load_repositories'
bl_label = 'Load Repositories'
bl_description = 'Load repositories from disk'
bl_options = {'REGISTER'}
log = logging.getLogger(__name__ + '.PACKAGE_OT_load_repositories')
def create_subprocess(self):
"""
Start the load process and register message handlers
"""
import multiprocessing
import pathlib
# TODO: We need other paths besides this one on subprocess end, so it might be better to pass them all at once.
# For now, just pass this one.
storage_path = pathlib.Path(bpy.utils.user_resource('CONFIG', 'packages', create=True))
self.log.debug("Using %s as install path", install_path)
import addon_utils
proc = multiprocessing.Process(
target=subproc.load_repositories,
args=(self.pipe_subproc, self.storage_path)
)
return proc
self.msg_handlers = {
subproc.SubprocError: self._subproc_error,
subproc.RepositoryResult: self._subproc_repository_result,
subproc.Success: self._subproc_success,
subproc.Aborted: self._subproc_aborted,
}
def _subproc_error(self, error: subproc.SubprocError):
self.report({'ERROR'}, 'Failed to load repositories: %s' % error.message)
self.quit()
def _subproc_repository_result(self, result: subproc.RepositoryResult):
bpy.context.user_preferences.addons[__package__].preferences['repo'] = result.repository
self.log.info("Loaded repository %s", result.repository.name)
def _subproc_success(self, success: subproc.Success):
self.log.info("Successfully loaded repositories")
self.quit()
def _subproc_aborted(self, aborted: subproc.Aborted):
self.report({'ERROR'}, 'Package installation aborted per your request')
self.quit()
def report_process_died(self):
if self.process.exitcode:
self.log.error('Process died without telling us! Exit code was %i', self.process.exitcode)
self.report({'ERROR'}, 'Error downloading package, exit code %i' % self.process.exitcode)
else:
self.log.error('Process died without telling us! Exit code was 0 though')
self.report({'WARNING'}, 'Error downloading package, but process finished OK. This is weird.')
class USERPREF_PT_packages(bpy.types.Panel):
bl_label = "Package Management"
bl_space_type = 'USER_PREFERENCES'
bl_region_type = 'WINDOW'
bl_options = {'HIDE_HEADER'}
log = logging.getLogger(__name__ + '.USERPREF_PT_packages')
all_packages = OrderedDict()
available_packages = []
installed_packages = []
displayed_packages = []
@classmethod
def poll(cls, context):
userpref = context.user_preferences
return (userpref.active_section == 'PACKAGES')
def draw(self, context):
layout = self.layout
wm = context.window_manager
main = layout.row()
spl = main.split(.12)
sidebar = spl.column(align=True)
pkgzone = spl.column()
sidebar.label(text="Category")
sidebar.prop(wm, "addon_filter", text="")
top = pkgzone.row()
spl = top.split(.6)
spl.prop(wm, "package_search", text="", icon='VIEWZOOM')
spl_r = spl.row()
spl_r.prop(wm, "package_install_filter", expand=True)
def filtered_packages(filters: dict, packages: OrderedDict) -> list:
"""Returns filtered and sorted list of names of packages which match filters"""
#TODO: using lower() for case-insensitive comparison doesn't work in some languages
def match_contains(blinfo) -> bool:
if blinfo['name'].lower().__contains__(filters['search'].lower()):
return True
return False
def match_startswith(blinfo) -> bool:
if blinfo['name'].lower().startswith(filters['search'].lower()):
return True
return False
def match_category(blinfo) -> bool:
if filters['category'].lower() == 'all':
return True
if 'category' not in blinfo:
return False
if blinfo['category'].lower() == filters['category'].lower():
return True
return False
# use two lists as a simple way of putting "matches from the beginning" on top
contains = []
startswith = []
for pkgname, pkg in packages.items():
blinfo = pkg.versions[0].bl_info
if match_category(blinfo):
if len(filters['search']) == 0:
startswith.append(pkgname)
continue
if match_startswith(blinfo):
startswith.append(pkgname)
continue
if match_contains(blinfo):
contains.append(pkgname)
continue
return startswith + contains
def draw_package(pkg: ConsolidatedPackage, layout: bpy.types.UILayout):# {{{
"""Draws the given package"""
pkgbox = layout.box()
spl = pkgbox.split(.8)
left = spl.row(align=True)
blinfo = pkg.versions[0].bl_info
# for install/uninstall buttons
right = spl.row()
right.alignment = 'RIGHT'
right.scale_y = 1.5
# for collapse/expand button
left.operator(
WM_OT_package_toggle_expand.bl_idname,
icon='TRIA_DOWN' if pkg.expanded else 'TRIA_RIGHT',
emboss=False,
).package_name=blinfo['name']
# for metadata
leftcol = left.column(align=True)
def collapsed():
lr1 = leftcol.row()
lr2 = leftcol.row()
lr1.label(text=blinfo.get('name', ""))
lr2.label(text=blinfo.get('description', ""))
lr2.enabled = False #Give name more visual weight
latest_pkg = pkg.get_latest_version()
if latest_pkg.installed:
if latest_pkg.url:
right.operator(PACKAGE_OT_uninstall.bl_idname,
text="Uninstall").package_name=latest_pkg.name
else:
right.label("Installed")
else:
if latest_pkg.url:
right.operator(PACKAGE_OT_install.bl_idname,
text="Install").package_url=pkg.versions[0].url
else:
right.label("Not installed, but no URL?")
def expanded():
row1 = leftcol.row()
row1.label(blinfo.get('name'), "")
def string_version(version_number) -> str:
"""Take version number as an iterable and format it as a string"""
vstr = str(version_number[0])
for component in version_number[1:]:
vstr += "." + str(component)
return vstr
if blinfo.get('description'):
row2 = leftcol.row()
row2.label(blinfo['description'])
# row2.scale_y = 1.2
if blinfo.get('version'):
spl = leftcol.row().split(.15)
spl.label("Version:")
spl.label(string_version(blinfo['version']))
def draw_metadatum(key: str, value: str, layout: bpy.types.UILayout):
"""Draw the given key value pair in a new row in given layout container"""
row = layout.row()
row.scale_y = .8
spl = row.split(.15)
spl.label("{}:".format(key))
spl.label(value)
for prop in (
# "description",
"author",
"category",
# "version",
# "blender",
"location",
"warning",
"support",
# "wiki_url",
# "tracker_url",
):
if blinfo.get(prop):
row = leftcol.row()
row.scale_y = .8
spl = row.split(.15)
spl.label("{}:".format(prop.title()))
spl.label(str(blinfo[prop]))
def draw_version(layout: bpy.types.UILayout, pkg: Package):
"""Draw version of package"""
spl = layout.split(.9)
left = spl.column()
right = spl.column()
right.alignment = 'RIGHT'
left.label(text=string_version(pkg.version))
if pkg.repository is not None:
draw_metadatum("Repository", pkg.repository, left)
if pkg.installed:
right.label(text="Installed")
draw_metadatum("Installed to", str(pkg.installed_location), left)
if len(pkg.versions) > 1:
row = pkgbox.row()
row.label(text="There are multiple providers of this package:")
for version in pkg.versions:
# row = pkgbox.row()
subvbox = pkgbox.box()
draw_version(subvbox, version)
if pkg.expanded:
expanded()
else:
collapsed()# }}}
def center_message(layout, msg: str):
"""draw a label in the center of an extra-tall row"""
row = layout.row()
row.label(text=msg)
row.alignment='CENTER'
row.scale_y = 10
if len(USERPREF_PT_packages.all_packages) == 0:
center_message(pkgzone, "No packages found.")
# TODO: read repository and installed packages synchronously for now;
# can't run an operator from draw code to do async monitoring
installed_packages = get_packages_from_disk()
try:
available_packages = get_packages_from_repo()
except FileNotFoundError:
center_message(pkgzone, "No repositories found")
return
all_packages = build_composite_packagelist(installed_packages, available_packages)
if len(all_packages) == 0:
center_message(pkgzone, "No packages found")
return
USERPREF_PT_packages.all_packages = all_packages
filters = {
'category': bpy.context.window_manager.addon_filter,
'search': bpy.context.window_manager.package_search,
}
USERPREF_PT_packages.displayed_packages = filtered_packages(filters, USERPREF_PT_packages.all_packages)
for pkgname in USERPREF_PT_packages.displayed_packages:
row = pkgzone.row()
draw_package(USERPREF_PT_packages.all_packages[pkgname], row)
class ConsolidatedPackage:
"""
Stores a grouping of different versions of the same packages,
and view-specific data used for drawing
"""
def __init__(self, pkg=None):
self.versions = []
self.expanded = False
self.installed = False
if pkg is not None:
self.add_version(pkg)
def get_latest_version(self) -> Package:
"""Get package with highest version number"""
return self.versions[0] # this is always sorted with the highest on top
def add_version(self, pkg: Package):
self.versions.append(pkg)
self.versions.sort(key=lambda v: v.version, reverse=True)
def __iter__(self):
return (pkg for pkg in self.versions)
class WM_OT_package_toggle_expand(bpy.types.Operator):
bl_idname = "wm.package_toggle_expand"
bl_label = ""
bl_description = "Toggle display of extended information for given package (hold shift to collapse all other packages)"
bl_options = {'INTERNAL'}
log = logging.getLogger(__name__ + ".WM_OT_package_toggle_expand")
package_name = bpy.props.StringProperty(
name="Package Name",
description="Name of package to expand/collapse",
)
def invoke(self, context, event):
try:
pkg = USERPREF_PT_packages.all_packages[self.package_name]
except KeyError:
log.error("Couldn't find package '%s'", self.package_name)
return {'CANCELLED'}
pkg.expanded = not pkg.expanded
if event.shift:
for pkgname in USERPREF_PT_packages.displayed_packages:
if not pkgname == self.package_name:
USERPREF_PT_packages.all_packages[pkgname].expanded = False
return {'FINISHED'}
class PackageManagerPreferences(bpy.types.AddonPreferences):
bl_idname = __package__
package_url = bpy.props.StringProperty(
name='Package URL',
description='Just a temporary place to store the URL of a package to download')
repository_url = bpy.props.StringProperty(
name='Repository URL',
description='Temporary repository URL')
def draw(self, context):
layout = self.layout
temp_box = layout.box()
temp_box.label(text="Temporary stuff while we're developing")
temp_box.prop(self, 'repository_url')
temp_box.operator(PACKAGE_OT_refresh.bl_idname)
def validate_packagelist(pkglist: list) -> list:
"""Ensures all packages have required fields; strips out bad packages and returns them in a list"""
pass
def build_composite_packagelist(installed: list, available: list) -> OrderedDict:
"""Merge list of installed and available packages into one dict, keyed by package name"""
log = logging.getLogger(__name__ + ".build_composite_packagelist")
masterlist = {}
def packages_are_equivilent(pkg1: Package, pkg2: Package) -> bool:
"""Check that packages are the same version and provide the same files"""
return pkg1.version == pkg2.version\
and pkg1.files == pkg2.files
for pkg in available:
pkgname = pkg.bl_info['name']
if pkgname in masterlist:
masterlist[pkgname].add_version(pkg)
else:
masterlist[pkgname] = ConsolidatedPackage(pkg)
for pkg in installed:
pkg.installed = True
if pkg.name in masterlist:
for masterpkg in masterlist[pkg.name]:
log.debug("{} and {} equivilent? {}".format((pkg.name, pkg.version), (masterpkg.name, masterpkg.version), packages_are_equivilent(pkg, masterpkg)))
if packages_are_equivilent(pkg, masterpkg):
masterpkg.installed = True
masterpkg.installed_location = pkg.installed_location
break
else:
masterlist[pkg.name].add_version(pkg)
else:
masterlist[pkg.name] = ConsolidatedPackage(pkg)
return OrderedDict(sorted(masterlist.items()))
def register():
bpy.utils.register_class(PACKAGE_OT_install)
bpy.utils.register_class(PACKAGE_OT_uninstall)
bpy.utils.register_class(PACKAGE_OT_refresh_repositories)
bpy.utils.register_class(PACKAGE_OT_refresh_packages)
bpy.utils.register_class(PACKAGE_OT_refresh)
bpy.utils.register_class(PACKAGE_OT_load_repositories)
bpy.utils.register_class(PACKAGE_OT_hang)
bpy.utils.register_class(USERPREF_PT_packages)
bpy.utils.register_class(WM_OT_package_toggle_expand)
bpy.types.WindowManager.package_search = bpy.props.StringProperty(
name="Search",
description="Filter packages by name",
options={'TEXTEDIT_UPDATE'}
)
bpy.types.WindowManager.package_install_filter = bpy.props.EnumProperty(
items=[('AVAILABLE', "Available", "All packages in selected repositories"),
('INSTALLED', "Installed", "All installed packages"),
('UPDATES', "Updates", "All installed packages for which there is a newer version availabe")
],
name="Install filter",
default='AVAILABLE',
)
bpy.utils.register_class(PackageManagerPreferences)
def unregister():
bpy.utils.unregister_class(PACKAGE_OT_install)
bpy.utils.unregister_class(PACKAGE_OT_uninstall)
bpy.utils.unregister_class(PACKAGE_OT_refresh_repositories)
bpy.utils.unregister_class(PACKAGE_OT_refresh_packages)
bpy.utils.unregister_class(PACKAGE_OT_refresh)
bpy.utils.unregister_class(PACKAGE_OT_load_repositories)
bpy.utils.unregister_class(PACKAGE_OT_hang)
bpy.utils.unregister_class(USERPREF_PT_packages)
bpy.utils.unregister_class(WM_OT_package_toggle_expand)
del bpy.types.WindowManager.package_search
del bpy.types.WindowManager.package_install_filter
bpy.utils.unregister_class(PackageManagerPreferences)

552
package_manager/appdirs.py Normal file
View File

@@ -0,0 +1,552 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2005-2010 ActiveState Software Inc.
# Copyright (c) 2013 Eddy Petrișor
"""Utilities for determining application-specific dirs.
See <http://github.com/ActiveState/appdirs> for details and usage.
"""
# Dev Notes:
# - MSDN on where to store app data files:
# http://support.microsoft.com/default.aspx?scid=kb;en-us;310294#XSLTH3194121123120121120120
# - Mac OS X: http://developer.apple.com/documentation/MacOSX/Conceptual/BPFileSystem/index.html
# - XDG spec for Un*x: http://standards.freedesktop.org/basedir-spec/basedir-spec-latest.html
__version_info__ = (1, 4, 0)
__version__ = '.'.join(map(str, __version_info__))
import sys
import os
PY3 = sys.version_info[0] == 3
if PY3:
unicode = str
if sys.platform.startswith('java'):
import platform
os_name = platform.java_ver()[3][0]
if os_name.startswith('Windows'): # "Windows XP", "Windows 7", etc.
system = 'win32'
elif os_name.startswith('Mac'): # "Mac OS X", etc.
system = 'darwin'
else: # "Linux", "SunOS", "FreeBSD", etc.
# Setting this to "linux2" is not ideal, but only Windows or Mac
# are actually checked for and the rest of the module expects
# *sys.platform* style strings.
system = 'linux2'
else:
system = sys.platform
def user_data_dir(appname=None, appauthor=None, version=None, roaming=False):
r"""Return full path to the user-specific data dir for this application.
"appname" is the name of application.
If None, just the system directory is returned.
"appauthor" (only used on Windows) is the name of the
appauthor or distributing body for this application. Typically
it is the owning company name. This falls back to appname. You may
pass False to disable it.
"version" is an optional version path element to append to the
path. You might want to use this if you want multiple versions
of your app to be able to run independently. If used, this
would typically be "<major>.<minor>".
Only applied when appname is present.
"roaming" (boolean, default False) can be set True to use the Windows
roaming appdata directory. That means that for users on a Windows
network setup for roaming profiles, this user data will be
sync'd on login. See
<http://technet.microsoft.com/en-us/library/cc766489(WS.10).aspx>
for a discussion of issues.
Typical user data directories are:
Mac OS X: ~/Library/Application Support/<AppName>
Unix: ~/.local/share/<AppName> # or in $XDG_DATA_HOME, if defined
Win XP (not roaming): C:\Documents and Settings\<username>\Application Data\<AppAuthor>\<AppName>
Win XP (roaming): C:\Documents and Settings\<username>\Local Settings\Application Data\<AppAuthor>\<AppName>
Win 7 (not roaming): C:\Users\<username>\AppData\Local\<AppAuthor>\<AppName>
Win 7 (roaming): C:\Users\<username>\AppData\Roaming\<AppAuthor>\<AppName>
For Unix, we follow the XDG spec and support $XDG_DATA_HOME.
That means, by default "~/.local/share/<AppName>".
"""
if system == "win32":
if appauthor is None:
appauthor = appname
const = roaming and "CSIDL_APPDATA" or "CSIDL_LOCAL_APPDATA"
path = os.path.normpath(_get_win_folder(const))
if appname:
if appauthor is not False:
path = os.path.join(path, appauthor, appname)
else:
path = os.path.join(path, appname)
elif system == 'darwin':
path = os.path.expanduser('~/Library/Application Support/')
if appname:
path = os.path.join(path, appname)
else:
path = os.getenv('XDG_DATA_HOME', os.path.expanduser("~/.local/share"))
if appname:
path = os.path.join(path, appname)
if appname and version:
path = os.path.join(path, version)
return path
def site_data_dir(appname=None, appauthor=None, version=None, multipath=False):
"""Return full path to the user-shared data dir for this application.
"appname" is the name of application.
If None, just the system directory is returned.
"appauthor" (only used on Windows) is the name of the
appauthor or distributing body for this application. Typically
it is the owning company name. This falls back to appname. You may
pass False to disable it.
"version" is an optional version path element to append to the
path. You might want to use this if you want multiple versions
of your app to be able to run independently. If used, this
would typically be "<major>.<minor>".
Only applied when appname is present.
"multipath" is an optional parameter only applicable to *nix
which indicates that the entire list of data dirs should be
returned. By default, the first item from XDG_DATA_DIRS is
returned, or '/usr/local/share/<AppName>',
if XDG_DATA_DIRS is not set
Typical user data directories are:
Mac OS X: /Library/Application Support/<AppName>
Unix: /usr/local/share/<AppName> or /usr/share/<AppName>
Win XP: C:\Documents and Settings\All Users\Application Data\<AppAuthor>\<AppName>
Vista: (Fail! "C:\ProgramData" is a hidden *system* directory on Vista.)
Win 7: C:\ProgramData\<AppAuthor>\<AppName> # Hidden, but writeable on Win 7.
For Unix, this is using the $XDG_DATA_DIRS[0] default.
WARNING: Do not use this on Windows. See the Vista-Fail note above for why.
"""
if system == "win32":
if appauthor is None:
appauthor = appname
path = os.path.normpath(_get_win_folder("CSIDL_COMMON_APPDATA"))
if appname:
if appauthor is not False:
path = os.path.join(path, appauthor, appname)
else:
path = os.path.join(path, appname)
elif system == 'darwin':
path = os.path.expanduser('/Library/Application Support')
if appname:
path = os.path.join(path, appname)
else:
# XDG default for $XDG_DATA_DIRS
# only first, if multipath is False
path = os.getenv('XDG_DATA_DIRS',
os.pathsep.join(['/usr/local/share', '/usr/share']))
pathlist = [os.path.expanduser(x.rstrip(os.sep)) for x in path.split(os.pathsep)]
if appname:
if version:
appname = os.path.join(appname, version)
pathlist = [os.sep.join([x, appname]) for x in pathlist]
if multipath:
path = os.pathsep.join(pathlist)
else:
path = pathlist[0]
return path
if appname and version:
path = os.path.join(path, version)
return path
def user_config_dir(appname=None, appauthor=None, version=None, roaming=False):
r"""Return full path to the user-specific config dir for this application.
"appname" is the name of application.
If None, just the system directory is returned.
"appauthor" (only used on Windows) is the name of the
appauthor or distributing body for this application. Typically
it is the owning company name. This falls back to appname. You may
pass False to disable it.
"version" is an optional version path element to append to the
path. You might want to use this if you want multiple versions
of your app to be able to run independently. If used, this
would typically be "<major>.<minor>".
Only applied when appname is present.
"roaming" (boolean, default False) can be set True to use the Windows
roaming appdata directory. That means that for users on a Windows
network setup for roaming profiles, this user data will be
sync'd on login. See
<http://technet.microsoft.com/en-us/library/cc766489(WS.10).aspx>
for a discussion of issues.
Typical user data directories are:
Mac OS X: same as user_data_dir
Unix: ~/.config/<AppName> # or in $XDG_CONFIG_HOME, if defined
Win *: same as user_data_dir
For Unix, we follow the XDG spec and support $XDG_CONFIG_HOME.
That means, by deafult "~/.config/<AppName>".
"""
if system in ["win32", "darwin"]:
path = user_data_dir(appname, appauthor, None, roaming)
else:
path = os.getenv('XDG_CONFIG_HOME', os.path.expanduser("~/.config"))
if appname:
path = os.path.join(path, appname)
if appname and version:
path = os.path.join(path, version)
return path
def site_config_dir(appname=None, appauthor=None, version=None, multipath=False):
"""Return full path to the user-shared data dir for this application.
"appname" is the name of application.
If None, just the system directory is returned.
"appauthor" (only used on Windows) is the name of the
appauthor or distributing body for this application. Typically
it is the owning company name. This falls back to appname. You may
pass False to disable it.
"version" is an optional version path element to append to the
path. You might want to use this if you want multiple versions
of your app to be able to run independently. If used, this
would typically be "<major>.<minor>".
Only applied when appname is present.
"multipath" is an optional parameter only applicable to *nix
which indicates that the entire list of config dirs should be
returned. By default, the first item from XDG_CONFIG_DIRS is
returned, or '/etc/xdg/<AppName>', if XDG_CONFIG_DIRS is not set
Typical user data directories are:
Mac OS X: same as site_data_dir
Unix: /etc/xdg/<AppName> or $XDG_CONFIG_DIRS[i]/<AppName> for each value in
$XDG_CONFIG_DIRS
Win *: same as site_data_dir
Vista: (Fail! "C:\ProgramData" is a hidden *system* directory on Vista.)
For Unix, this is using the $XDG_CONFIG_DIRS[0] default, if multipath=False
WARNING: Do not use this on Windows. See the Vista-Fail note above for why.
"""
if system in ["win32", "darwin"]:
path = site_data_dir(appname, appauthor)
if appname and version:
path = os.path.join(path, version)
else:
# XDG default for $XDG_CONFIG_DIRS
# only first, if multipath is False
path = os.getenv('XDG_CONFIG_DIRS', '/etc/xdg')
pathlist = [os.path.expanduser(x.rstrip(os.sep)) for x in path.split(os.pathsep)]
if appname:
if version:
appname = os.path.join(appname, version)
pathlist = [os.sep.join([x, appname]) for x in pathlist]
if multipath:
path = os.pathsep.join(pathlist)
else:
path = pathlist[0]
return path
def user_cache_dir(appname=None, appauthor=None, version=None, opinion=True):
r"""Return full path to the user-specific cache dir for this application.
"appname" is the name of application.
If None, just the system directory is returned.
"appauthor" (only used on Windows) is the name of the
appauthor or distributing body for this application. Typically
it is the owning company name. This falls back to appname. You may
pass False to disable it.
"version" is an optional version path element to append to the
path. You might want to use this if you want multiple versions
of your app to be able to run independently. If used, this
would typically be "<major>.<minor>".
Only applied when appname is present.
"opinion" (boolean) can be False to disable the appending of
"Cache" to the base app data dir for Windows. See
discussion below.
Typical user cache directories are:
Mac OS X: ~/Library/Caches/<AppName>
Unix: ~/.cache/<AppName> (XDG default)
Win XP: C:\Documents and Settings\<username>\Local Settings\Application Data\<AppAuthor>\<AppName>\Cache
Vista: C:\Users\<username>\AppData\Local\<AppAuthor>\<AppName>\Cache
On Windows the only suggestion in the MSDN docs is that local settings go in
the `CSIDL_LOCAL_APPDATA` directory. This is identical to the non-roaming
app data dir (the default returned by `user_data_dir` above). Apps typically
put cache data somewhere *under* the given dir here. Some examples:
...\Mozilla\Firefox\Profiles\<ProfileName>\Cache
...\Acme\SuperApp\Cache\1.0
OPINION: This function appends "Cache" to the `CSIDL_LOCAL_APPDATA` value.
This can be disabled with the `opinion=False` option.
"""
if system == "win32":
if appauthor is None:
appauthor = appname
path = os.path.normpath(_get_win_folder("CSIDL_LOCAL_APPDATA"))
if appname:
if appauthor is not False:
path = os.path.join(path, appauthor, appname)
else:
path = os.path.join(path, appname)
if opinion:
path = os.path.join(path, "Cache")
elif system == 'darwin':
path = os.path.expanduser('~/Library/Caches')
if appname:
path = os.path.join(path, appname)
else:
path = os.getenv('XDG_CACHE_HOME', os.path.expanduser('~/.cache'))
if appname:
path = os.path.join(path, appname.lower().replace(' ', '-'))
if appname and version:
path = os.path.join(path, version)
return path
def user_log_dir(appname=None, appauthor=None, version=None, opinion=True):
r"""Return full path to the user-specific log dir for this application.
"appname" is the name of application.
If None, just the system directory is returned.
"appauthor" (only used on Windows) is the name of the
appauthor or distributing body for this application. Typically
it is the owning company name. This falls back to appname. You may
pass False to disable it.
"version" is an optional version path element to append to the
path. You might want to use this if you want multiple versions
of your app to be able to run independently. If used, this
would typically be "<major>.<minor>".
Only applied when appname is present.
"opinion" (boolean) can be False to disable the appending of
"Logs" to the base app data dir for Windows, and "log" to the
base cache dir for Unix. See discussion below.
Typical user cache directories are:
Mac OS X: ~/Library/Logs/<AppName>
Unix: ~/.cache/<AppName>/log # or under $XDG_CACHE_HOME if defined
Win XP: C:\Documents and Settings\<username>\Local Settings\Application Data\<AppAuthor>\<AppName>\Logs
Vista: C:\Users\<username>\AppData\Local\<AppAuthor>\<AppName>\Logs
On Windows the only suggestion in the MSDN docs is that local settings
go in the `CSIDL_LOCAL_APPDATA` directory. (Note: I'm interested in
examples of what some windows apps use for a logs dir.)
OPINION: This function appends "Logs" to the `CSIDL_LOCAL_APPDATA`
value for Windows and appends "log" to the user cache dir for Unix.
This can be disabled with the `opinion=False` option.
"""
if system == "darwin":
path = os.path.join(
os.path.expanduser('~/Library/Logs'),
appname)
elif system == "win32":
path = user_data_dir(appname, appauthor, version)
version = False
if opinion:
path = os.path.join(path, "Logs")
else:
path = user_cache_dir(appname, appauthor, version)
version = False
if opinion:
path = os.path.join(path, "log")
if appname and version:
path = os.path.join(path, version)
return path
class AppDirs(object):
"""Convenience wrapper for getting application dirs."""
def __init__(self, appname, appauthor=None, version=None, roaming=False,
multipath=False):
self.appname = appname
self.appauthor = appauthor
self.version = version
self.roaming = roaming
self.multipath = multipath
@property
def user_data_dir(self):
return user_data_dir(self.appname, self.appauthor,
version=self.version, roaming=self.roaming)
@property
def site_data_dir(self):
return site_data_dir(self.appname, self.appauthor,
version=self.version, multipath=self.multipath)
@property
def user_config_dir(self):
return user_config_dir(self.appname, self.appauthor,
version=self.version, roaming=self.roaming)
@property
def site_config_dir(self):
return site_config_dir(self.appname, self.appauthor,
version=self.version, multipath=self.multipath)
@property
def user_cache_dir(self):
return user_cache_dir(self.appname, self.appauthor,
version=self.version)
@property
def user_log_dir(self):
return user_log_dir(self.appname, self.appauthor,
version=self.version)
#---- internal support stuff
def _get_win_folder_from_registry(csidl_name):
"""This is a fallback technique at best. I'm not sure if using the
registry for this guarantees us the correct answer for all CSIDL_*
names.
"""
import _winreg
shell_folder_name = {
"CSIDL_APPDATA": "AppData",
"CSIDL_COMMON_APPDATA": "Common AppData",
"CSIDL_LOCAL_APPDATA": "Local AppData",
}[csidl_name]
key = _winreg.OpenKey(
_winreg.HKEY_CURRENT_USER,
r"Software\Microsoft\Windows\CurrentVersion\Explorer\Shell Folders"
)
dir, type = _winreg.QueryValueEx(key, shell_folder_name)
return dir
def _get_win_folder_with_pywin32(csidl_name):
from win32com.shell import shellcon, shell
dir = shell.SHGetFolderPath(0, getattr(shellcon, csidl_name), 0, 0)
# Try to make this a unicode path because SHGetFolderPath does
# not return unicode strings when there is unicode data in the
# path.
try:
dir = unicode(dir)
# Downgrade to short path name if have highbit chars. See
# <http://bugs.activestate.com/show_bug.cgi?id=85099>.
has_high_char = False
for c in dir:
if ord(c) > 255:
has_high_char = True
break
if has_high_char:
try:
import win32api
dir = win32api.GetShortPathName(dir)
except ImportError:
pass
except UnicodeError:
pass
return dir
def _get_win_folder_with_ctypes(csidl_name):
import ctypes
csidl_const = {
"CSIDL_APPDATA": 26,
"CSIDL_COMMON_APPDATA": 35,
"CSIDL_LOCAL_APPDATA": 28,
}[csidl_name]
buf = ctypes.create_unicode_buffer(1024)
ctypes.windll.shell32.SHGetFolderPathW(None, csidl_const, None, 0, buf)
# Downgrade to short path name if have highbit chars. See
# <http://bugs.activestate.com/show_bug.cgi?id=85099>.
has_high_char = False
for c in buf:
if ord(c) > 255:
has_high_char = True
break
if has_high_char:
buf2 = ctypes.create_unicode_buffer(1024)
if ctypes.windll.kernel32.GetShortPathNameW(buf.value, buf2, 1024):
buf = buf2
return buf.value
def _get_win_folder_with_jna(csidl_name):
import array
from com.sun import jna
from com.sun.jna.platform import win32
buf_size = win32.WinDef.MAX_PATH * 2
buf = array.zeros('c', buf_size)
shell = win32.Shell32.INSTANCE
shell.SHGetFolderPath(None, getattr(win32.ShlObj, csidl_name), None, win32.ShlObj.SHGFP_TYPE_CURRENT, buf)
dir = jna.Native.toString(buf.tostring()).rstrip("\0")
# Downgrade to short path name if have highbit chars. See
# <http://bugs.activestate.com/show_bug.cgi?id=85099>.
has_high_char = False
for c in dir:
if ord(c) > 255:
has_high_char = True
break
if has_high_char:
buf = array.zeros('c', buf_size)
kernel = win32.Kernel32.INSTANCE
if kernal.GetShortPathName(dir, buf, buf_size):
dir = jna.Native.toString(buf.tostring()).rstrip("\0")
return dir
if system == "win32":
try:
import win32com.shell
_get_win_folder = _get_win_folder_with_pywin32
except ImportError:
try:
from ctypes import windll
_get_win_folder = _get_win_folder_with_ctypes
except ImportError:
try:
import com.sun.jna
_get_win_folder = _get_win_folder_with_jna
except ImportError:
_get_win_folder = _get_win_folder_from_registry
#---- self test code
if __name__ == "__main__":
appname = "MyApp"
appauthor = "MyCompany"
props = ("user_data_dir", "site_data_dir",
"user_config_dir", "site_config_dir",
"user_cache_dir", "user_log_dir")
print("-- app dirs (with optional 'version')")
dirs = AppDirs(appname, appauthor, version="1.0")
for prop in props:
print("%s: %s" % (prop, getattr(dirs, prop)))
print("\n-- app dirs (without optional 'version')")
dirs = AppDirs(appname, appauthor)
for prop in props:
print("%s: %s" % (prop, getattr(dirs, prop)))
print("\n-- app dirs (without optional 'appauthor')")
dirs = AppDirs(appname)
for prop in props:
print("%s: %s" % (prop, getattr(dirs, prop)))
print("\n-- app dirs (with disabled 'appauthor')")
dirs = AppDirs(appname, appauthor=False)
for prop in props:
print("%s: %s" % (prop, getattr(dirs, prop)))

49
package_manager/cache.py Normal file
View File

@@ -0,0 +1,49 @@
# ##### BEGIN GPL LICENSE BLOCK #####
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# ##### END GPL LICENSE BLOCK #####
import os
import logging
import pathlib
from . import appdirs
log = logging.getLogger(__name__)
def cache_directory(*subdirs) -> pathlib.Path:
"""Returns an OS-specifc cache location, and ensures it exists.
Should be replaced with a call to bpy.utils.user_resource('CACHE', ...)
once https://developer.blender.org/T47684 is finished.
:param subdirs: extra subdirectories inside the cache directory.
>>> cache_directory()
'.../blender_cloud/your_username'
>>> cache_directory('sub1', 'sub2')
'.../blender_cloud/your_username/sub1/sub2'
"""
# TODO: use bpy.utils.user_resource('CACHE', ...)
# once https://developer.blender.org/T47684 is finished.
user_cache_dir = appdirs.user_cache_dir(appname='Blender', appauthor=False)
cache_dir = pathlib.Path(user_cache_dir) / 'blender_package_manager' / pathlib.Path(*subdirs)
cache_dir.mkdir(mode=0o700, parents=True, exist_ok=True)
return cache_dir

636
package_manager/subproc.py Normal file
View File

@@ -0,0 +1,636 @@
"""
All the stuff that needs to run in a subprocess.
"""
import logging
import pathlib
import shutil
import json
class Message:
"""Superclass for all message sent over pipes."""
class BlenderMessage(Message):
"""Superclass for all messages sent from Blender to the subprocess."""
class SubprocMessage(Message):
"""Superclass for all messages sent from the subprocess to Blender."""
class Abort(BlenderMessage):
"""Sent when the user requests abortion of a task."""
class Progress(SubprocMessage):
"""Send from subprocess to Blender to report progress.
:ivar progress: the progress percentage, from 0-1.
"""
def __init__(self, progress: float):
self.progress = progress
class SubprocError(SubprocMessage):
"""Superclass for all fatal error messages sent from the subprocess."""
def __init__(self, message: str):
self.message = message
class SubprocWarning(SubprocMessage):
"""Superclass for all non-fatal warning messages sent from the subprocess."""
def __init__(self, message: str):
self.message = message
class InstallError(SubprocError):
"""Sent when there was an error installing something."""
class UninstallError(SubprocError):
"""Sent when there was an error uninstalling something."""
class FileConflictError(InstallError):
"""Sent when installation would overwrite existing files."""
def __init__(self, message: str, conflicts: list):
self.message = message
self.conflicts = conflicts
class DownloadError(SubprocMessage):
"""Sent when there was an error downloading something."""
def __init__(self, status_code: int, description: str):
self.status_code = status_code
self.description = description
class Success(SubprocMessage):
"""Sent when an operation finished sucessfully."""
class RepositoryResult(SubprocMessage):
"""Sent when an operation returns a repository to be used on the parent process."""
def __init__(self, repository: dict):
self.repository = repository
class Aborted(SubprocMessage):
"""Sent as response to Abort message."""
class InstallException(Exception):
"""Raised when there is an error during installation"""
class DownloadException(Exception):
"""Raised when there is an error downloading something"""
class BadRepository(Exception):
"""Raised when reading a repository results in an error"""
class InplaceBackup:
"""Utility for moving a file out of the way by appending a '~'"""
log = logging.getLogger('%s.inplace-backup' % __name__)
def __init__(self, path: pathlib.Path):
self.path = path
self.backup()
def backup(self):
"""Move 'path' to 'path~'"""
if not self.path.exists():
raise FileNotFoundError("Can't backup path which doesn't exist")
self.backup_path = pathlib.Path(str(self.path) + '~')
if self.backup_path.exists():
self.log.warning("Overwriting existing backup '{}'".format(self.backup_path))
self._rm(self.backup_path)
shutil.move(str(self.path), str(self.backup_path))
def restore(self):
"""Move 'path~' to 'path'"""
try:
getattr(self, 'backup_path')
except AttributeError as err:
raise RuntimeError("Can't restore file before backing it up") from err
if not self.backup_path.exists():
raise FileNotFoundError("Can't restore backup which doesn't exist")
if self.path.exists():
self.log.warning("Overwriting '{0}' with backup file".format(self.path))
self._rm(self.path)
shutil.move(str(self.backup_path), str(self.path))
def remove(self):
"""Remove 'path~'"""
self._rm(self.backup_path)
def _rm(self, path: pathlib.Path):
"""Just delete whatever is specified by `path`"""
if path.is_dir():
shutil.rmtree(str(path))
else:
path.unlink()
class Package:
"""
Stores package methods and metadata
"""
log = logging.getLogger(__name__ + ".Package")
def __init__(self, package_dict:dict = None):
self.bl_info = {}
self.url = ""
self.files = []
self.set_from_dict(package_dict)
self.installed = False
self.repository = None
def to_dict(self) -> dict:
"""
Return a dict representation of the package
"""
return {
'bl_info': self.bl_info,
'url': self.url,
'files': self.files,
}
def set_from_dict(self, package_dict: dict):
"""
Get attributes from a dict such as produced by `to_dict`
"""
if package_dict is None:
package_dict = {}
for attr in ('files', 'url', 'bl_info'):
if package_dict.get(attr) is not None:
setattr(self, attr, package_dict[attr])
#bl_info convenience getters
@property
def name(self) -> str:
"""Get name from bl_info"""
return self.bl_info['name']
@property
def description(self) -> str:
"""Get description from bl_info"""
return self.bl_info['description']
@property
def version(self) -> tuple:
"""Get version from bl_info"""
return tuple(self.bl_info['version'])
# @classmethod
# def from_dict(cls, package_dict: dict):
# """
# Return a Package with values from dict
# """
# pkg = cls()
# pkg.set_from_dict(package_dict)
@classmethod
def from_blinfo(cls, blinfo: dict):
"""
Return a Package with bl_info filled in
"""
return cls({'bl_info': blinfo})
@classmethod
def from_module(cls, module):
"""
Return a Package object from an addon module
"""
from pathlib import Path
filepath = Path(module.__file__)
if filepath.name == '__init__.py':
filepath = filepath.parent
pkg = cls()
pkg.files = [filepath.name]
pkg.installed_location = str(filepath)
try:
pkg.bl_info = module.bl_info
except AttributeError as err:
raise BadAddon("Module does not appear to be an addon; no bl_info attribute") from err
return pkg
class Repository:
"""
Stores repository metadata (including packages)
"""
log = logging.getLogger(__name__ + ".Repository")
def __init__(self, url=None):
if url is None:
url = ""
self.set_from_dict({'url': url})
# def cleanse_packagelist(self):
# """Remove empty packages (no bl_info), packages with no name"""
def refresh(self):
"""
Requests repo.json from URL and embeds etag/last-modification headers
"""
import requests
if self.url is None:
raise ValueError("Cannot refresh repository without a URL")
self.log.debug("Refreshing repository from %s", self.url)
req_headers = {}
# Do things this way to avoid adding empty objects/None to the req_headers dict
try:
req_headers['If-None-Match'] = self._headers['etag']
except KeyError:
pass
try:
req_headers['If-Modified-Since'] = self._headers['last-modified']
except KeyError:
pass
try:
resp = requests.get(self.url, headers=req_headers, timeout=60)
except requests.exceptions.RequestException as err:
raise DownloadException(err) from err
try:
resp.raise_for_status()
except requests.HTTPError as err:
self.log.error('Error downloading %s: %s', self.url, err)
raise DownloadException(resp.status_code, resp.reason) from err
if resp.status_code == requests.codes.not_modified:
self.log.debug("Packagelist not modified")
return
resp_headers = {}
try:
resp_headers['etag'] = resp.headers['etag']
except KeyError:
pass
try:
resp_headers['last-modified'] = resp.headers['last-modified']
except KeyError:
pass
self.log.debug("Found headers: %s", resp_headers)
try:
repodict = resp.json()
except json.decoder.JSONDecodeError:
self.log.exception("Failed to parse downloaded repository")
raise DownloadException("Could not parse repository downloaded from '%s'. Are you sure this is the correct URL?" % self.url)
repodict['_headers'] = resp_headers
self.set_from_dict(repodict)
def to_dict(self, sort=False, ids=False) -> dict:
"""
Return a dict representation of the repository
"""
packages = [p.to_dict() for p in self.packages]
if sort:
packages.sort(key=lambda p: p['bl_info']['name'].lower())
if ids:
for pkg in packages:
# hash may be too big for a C int
pkg['id'] = str(hash(pkg['url'] + pkg['bl_info']['name'] + self.name + self.url))
return {
'name': self.name,
'packages': packages,
'url': self.url,
'_headers': self._headers,
}
def set_from_dict(self, repodict: dict):
"""
Get repository attributes from a dict such as produced by `to_dict`
"""
def initialize(item, value):
if item is None:
return value
else:
return item
#Be certain to initialize everything; downloaded packagelist might contain null values
name = initialize(repodict.get('name'), "")
url = initialize(repodict.get('url'), "")
packages = initialize(repodict.get('packages'), [])
headers = initialize(repodict.get('_headers'), {})
self.name = name
self.url = url
self.packages = [Package(pkg) for pkg in packages]
self._headers = headers
@classmethod
def from_dict(cls, repodict: dict):
"""
Like `set_from_dict`, but immutable
"""
repo = cls()
repo.set_from_dict(repodict)
return repo
def to_file(self, path: pathlib.Path):
"""
Dump repository to a json file at `path`.
"""
if self.packages is None:
self.log.warning("Writing an empty repository")
with path.open('w', encoding='utf-8') as repo_file:
json.dump(self.to_dict(), repo_file, indent=4, sort_keys=True)
self.log.debug("Repository written to %s" % path)
@classmethod
def from_file(cls, path: pathlib.Path):
"""
Read repository from a json file at `path`.
"""
repo_file = path.open('r', encoding='utf-8')
with repo_file:
try:
repo = cls.from_dict(json.load(repo_file))
except Exception as err:
raise BadRepository from err
cls.log.debug("Repository read from %s", path)
return repo
def _download(pipe_to_blender, package_url: str, download_dir: pathlib.Path) -> pathlib.Path:
"""Downloads the given package
:returns: path to the downloaded file, or None in case of error.
"""
import requests
log = logging.getLogger('%s.download' % __name__)
log.info('Going to download %s to %s', package_url, download_dir)
pipe_to_blender.send(Progress(0.0))
log.info('Downloading %s', package_url)
try:
resp = requests.get(package_url, stream=True, verify=True)
except requests.exceptions.RequestException as err:
pipe_to_blender.send(DownloadError(1, err))
raise
try:
resp.raise_for_status()
except requests.HTTPError as ex:
log.error('Error downloading %s: %s', package_url, ex)
pipe_to_blender.send(DownloadError(resp.status_code, str(ex)))
return None
try:
# Use float so that we can also use infinity
content_length = float(resp.headers['content-length'])
except KeyError:
log.warning('Server did not send content length, cannot report progress.')
content_length = float('inf')
# TODO: check if there's enough disk space.
# TODO: get filename from Content-Disposition header, if available.
# TODO: use urllib.parse to parse the URL.
local_filename = package_url.split('/')[-1] or 'download.tmp'
local_fpath = download_dir / local_filename
downloaded_length = 0
with local_fpath.open('wb') as outfile:
for chunk in resp.iter_content(chunk_size=1024 ** 2):
# Handle abort messages from Blender
while pipe_to_blender.poll():
recvd = pipe_to_blender.recv()
if isinstance(recvd, Abort):
log.warning('Aborting download of %s by request', package_url)
pipe_to_blender.send(Aborted())
return None
log.warning('Unknown message %s received, ignoring', recvd)
if not chunk: # filter out keep-alive new chunks
continue
outfile.write(chunk)
downloaded_length += len(chunk)
# TODO: use multiplier for progress, so that we can count up to 70% and
# leave 30% "progress" for installation of the package.
pipe_to_blender.send(Progress(downloaded_length / content_length))
return local_fpath
def _add_to_installed(storage_path: pathlib.Path, pkg: Package):
"""Add pkg to local repository"""
repo_path = storage_path / 'local.json'
if repo_path.exists():
repo = Repository.from_file(repo_path)
else:
repo = Repository()
repo.packages.append(pkg)
repo.to_file(repo_path)
def _remove_from_installed(storage_path: pathlib.Path, pkg: Package):
"""Remove pkg from local repository"""
repo = Repository.from_file(storage_path / 'local.json')
#TODO: this won't work, compare by name? (watch out for conflicts though)
repo.packages.remove(pkg)
def _install(pipe_to_blender, pkgpath: pathlib.Path, dest: pathlib.Path, searchpaths: list):
"""Extracts/moves package at `pkgpath` to `dest`"""
import zipfile
log = logging.getLogger('%s.install' % __name__)
log.debug("Starting installation")
pipe_to_blender.send(Progress(0.0))
if not pkgpath.is_file():
raise InstallException("Package isn't a file")
if not dest.is_dir():
raise InstallException("Destination is not a directory")
# TODO: check to make sure addon/package isn't already installed elsewhere
# The following is adapted from `addon_install` in bl_operators/wm.py
# check to see if the file is in compressed format (.zip)
if zipfile.is_zipfile(pkgpath):
log.debug("Package is zipfile")
try:
file_to_extract = zipfile.ZipFile(str(pkgpath), 'r')
except Exception as err:
raise InstallException("Failed to read zip file: %s" % err) from err
def root_files(filelist: list) -> list:
"""Some string parsing to get a list of the root contents of a zip from its namelist"""
rootlist = []
for f in filelist:
# Get all names which have no path separators (root level files)
# or have a single path separator at the end (root level directories).
if len(f.rstrip('/').split('/')) == 1:
rootlist.append(f)
return rootlist
conflicts = [dest / f for f in root_files(file_to_extract.namelist()) if (dest / f).exists()]
backups = []
for conflict in conflicts:
log.debug("Creating backup of conflict %s", conflict)
backups.append(InplaceBackup(conflict))
try:
file_to_extract.extractall(str(dest))
except Exception as err:
for backup in backups:
backup.restore()
raise InstallException("Failed to extract zip file to '%s': %s" % (dest, err)) from err
for backup in backups:
backup.remove()
else:
log.debug("Package is pyfile")
dest_file = (dest / pkgpath.name)
if dest_file.exists():
backup = InplaceBackup(dest_file)
try:
shutil.copyfile(str(pkgpath), str(dest_file))
except Exception as err:
backup.restore()
raise InstallException("Failed to copy file to '%s': %s" % (dest, err)) from err
try:
pkgpath.unlink()
log.debug("Removed cached package: %s", pkgpath)
except Exception as err:
pipe_to_blender.send(SubprocWarning("Install succeeded, but failed to remove package from cache: %s" % err))
log.warning("Failed to remove package from cache: %s", err)
pipe_to_blender.send(Progress(1.0))
return
def download_and_install(pipe_to_blender, package_url: str, install_path: pathlib.Path, search_paths: list):
"""Downloads and installs the given package."""
from . import cache
log = logging.getLogger('%s.download_and_install' % __name__)
cache_dir = cache.cache_directory('downloads')
downloaded = _download(pipe_to_blender, package_url, cache_dir)
if not downloaded:
log.debug('Download failed/aborted, not going to install anything.')
return
try:
_install(pipe_to_blender, downloaded, install_path, search_paths)
pipe_to_blender.send(Success())
except InstallException as err:
log.exception("Failed to install package: %s", err)
pipe_to_blender.send(InstallError(err))
def uninstall(pipe_to_blender, package: Package, install_path: pathlib.Path):
"""Deletes the given package's files from the install directory"""
#TODO: move package to cache and present an "undo" button to user, to give nicer UX on misclicks
#TODO: move this to a shared utility function
# Duplicated code with InplaceBackup class
def _rm(path: pathlib.Path):
"""Just delete whatever is specified by `path`"""
if path.is_dir():
shutil.rmtree(str(path))
else:
path.unlink()
for pkgfile in [install_path / pathlib.Path(p) for p in package.files]:
if not pkgfile.exists():
pipe_to_blender.send(UninstallError("Could not find file owned by package: '%s'. Refusing to uninstall." % pkgfile))
return None
for pkgfile in [install_path / pathlib.Path(p) for p in package.files]:
_rm(pkgfile)
pipe_to_blender.send(Success())
def _load_repo(storage_path: pathlib.Path) -> Repository:
"""Reads the stored repositories"""
repo_path = storage_path / 'repo.json'
return Repository.from_file(repo_path)
def refresh(pipe_to_blender, storage_path: pathlib.Path, repository_url: str):
"""Retrieves and stores the given repository"""
log = logging.getLogger(__name__ + '.refresh')
repo_path = storage_path / 'repo.json'
if repo_path.exists():
repo = Repository.from_file(repo_path)
if repo.url != repository_url:
# We're getting a new repository
repo = Repository(repository_url)
else:
repo = Repository(repository_url)
try:
repo.refresh()
except DownloadException as err:
pipe_to_blender.send(SubprocError(err))
return
repo.to_file(repo_path) # TODO: this always writes even if repo wasn't changed
pipe_to_blender.send(RepositoryResult(repo))
pipe_to_blender.send(Success())
def load(pipe_to_blender, storage_path: pathlib.Path):
"""Reads the stored repository and sends the result to blender"""
try:
repo = _load_repo(storage_path)
pipe_to_blender.send(RepositoryResult(repo.to_dict(sort=True, ids=True)))
pipe_to_blender.send(Success())
return repo
except BadRepository as err:
pipe_to_blender.send(SubprocError("Failed to read repository: %s" % err))
# def load_local(pipe_to_blender
def debug_hang():
"""Hangs for an hour. For testing purposes only."""
import time
time.sleep(3600)