""" Blender Package manager """ bl_info = { 'name': 'Package Manager', 'author': 'Ellwood Zwovic (gandalf3), Sybren A. Stüvel, Mitchell Stokes (Moguri)', 'version': (0, 1, 0), 'blender': (2, 79, 0), 'location': 'User Preferences > Packages', 'description': 'A tool for installing, updating, and otherwise managing, addons and packages.', 'category': 'System', 'support': 'TESTING', } import logging log = logging.getLogger(__name__) if 'bpy' in locals(): from importlib import reload def recursive_reload(mod): """Reloads the given module and all its submodules""" log.debug("Reloading %s", mod) from types import ModuleType reloaded_mod = reload(mod) for attr in [getattr(mod, attr_name) for attr_name in dir(mod)]: if type(attr) is ModuleType and attr.__name__.startswith(mod.__name__): recursive_reload(attr) return reloaded_mod subproc = recursive_reload(subproc) messages = recursive_reload(messages) utils = recursive_reload(utils) bpkg = recursive_reload(bpkg) Package = bpkg.Package else: from . import subproc from . import messages from . import bpkg from . import utils from .bpkg import Package import bpy from collections import OrderedDict # global list of all known packages, indexed by name _packages = OrderedDict() class ConsolidatedPackage: """ Stores a grouping of different versions of the same package """ log = logging.getLogger(__name__ + ".ConsolidatedPackage") def __init__(self, pkg=None): self.versions = [] if pkg is not None: self.add_version(pkg) @property def installed(self) -> bool: """Return true if any version of this package is installed""" for pkg in self.versions: if pkg.installed: return True return False def get_latest_installed_version(self) -> bpkg.Package: """ Return the installed package with the highest version number. If no packages are installed, return None. """ #self.versions is always sorted newer -> older, so we can just grab the first we find for pkg in self.versions: if pkg.installed: return pkg return None def get_latest_version(self) -> bpkg.Package: """Return package with highest version number""" return self.versions[0] # this is always sorted with the highest on top def add_version(self, pkg: Package): self.versions.append(pkg) self.versions.sort(key=lambda v: v.version, reverse=True) def __iter__(self): return (pkg for pkg in self.versions) class SubprocMixin: """Mix-in class for things that need to be run in a subprocess.""" log = logging.getLogger(__name__ + '.SubprocMixin') _state = 'INITIALIZING' _abort_timeout = 0 # time at which we stop waiting for an abort response and just terminate the process # Mapping from message type (see bpkg_manager.subproc) to handler function. # Should be constructed before modal() gets called. msg_handlers = {} def execute(self, context): return self.invoke(context, None) def quit(self): """Signals the state machine to stop this operator from running.""" try: self.cancel(bpy.context) except AttributeError: pass self._state = 'QUIT' def invoke(self, context, event): import multiprocessing self.pipe_blender, self.pipe_subproc = multiprocessing.Pipe() # The subprocess should just be terminated when Blender quits. Without this, # Blender would hang while closing, until the subprocess terminates itself. # TODO: Perhaps it would be better to fork when blender exits? self.process = self.create_subprocess() self.process.daemon = True self.process.start() self._state = 'RUNNING' wm = context.window_manager wm.modal_handler_add(self) self.timer = wm.event_timer_add(0.1, context.window) return {'RUNNING_MODAL'} def modal(self, context, event): import time if event.type == 'ESC': self.log.warning('Escape pressed, sending abort signal to subprocess') self.abort() return {'PASS_THROUGH'} if event.type != 'TIMER': return {'PASS_THROUGH'} if self._state == 'ABORTING' and time.time() > self._abort_timeout: self.log.error('No response from subprocess to abort request, terminating it.') self.report({'ERROR'}, 'No response from subprocess to abort request, terminating it.') self.process.terminate() self._finish(context) return {'CANCELLED'} while self.pipe_blender.poll(): self.handle_received_data() if self._state == 'QUIT': self._finish(context) return {'FINISHED'} if not self.process.is_alive(): self.report_process_died() self.cancel(context) self._finish(context) return {'CANCELLED'} return {'RUNNING_MODAL'} def abort(self): import time # Allow the subprocess 10 seconds to repsond to our abort message. self._abort_timeout = time.time() + 10 self._state = 'ABORTING' self.pipe_blender.send(messages.Abort()) def _finish(self, context): import multiprocessing global bpkg_operation_running context.window_manager.event_timer_remove(self.timer) bpkg_operation_running = False if self.process and self.process.is_alive(): self.log.debug('Waiting for subprocess to quit') try: self.process.join(timeout=10) except multiprocessing.TimeoutError: self.log.warning('Subprocess is hanging, terminating it forcefully.') self.process.terminate() else: self.log.debug('Subprocess stopped with exit code %i', self.process.exitcode) def handle_received_data(self): recvd = self.pipe_blender.recv() self.log.debug('Received message from subprocess: %s', recvd) try: handler = self.msg_handlers[type(recvd)] except KeyError: self.log.error('Unable to handle received message %s', recvd) # Maybe we shouldn't show this to the user? self.report({'WARNING'}, 'Unable to handle received message %s' % recvd) return handler(recvd) def create_subprocess(self): """Implement this in a subclass. :rtype: multiprocessing.Process """ raise NotImplementedError() def report_process_died(self): """Provides the user with sensible information when the process has died. Implement this in a subclass. """ raise NotImplementedError() class PACKAGE_OT_install(SubprocMixin, bpy.types.Operator): bl_idname = 'package.install' bl_label = 'Install package' bl_description = 'Downloads and installs a Blender add-on package' bl_options = {'REGISTER'} package_name = bpy.props.StringProperty( name='package_name', description='The name of the package to install' ) log = logging.getLogger(__name__ + '.PACKAGE_OT_install') def invoke(self, context, event): if not self.package_name: self.report({'ERROR'}, 'Package name not given') return {'CANCELLED'} return super().invoke(context, event) def create_subprocess(self): """Starts the download process. Also registers the message handlers. :rtype: multiprocessing.Process """ import multiprocessing self.msg_handlers = { messages.Progress: self._subproc_progress, messages.DownloadError: self._subproc_download_error, messages.InstallError: self._subproc_install_error, messages.Success: self._subproc_success, messages.Aborted: self._subproc_aborted, } global _packages package = _packages[self.package_name].get_latest_version() import pathlib # TODO: We need other paths besides this one on subprocess end, so it might be better to pass them all at once. # For now, just pass this one. install_path = pathlib.Path(bpy.utils.user_resource('SCRIPTS', 'addons', create=True)) self.log.debug("Using %s as install path", install_path) import addon_utils proc = multiprocessing.Process(target=messages.download_and_install_package, args=(self.pipe_subproc, package, install_path)) return proc def _subproc_progress(self, progress: messages.Progress): self.log.info('Task progress at %i%%', progress.progress * 100) def _subproc_download_error(self, error: messages.DownloadError): self.report({'ERROR'}, 'Unable to download package: %s' % error.description) self.quit() def _subproc_install_error(self, error: messages.InstallError): self.report({'ERROR'}, 'Unable to install package: %s' % error.message) self.quit() def _subproc_success(self, success: messages.Success): self.report({'INFO'}, 'Package installed successfully') bpy.ops.package.refresh_packages() self.quit() def _subproc_aborted(self, aborted: messages.Aborted): self.report({'ERROR'}, 'Package installation aborted per your request') self.quit() def report_process_died(self): if self.process.exitcode: self.log.error('Process died without telling us! Exit code was %i', self.process.exitcode) self.report({'ERROR'}, 'Error downloading package, exit code %i' % self.process.exitcode) else: self.log.error('Process died without telling us! Exit code was 0 though') self.report({'WARNING'}, 'Error downloading package, but process finished OK. This is weird.') class PACKAGE_OT_uninstall(SubprocMixin, bpy.types.Operator): bl_idname = 'package.uninstall' bl_label = 'Install package' bl_description = 'Downloads and installs a Blender add-on package' bl_options = {'REGISTER'} package_name = bpy.props.StringProperty(name='package_name', description='The name of the package to uninstall') log = logging.getLogger(__name__ + '.PACKAGE_OT_uninstall') def invoke(self, context, event): if not self.package_name: self.report({'ERROR'}, 'Package name not given') return {'CANCELLED'} return super().invoke(context, event) def create_subprocess(self): """Starts the uninstall process and registers the message handlers. :rtype: multiprocessing.Process """ import multiprocessing self.msg_handlers = { messages.UninstallError: self._subproc_uninstall_error, messages.Success: self._subproc_success, } import pathlib install_path = pathlib.Path(bpy.utils.user_resource('SCRIPTS', 'addons', create=True)) global _packages package = _packages[self.package_name].get_latest_version() proc = multiprocessing.Process(target=subproc.uninstall_package, args=(self.pipe_subproc, package, install_path)) return proc def _subproc_uninstall_error(self, error: messages.InstallError): self.report({'ERROR'}, error.message) self.quit() def _subproc_success(self, success: messages.Success): self.report({'INFO'}, 'Package uninstalled successfully') bpy.ops.package.refresh_packages() self.quit() def report_process_died(self): if self.process.exitcode: self.log.error('Process died without telling us! Exit code was %i', self.process.exitcode) self.report({'ERROR'}, 'Error downloading package, exit code %i' % self.process.exitcode) else: self.log.error('Process died without telling us! Exit code was 0 though') self.report({'WARNING'}, 'Error downloading package, but process finished OK. This is weird.') def get_packages_from_disk(refresh=False) -> list: """Get list of packages installed on disk""" import addon_utils return [Package.from_module(mod) for mod in addon_utils.modules(refresh=refresh)] def get_packages_from_repo() -> list: """Get list of packages from cached repository lists (does not refresh them from server)""" import pathlib storage_path = pathlib.Path(bpy.utils.user_resource('CONFIG', 'packages', create=True)) / 'repo.json' try: repo = bpkg.Repository.from_file(storage_path) except FileNotFoundError: return [] for pkg in repo.packages: pkg.repository = repo.name return repo.packages class PACKAGE_OT_refresh_packages(bpy.types.Operator): bl_idname = "package.refresh_packages" bl_label = "Refresh Packages" bl_description = "Scan for packages on disk" log = logging.getLogger(__name__ + ".PACKAGE_OT_refresh_packages") def execute(self, context): global _packages installed_packages = get_packages_from_disk(refresh=True) available_packages = get_packages_from_repo() _packages = build_composite_packagelist(installed_packages, available_packages) context.area.tag_redraw() return {'FINISHED'} class PACKAGE_OT_refresh_repositories(SubprocMixin, bpy.types.Operator): bl_idname = "package.refresh_repositories" bl_label = "Refresh Repositories" bl_description = 'Check repositories for new and updated packages' bl_options = {'REGISTER'} log = logging.getLogger(__name__ + ".PACKAGE_OT_refresh_repositories") _running = False def invoke(self, context, event): self.repolist = bpy.context.user_preferences.addons[__package__].preferences.repositories if len(self.repolist) == 0: self.report({'ERROR'}, "No repositories to refresh") return {'CANCELLED'} PACKAGE_OT_refresh_repositories._running = True return super().invoke(context, event) @classmethod def poll(cls, context): return not cls._running def cancel(self, context): PACKAGE_OT_refresh_repositories._running = False context.area.tag_redraw() def create_subprocess(self): """Starts the download process. Also registers the message handlers. :rtype: multiprocessing.Process """ import multiprocessing #TODO: make sure all possible messages are handled self.msg_handlers = { messages.Progress: self._subproc_progress, messages.SubprocError: self._subproc_error, messages.DownloadError: self._subproc_download_error, messages.Success: self._subproc_success, messages.RepositoryResult: self._subproc_repository_result, messages.BadRepositoryError: self._subproc_repository_error, messages.Aborted: self._subproc_aborted, } import pathlib storage_path = pathlib.Path(bpy.utils.user_resource('CONFIG', 'packages', create=True)) repository_url = self.repolist[0].url proc = multiprocessing.Process(target=subproc.refresh_repository, args=(self.pipe_subproc, storage_path, repository_url)) return proc def _subproc_progress(self, progress: messages.Progress): self.log.info('Task progress at %i%%', progress.progress * 100) def _subproc_error(self, error: messages.SubprocError): self.report({'ERROR'}, 'Unable to refresh package list: %s' % error.message) self.quit() def _subproc_download_error(self, error: messages.DownloadError): self.report({'ERROR'}, 'Unable to download package list: %s' % error.message) self.quit() def _subproc_repository_error(self, error: messages.BadRepositoryError): self.report({'ERROR'}, str(error.message)) self.quit() def _subproc_success(self, success: messages.Success): self.quit() def _subproc_repository_result(self, result: messages.RepositoryResult): available_packages = result.repository.packages installed_packages = get_packages_from_disk(refresh=False) # TODO: deduplicate creation of view-packages.. for pkg in available_packages: pkg.repository = result.repository.name global _packages _packages = build_composite_packagelist(installed_packages, available_packages) self.report({'INFO'}, 'Package list retrieved successfully') def _subproc_aborted(self, aborted: messages.Aborted): self.report({'ERROR'}, 'Package list retrieval aborted per your request') self.quit() def report_process_died(self): if self.process.exitcode: self.log.error('Refresh process died without telling us! Exit code was %i', self.process.exitcode) self.report({'ERROR'}, 'Error refreshing package lists, exit code %i' % self.process.exitcode) else: self.log.error('Refresh process died without telling us! Exit code was 0 though') self.report({'WARNING'}, 'Error refreshing package lists, but process finished OK. This is weird.') #TODO: # monkey patch refresh_repositories and add refresh_packages in the success callback # this way refresh_packages is always called after repositories have been refreshed class PACKAGE_OT_refresh(bpy.types.Operator): bl_idname = "package.refresh" bl_label = "Refresh" bl_description = "Check for new and updated packages" def execute(self, context): bpy.ops.package.refresh_repositories() # getattr(bpy.ops, __package__).refresh_packages() return {'FINISHED'} class RepositoryProperty(bpy.types.PropertyGroup): url = bpy.props.StringProperty(name="URL") status = bpy.props.EnumProperty(name="Status", items=[ ("OK", "Okay", "FILE_TICK"), ("NOTFOUND", "Not found", "ERROR"), ("NOCONNECT", "Could not connect", "QUESTION"), ]) class PACKAGE_UL_repositories(bpy.types.UIList): def draw_item(self, context, layout, data, item, icon, active_data, active_propname): layout.alignment='LEFT' if len(item.name) == 0: layout.label(item['url']) else: layout.label(item.name) class PACKAGE_OT_add_repository(bpy.types.Operator): bl_idname = "package.add_repository" bl_label = "Add Repository" url = bpy.props.StringProperty(name="Repository URL") def invoke(self, context, event): wm = context.window_manager return wm.invoke_props_dialog(self) def execute(self, context): prefs = context.user_preferences.addons[__package__].preferences if len(prefs.repositories) > 0: self.report({'ERROR'}, "Only one repository at a time is currently supported") return {'CANCELLED'} if len(self.url) == 0: self.report({'ERROR'}, "Repository URL not specified") return {'CANCELLED'} repo = prefs.repositories.add() repo.url = utils.parse_repository_url(self.url) context.area.tag_redraw() return {'FINISHED'} class PACKAGE_OT_remove_repository(bpy.types.Operator): bl_idname = "package.remove_repository" bl_label = "Remove Repository" def execute(self, context): prefs = context.user_preferences.addons[__package__].preferences prefs.repositories.remove(prefs.active_repository) return {'FINISHED'} class USERPREF_PT_packages(bpy.types.Panel): bl_label = "Package Management" bl_space_type = 'USER_PREFERENCES' bl_region_type = 'WINDOW' bl_options = {'HIDE_HEADER'} log = logging.getLogger(__name__ + '.USERPREF_PT_packages') # available_packages = [] # installed_packages = [] displayed_packages = [] expanded_packages = [] @classmethod def poll(cls, context): userpref = context.user_preferences return (userpref.active_section == 'PACKAGES') def draw(self, context): layout = self.layout wm = context.window_manager prefs = context.user_preferences.addons[__package__].preferences main = layout.row() spl = main.split(.2) sidebar = spl.column(align=True) pkgzone = spl.column() sidebar.label("Repositories") row = sidebar.row() row.template_list("PACKAGE_UL_repositories", "", prefs, "repositories", prefs, "active_repository") col = row.column(align=True) col.operator(PACKAGE_OT_add_repository.bl_idname, text="", icon='ZOOMIN') col.operator(PACKAGE_OT_remove_repository.bl_idname, text="", icon='ZOOMOUT') sidebar.separator() sidebar.operator(PACKAGE_OT_refresh_repositories.bl_idname) sidebar.separator() sidebar.label("Category") sidebar.prop(wm, "addon_filter", text="") sidebar.separator() sidebar.label("Support level") sidebar.prop(wm, "addon_support") top = pkgzone.row() spl = top.split(.6) spl.prop(wm, "package_search", text="", icon='VIEWZOOM') spl_r = spl.row() spl_r.prop(wm, "package_install_filter", expand=True) def filtered_packages(filters: dict, packages: OrderedDict) -> list: """Returns filtered and sorted list of names of packages which match filters""" #TODO: using lower() for case-insensitive comparison doesn't work in some languages def match_contains(blinfo) -> bool: if blinfo['name'].lower().__contains__(filters['search'].lower()): return True return False def match_startswith(blinfo) -> bool: if blinfo['name'].lower().startswith(filters['search'].lower()): return True return False def match_support(blinfo) -> bool: if 'support' in blinfo: if set((blinfo['support'],)).issubset(filters['support']): return True else: if {'COMMUNITY'}.issubset(filters['support']): return True return False def match_installstate(metapkg: ConsolidatedPackage) -> bool: if filters['installstate'] == 'AVAILABLE': return True if filters['installstate'] == 'INSTALLED': if metapkg.installed: return True if filters['installstate'] == 'UPDATES': if metapkg.installed: if metapkg.get_latest_installed_version().version < metapkg.get_latest_version().version: return True return False def match_category(blinfo) -> bool: if filters['category'].lower() == 'all': return True if 'category' not in blinfo: return False if blinfo['category'].lower() == filters['category'].lower(): return True return False # use two lists as a simple way of putting "matches from the beginning" on top contains = [] startswith = [] for pkgname, metapkg in packages.items(): blinfo = metapkg.versions[0].bl_info if match_category(blinfo)\ and match_support(blinfo)\ and match_installstate(metapkg): if len(filters['search']) == 0: startswith.append(pkgname) continue if match_startswith(blinfo): startswith.append(pkgname) continue if match_contains(blinfo): contains.append(pkgname) continue return startswith + contains def draw_package(metapkg: ConsolidatedPackage, layout: bpy.types.UILayout):# {{{ """Draws the given package""" def collapsed(): """Draw collapsed version of package layout""" lr1 = leftcol.row() lr2 = leftcol.row() if pkg.name: lr1.label(text=pkg.name) if pkg.description: lr2.label(text=pkg.description) lr2.enabled = False #Give name more visual weight def expanded(): """Draw expanded version of package layout""" def fmt_version(version_number: tuple) -> str: """Take version number as a tuple and format it as a string""" vstr = str(version_number[0]) for component in version_number[1:]: vstr += "." + str(component) return vstr row1 = leftcol.row() row1.label(pkg.name) if pkg.description: row2 = leftcol.row() row2.label(pkg.description) # row2.scale_y = 1.2 def draw_metadatum(label: str, value: str, layout: bpy.types.UILayout): """Draw the given key value pair in a new row in given layout container""" row = layout.row() row.scale_y = .8 spl = row.split(.15) spl.label("{}:".format(label)) spl.label(value) # don't compare against None here; we don't want to display empty arrays/strings either if pkg.location: draw_metadatum("Location", pkg.location, leftcol) if pkg.version: draw_metadatum("Version", fmt_version(pkg.version), leftcol) # if pkg.blender: # draw_metadatum("Compatible blender version", fmt_version(pkg.blender), leftcol) if pkg.category: draw_metadatum("Category", pkg.category, leftcol) if pkg.author: draw_metadatum("Author", pkg.author, leftcol) if pkg.support: draw_metadatum("Support level", pkg.support.title(), leftcol) if pkg.warning: draw_metadatum("Warning", pkg.warning, leftcol) if pkg.wiki_url or pkg.tracker_url: padrow = leftcol.row() padrow.label() padrow.scale_y = .5 urlrow = leftcol.row() urlrow.alignment = 'LEFT' if pkg.wiki_url: urlrow.operator("wm.url_open", text="Documentation", icon='HELP').url=pkg.wiki_url if pkg.tracker_url: urlrow.operator("wm.url_open", text="Report a Bug", icon='URL').url=pkg.tracker_url def draw_version(layout: bpy.types.UILayout, pkg: Package): """Draw version of package""" spl = layout.split(.9) left = spl.column() right = spl.column() right.alignment = 'RIGHT' left.label(text=fmt_version(pkg.version)) if pkg.repository is not None: draw_metadatum("Repository", pkg.repository, left) if pkg.installed: right.label(text="Installed") draw_metadatum("Installed to", str(pkg.installed_location), left) if len(metapkg.versions) > 1: row = pkgbox.row() row.label(text="There are multiple providers of this package:") for version in metapkg.versions: # row = pkgbox.row() subvbox = pkgbox.box() draw_version(subvbox, version) pkg = metapkg.get_latest_version() is_expanded = (pkg.name in self.expanded_packages) pkgbox = layout.box() spl = pkgbox.split(.8) left = spl.row(align=True) # for install/uninstall buttons right = spl.row() right.alignment = 'RIGHT' right.scale_y = 1.5 left.operator( WM_OT_package_toggle_expand.bl_idname, icon='TRIA_DOWN' if is_expanded else 'TRIA_RIGHT', emboss=False, ).package_name=pkg.name # for metadata leftcol = left.column(align=True) if pkg.installed: if pkg.url and pkg.user: right.operator(PACKAGE_OT_uninstall.bl_idname, text="Uninstall").package_name=pkg.name elif pkg.user: right.label("Installed, but not in repo") right.scale_y = 2 right.enabled = False elif not pkg.user: l = right.label("System package") right.scale_y = 2 right.enabled = False else: if pkg.url: right.operator(PACKAGE_OT_install.bl_idname, text="Install").package_name=pkg.name else: right.label("Not installed, but no URL?") if is_expanded: expanded() else: collapsed()# }}} def center_message(layout, msg: str): """draw a label in the center of an extra-tall row""" row = layout.row() row.label(text=msg) row.alignment='CENTER' row.scale_y = 10 global _packages if len(_packages) == 0: # TODO: read repository and installed packages synchronously for now; # can't run an operator from draw code to do async monitoring installed_packages = get_packages_from_disk() available_packages = get_packages_from_repo() _packages = build_composite_packagelist(installed_packages, available_packages) if len(_packages) == 0: center_message(pkgzone, "No packages found") return wm = bpy.context.window_manager filters = { 'category': wm.addon_filter, 'search': wm.package_search, 'support': wm.addon_support, 'installstate': wm.package_install_filter, } USERPREF_PT_packages.displayed_packages = filtered_packages(filters, _packages) for pkgname in USERPREF_PT_packages.displayed_packages: row = pkgzone.row() draw_package(_packages[pkgname], row) class WM_OT_package_toggle_expand(bpy.types.Operator): bl_idname = "wm.package_toggle_expand" bl_label = "" bl_description = "Toggle display of extended information for given package (hold shift to collapse all other packages)" bl_options = {'INTERNAL'} log = logging.getLogger(__name__ + ".WM_OT_package_toggle_expand") package_name = bpy.props.StringProperty( name="Package Name", description="Name of package to expand/collapse", ) def invoke(self, context, event): if event.shift: USERPREF_PT_packages.expanded_packages = [] if self.package_name in USERPREF_PT_packages.expanded_packages: USERPREF_PT_packages.expanded_packages.remove(self.package_name) else: USERPREF_PT_packages.expanded_packages.append(self.package_name) return {'FINISHED'} class PackageManagerPreferences(bpy.types.AddonPreferences): bl_idname = __package__ package_url = bpy.props.StringProperty( name='Package URL', description='Just a temporary place to store the URL of a package to download') repository_url = bpy.props.StringProperty( name='Repository URL', description='Temporary repository URL') repositories = bpy.props.CollectionProperty( type=RepositoryProperty, name="Repositories", ) active_repository = bpy.props.IntProperty() def draw(self, context): layout = self.layout temp_box = layout.box() temp_box.label(text="Temporary stuff while we're developing") temp_box.prop(self, 'repository_url') temp_box.operator(PACKAGE_OT_refresh.bl_idname) def validate_packagelist(pkglist: list) -> list: """Ensures all packages have required fields; strips out bad packages and returns them in a list""" pass def build_composite_packagelist(installed: list, available: list) -> OrderedDict: """Merge list of installed and available packages into one dict, keyed by package name""" log = logging.getLogger(__name__ + ".build_composite_packagelist") masterlist = {} def packages_are_equivilent(pkg1: Package, pkg2: Package) -> bool: """Check that packages are the same version and provide the same files""" return pkg1.version == pkg2.version\ and pkg1.files == pkg2.files def is_user_package(pkg: Package) -> bool: """Check if package's install location is in user scripts path or preferences scripts path""" from pathlib import Path user_script_path = bpy.utils.script_path_user() prefs_script_path = bpy.utils.script_path_pref() if user_script_path is not None: in_user = Path(user_script_path) in Path(pkg.installed_location).parents else: in_user = False if prefs_script_path is not None: in_prefs = Path(prefs_script_path) in Path(pkg.installed_location).parents else: in_prefs = False return in_user or in_prefs for pkg in available: pkgname = pkg.bl_info['name'] if pkgname in masterlist: masterlist[pkgname].add_version(pkg) else: masterlist[pkgname] = ConsolidatedPackage(pkg) for pkg in installed: pkg.installed = True pkg.user = is_user_package(pkg) if pkg.name in masterlist: for masterpkg in masterlist[pkg.name]: if packages_are_equivilent(pkg, masterpkg): masterpkg.installed = True masterpkg.installed_location = pkg.installed_location masterpkg.user = pkg.user break else: masterlist[pkg.name].add_version(pkg) else: masterlist[pkg.name] = ConsolidatedPackage(pkg) return OrderedDict(sorted(masterlist.items())) def register(): bpy.utils.register_class(PACKAGE_OT_install) bpy.utils.register_class(PACKAGE_OT_uninstall) bpy.utils.register_class(PACKAGE_OT_refresh_repositories) bpy.utils.register_class(PACKAGE_OT_refresh_packages) bpy.utils.register_class(PACKAGE_OT_refresh) bpy.utils.register_class(USERPREF_PT_packages) bpy.utils.register_class(WM_OT_package_toggle_expand) bpy.types.WindowManager.package_search = bpy.props.StringProperty( name="Search", description="Filter packages by name", options={'TEXTEDIT_UPDATE'} ) bpy.types.WindowManager.package_install_filter = bpy.props.EnumProperty( items=[('AVAILABLE', "Available", "All packages in selected repositories"), ('INSTALLED', "Installed", "All installed packages"), ('UPDATES', "Updates", "All installed packages for which there is a newer version availabe") ], name="Install filter", default='AVAILABLE', ) bpy.utils.register_class(RepositoryProperty) bpy.utils.register_class(PACKAGE_OT_add_repository) bpy.utils.register_class(PACKAGE_OT_remove_repository) bpy.utils.register_class(PACKAGE_UL_repositories) bpy.utils.register_class(PackageManagerPreferences) def unregister(): bpy.utils.unregister_class(PACKAGE_OT_install) bpy.utils.unregister_class(PACKAGE_OT_uninstall) bpy.utils.unregister_class(PACKAGE_OT_refresh_repositories) bpy.utils.unregister_class(PACKAGE_OT_refresh_packages) bpy.utils.unregister_class(PACKAGE_OT_refresh) bpy.utils.unregister_class(USERPREF_PT_packages) bpy.utils.unregister_class(WM_OT_package_toggle_expand) del bpy.types.WindowManager.package_search del bpy.types.WindowManager.package_install_filter bpy.utils.unregister_class(RepositoryProperty) bpy.utils.unregister_class(PACKAGE_OT_add_repository) bpy.utils.unregister_class(PACKAGE_OT_remove_repository) bpy.utils.unregister_class(PACKAGE_UL_repositories) bpy.utils.unregister_class(PackageManagerPreferences)