Rename bpkg_manager to just bpkg

This commit is contained in:
Ellwood Zwovic
2017-07-14 17:13:40 -07:00
parent acd0792b28
commit 093ec4b606
4 changed files with 27 additions and 0 deletions

492
bpkg/__init__.py Normal file
View File

@@ -0,0 +1,492 @@
"""
Blender Package manager
"""
bl_info = {
'name': 'Package Manager',
'author': 'Sybren A. Stüvel',
'version': (0, 1, 0),
'blender': (2, 79, 0),
'location': 'Addon Preferences panel',
'description': 'Add-on package manager.',
'category': 'System',
}
import logging
if 'bpy' in locals():
import importlib
subproc = importlib.reload(subproc)
else:
from . import subproc
import bpy
class SubprocMixin:
"""Mix-in class for things that need to be run in a subprocess."""
log = logging.getLogger(__name__ + '.SubprocMixin')
_state = 'INITIALIZING'
_abort_timeout = 0 # time at which we stop waiting for an abort response and just terminate the process
# Mapping from message type (see bpkg_manager.subproc) to handler function.
# Should be constructed before modal() gets called.
msg_handlers = {}
def execute(self, context):
return self.invoke(context, None)
def quit(self):
"""Signals the state machine to stop this operator from running."""
self._state = 'QUIT'
def invoke(self, context, event):
import multiprocessing
self.log.info('Starting')
self.pipe_blender, self.pipe_subproc = multiprocessing.Pipe()
# The subprocess should just be terminated when Blender quits. Without this,
# Blender would hang while closing, until the subprocess terminates itself.
self.process = self.create_subprocess()
self.process.daemon = True
self.process.start()
self._state = 'RUNNING'
wm = context.window_manager
wm.modal_handler_add(self)
self.timer = wm.event_timer_add(0.1, context.window)
return {'RUNNING_MODAL'}
def modal(self, context, event):
import time
if event.type == 'ESC':
self.log.warning('Escape pressed, sending abort signal to subprocess')
self.abort()
return {'PASS_THROUGH'}
if event.type != 'TIMER':
return {'PASS_THROUGH'}
if self._state == 'ABORTING' and time.time() > self._abort_timeout:
self.log.error('No response from subprocess to abort request, terminating it.')
self.report({'ERROR'}, 'No response from subprocess to abort request, terminating it.')
self.process.terminate()
self._finish(context)
return {'CANCELLED'}
while self.pipe_blender.poll():
self.handle_received_data()
if self._state == 'QUIT':
self._finish(context)
return {'FINISHED'}
if not self.process.is_alive():
self.report_process_died()
self._finish(context)
return {'CANCELLED'}
return {'RUNNING_MODAL'}
def abort(self):
import time
# Allow the subprocess 10 seconds to repsond to our abort message.
self._abort_timeout = time.time() + 10
self._state = 'ABORTING'
self.pipe_blender.send(subproc.Abort())
def _finish(self, context):
import multiprocessing
global bpkg_operation_running
context.window_manager.event_timer_remove(self.timer)
bpkg_operation_running = False
if self.process and self.process.is_alive():
self.log.debug('Waiting for subprocess to quit')
try:
self.process.join(timeout=10)
except multiprocessing.TimeoutError:
self.log.warning('Subprocess is hanging, terminating it forcefully.')
self.process.terminate()
else:
self.log.debug('Subprocess stopped with exit code %i', self.process.exitcode)
def handle_received_data(self):
recvd = self.pipe_blender.recv()
self.log.info('Received message from subprocess: %s', recvd)
try:
handler = self.msg_handlers[type(recvd)]
except KeyError:
self.log.error('Unable to handle received message %s', recvd)
# Maybe we shouldn't show this to the user?
self.report({'WARNING'}, 'Unable to handle received message %s' % recvd)
return
handler(recvd)
def create_subprocess(self):
"""Implement this in a subclass.
:rtype: multiprocessing.Process
"""
raise NotImplementedError()
def report_process_died(self):
"""Provides the user with sensible information when the process has died.
Implement this in a subclass.
"""
raise NotImplementedError()
class BPKG_OT_install(SubprocMixin, bpy.types.Operator):
bl_idname = 'bpkg.install'
bl_label = 'Install package'
bl_description = 'Downloads and installs a Blender add-on package'
bl_options = {'REGISTER'}
package_url = bpy.props.StringProperty(name='package_url', description='The URL of the file to download')
log = logging.getLogger(__name__ + '.BPKG_OT_install')
def invoke(self, context, event):
if not self.package_url:
self.report({'ERROR'}, 'Package URL not given')
return {'CANCELLED'}
return super().invoke(context, event)
def create_subprocess(self):
"""Starts the download process.
Also registers the message handlers.
:rtype: multiprocessing.Process
"""
import multiprocessing
self.msg_handlers = {
subproc.Progress: self._subproc_progress,
subproc.DownloadError: self._subproc_download_error,
subproc.InstallError: self._subproc_install_error,
subproc.FileConflictError: self._subproc_conflict_error,
subproc.Success: self._subproc_success,
subproc.Aborted: self._subproc_aborted,
}
import pathlib
# TODO: We need other paths besides this one on subprocess end, so it might be better to pass them all at once.
# For now, just pass this one.
install_path = pathlib.Path(bpy.utils.user_resource('SCRIPTS', 'addons', create=True))
self.log.debug("Using %s as install path", install_path)
import addon_utils
proc = multiprocessing.Process(target=subproc.download_and_install,
args=(self.pipe_subproc, self.package_url, install_path, addon_utils.paths()))
return proc
def _subproc_progress(self, progress: subproc.Progress):
self.log.info('Task progress at %i%%', progress.progress * 100)
def _subproc_download_error(self, error: subproc.DownloadError):
self.report({'ERROR'}, 'Unable to download package: %s' % error.description)
self.quit()
def _subproc_install_error(self, error: subproc.InstallError):
self.report({'ERROR'}, 'Unable to install package: %s' % error.message)
self.quit()
def _subproc_conflict_error(self, error: subproc.FileConflictError):
self.report({'ERROR'}, 'Unable to install package: %s' % error.message)
self.quit()
def _subproc_success(self, success: subproc.Success):
self.report({'INFO'}, 'Package installed successfully')
self.quit()
def _subproc_aborted(self, aborted: subproc.Aborted):
self.report({'ERROR'}, 'Package installation aborted per your request')
self.quit()
def report_process_died(self):
if self.process.exitcode:
self.log.error('Process died without telling us! Exit code was %i', self.process.exitcode)
self.report({'ERROR'}, 'Error downloading package, exit code %i' % self.process.exitcode)
else:
self.log.error('Process died without telling us! Exit code was 0 though')
self.report({'WARNING'}, 'Error downloading package, but process finished OK. This is weird.')
class BPKG_OT_refresh(SubprocMixin, bpy.types.Operator):
bl_idname = "bpkg.refresh"
bl_label = "Refresh Packages"
bl_description = 'Check for new and updated packages'
bl_options = {'REGISTER'}
log = logging.getLogger(__name__ + ".BPKG_OT_refresh")
def invoke(self, context, event):
return super().invoke(context, event)
def create_subprocess(self):
"""Starts the download process.
Also registers the message handlers.
:rtype: multiprocessing.Process
"""
import multiprocessing
self.msg_handlers = {
subproc.Progress: self._subproc_progress,
subproc.SubprocError: self._subproc_error,
subproc.DownloadError: self._subproc_download_error,
subproc.Success: self._subproc_success,
subproc.Result: self._subproc_result,
subproc.Aborted: self._subproc_aborted,
}
import pathlib
storage_path = pathlib.Path(bpy.utils.user_resource('CONFIG', 'addons', create=True))
repository_url = bpy.context.user_preferences.addons[__package__].preferences.repository_url
proc = multiprocessing.Process(target=subproc.refresh,
args=(self.pipe_subproc, storage_path, repository_url))
return proc
def _subproc_progress(self, progress: subproc.Progress):
self.log.info('Task progress at %i%%', progress.progress * 100)
def _subproc_error(self, error: subproc.SubprocError):
self.report({'ERROR'}, 'Unable to refresh package list: %s' % error.message)
self.quit()
def _subproc_download_error(self, error: subproc.DownloadError):
self.report({'ERROR'}, 'Unable to download package list: %s' % error.description)
self.quit()
def _subproc_success(self, success: subproc.Success):
self.report({'INFO'}, 'Package list retrieved successfully')
self.quit()
def _subproc_result(self, result: subproc.Result):
prefs = bpy.context.user_preferences.addons[__package__].preferences
prefs['repo'] = result.data
self.report({'INFO'}, 'Package list retrieved successfully')
self.quit()
def _subproc_aborted(self, aborted: subproc.Aborted):
self.report({'ERROR'}, 'Package list retrieval aborted per your request')
self.quit()
def report_process_died(self):
if self.process.exitcode:
self.log.error('Process died without telling us! Exit code was %i', self.process.exitcode)
self.report({'ERROR'}, 'Error refreshing package lists, exit code %i' % self.process.exitcode)
else:
self.log.error('Process died without telling us! Exit code was 0 though')
self.report({'WARNING'}, 'Error refreshing package lists, but process finished OK. This is weird.')
class BPKG_OT_hang(SubprocMixin, bpy.types.Operator):
bl_idname = 'bpkg.hang'
bl_label = 'Hang (debug)'
bl_description = 'Starts a process that hangs for an hour, for debugging purposes'
bl_options = {'REGISTER'}
log = logging.getLogger(__name__ + '.BPKG_OT_install')
def create_subprocess(self):
"""Starts the download process.
Also registers the message handlers.
:rtype: multiprocessing.Process
"""
import multiprocessing
proc = multiprocessing.Process(target=subproc.debug_hang)
return proc
def report_process_died(self):
self.report({'ERROR'}, 'Process died, exit code %s' % self.process.exitcode)
class USERPREF_PT_packages(bpy.types.Panel):
bl_label = "Package Management"
bl_space_type = 'USER_PREFERENCES'
bl_region_type = 'WINDOW'
bl_options = {'HIDE_HEADER'}
log = logging.getLogger(__name__ + '.USERPREF_PT_packages')
@classmethod
def poll(cls, context):
userpref = context.user_preferences
return (userpref.active_section == 'PACKAGES')
def draw(self, context):
repo = context.user_preferences.addons[__package__].preferences['repo']
layout = self.layout
main = layout.row()
spl = main.split(.12)
sidebar = spl.column(align=True)
pkgzone = spl.column()
sidebar.label(text="Category")
sidebar.prop(context.window_manager, "addon_filter", text="")
top = pkgzone.row()
spl = top.split(.6)
spl.prop(context.window_manager, "package_search", text="", icon='VIEWZOOM')
spl_r = spl.row()
spl_r.prop(context.window_manager, "package_install_filter", expand=True)
#TODO: more advanced filter/sorting; sort matches which match the filter string from the start higher
#Also some caching of this would be nice, this only needs to be re-run when any of the filters change.
def filter_package(package):
"""Returns true if the given package matches all filters"""
filterstr = bpy.context.window_manager.package_search
category = bpy.context.window_manager.addon_filter
blinfo = package['bl_info']
def match_search() -> bool:
if len(filterstr) == 0:
return True
if blinfo['name'].lower().__contains__(filterstr.lower()):
return True
return False
def match_category() -> bool:
if category.upper() == 'ALL':
return True
if 'category' not in blinfo:
return True
if blinfo['category'].upper() == category.upper():
return True
return False
if match_search() and match_category():
return True
return False
def draw_package(package, layout):
"""Draws the given package"""
pkgbox = layout.box()
spl = pkgbox.split(.8)
left = spl.column(align=True)
# for install/uninstall buttons
right = spl.row()
right.alignment = 'RIGHT'
right.scale_y = 2
# for title & description
lr1 = left.row()
lr2 = left.row()
lr2.enabled = False #Give name more visual weight
lr1.label(text=pkg['bl_info'].get('name', "MISSING NAME"))
lr2.label(text=pkg['bl_info'].get('description', "MISSING DESCRIPTION"))
right.operator(BPKG_OT_install.bl_idname,
text="Install").package_url=pkg.get('url', "")
for pkg in repo['packages']:
if filter_package(pkg):
row = pkgzone.row()
draw_package(pkg, row)
# class WM_OT_package_expand(Operator):
# bl_idname = "wm.package_expand"
# bl_label = ""
# bl_description = "Display information and preferences for this package"
# bl_options = {'INTERNAL'}
#
# module = StringProperty(
# name="Module",
# description="Module name of the add-on to expand",
# )
#
# def execute(self, context):
# import addon_utils
#
# module_name = self.module
#
# mod = addon_utils.addons_fake_modules.get(module_name)
# if mod is not None:
# info = addon_utils.module_bl_info(mod)
# info["show_expanded"] = not info["show_expanded"]
#
# return {'FINISHED'}
#
class PackageManagerPreferences(bpy.types.AddonPreferences):
bl_idname = __package__
package_url = bpy.props.StringProperty(
name='Package URL',
description='Just a temporary place to store the URL of a package to download')
repository_url = bpy.props.StringProperty(
name='Repository URL',
description='Temporary repository URL')
def draw(self, context):
layout = self.layout
temp_box = layout.box()
temp_box.label(text="Temporary stuff while we're developing")
temp_box.prop(self, 'repository_url')
temp_box.operator(BPKG_OT_refresh.bl_idname)
def register():
bpy.utils.register_class(BPKG_OT_install)
bpy.utils.register_class(BPKG_OT_refresh)
bpy.utils.register_class(BPKG_OT_hang)
bpy.utils.register_class(USERPREF_PT_packages)
bpy.types.WindowManager.package_search = bpy.props.StringProperty(
name="Search",
description="Filter packages by name",
options={'TEXTEDIT_UPDATE'}
)
bpy.types.WindowManager.package_install_filter = bpy.props.EnumProperty(
items=[('AVAILABLE', "Available", "All packages in selected repositories"),
('INSTALLED', "Installed", "All installed packages"),
('UPDATES', "Updates", "All installed packages for which there is a newer version availabe")
],
name="Install filter",
default='AVAILABLE',
)
bpy.utils.register_class(PackageManagerPreferences)
def unregister():
bpy.utils.unregister_class(BPKG_OT_install)
bpy.utils.unregister_class(BPKG_OT_refresh)
bpy.utils.unregister_class(BPKG_OT_hang)
bpy.utils.unregister_class(USERPREF_PT_packages)
del bpy.types.WindowManager.package_search
del bpy.types.WindowManager.package_install_filter
bpy.utils.unregister_class(PackageManagerPreferences)

552
bpkg/appdirs.py Normal file
View File

@@ -0,0 +1,552 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2005-2010 ActiveState Software Inc.
# Copyright (c) 2013 Eddy Petrișor
"""Utilities for determining application-specific dirs.
See <http://github.com/ActiveState/appdirs> for details and usage.
"""
# Dev Notes:
# - MSDN on where to store app data files:
# http://support.microsoft.com/default.aspx?scid=kb;en-us;310294#XSLTH3194121123120121120120
# - Mac OS X: http://developer.apple.com/documentation/MacOSX/Conceptual/BPFileSystem/index.html
# - XDG spec for Un*x: http://standards.freedesktop.org/basedir-spec/basedir-spec-latest.html
__version_info__ = (1, 4, 0)
__version__ = '.'.join(map(str, __version_info__))
import sys
import os
PY3 = sys.version_info[0] == 3
if PY3:
unicode = str
if sys.platform.startswith('java'):
import platform
os_name = platform.java_ver()[3][0]
if os_name.startswith('Windows'): # "Windows XP", "Windows 7", etc.
system = 'win32'
elif os_name.startswith('Mac'): # "Mac OS X", etc.
system = 'darwin'
else: # "Linux", "SunOS", "FreeBSD", etc.
# Setting this to "linux2" is not ideal, but only Windows or Mac
# are actually checked for and the rest of the module expects
# *sys.platform* style strings.
system = 'linux2'
else:
system = sys.platform
def user_data_dir(appname=None, appauthor=None, version=None, roaming=False):
r"""Return full path to the user-specific data dir for this application.
"appname" is the name of application.
If None, just the system directory is returned.
"appauthor" (only used on Windows) is the name of the
appauthor or distributing body for this application. Typically
it is the owning company name. This falls back to appname. You may
pass False to disable it.
"version" is an optional version path element to append to the
path. You might want to use this if you want multiple versions
of your app to be able to run independently. If used, this
would typically be "<major>.<minor>".
Only applied when appname is present.
"roaming" (boolean, default False) can be set True to use the Windows
roaming appdata directory. That means that for users on a Windows
network setup for roaming profiles, this user data will be
sync'd on login. See
<http://technet.microsoft.com/en-us/library/cc766489(WS.10).aspx>
for a discussion of issues.
Typical user data directories are:
Mac OS X: ~/Library/Application Support/<AppName>
Unix: ~/.local/share/<AppName> # or in $XDG_DATA_HOME, if defined
Win XP (not roaming): C:\Documents and Settings\<username>\Application Data\<AppAuthor>\<AppName>
Win XP (roaming): C:\Documents and Settings\<username>\Local Settings\Application Data\<AppAuthor>\<AppName>
Win 7 (not roaming): C:\Users\<username>\AppData\Local\<AppAuthor>\<AppName>
Win 7 (roaming): C:\Users\<username>\AppData\Roaming\<AppAuthor>\<AppName>
For Unix, we follow the XDG spec and support $XDG_DATA_HOME.
That means, by default "~/.local/share/<AppName>".
"""
if system == "win32":
if appauthor is None:
appauthor = appname
const = roaming and "CSIDL_APPDATA" or "CSIDL_LOCAL_APPDATA"
path = os.path.normpath(_get_win_folder(const))
if appname:
if appauthor is not False:
path = os.path.join(path, appauthor, appname)
else:
path = os.path.join(path, appname)
elif system == 'darwin':
path = os.path.expanduser('~/Library/Application Support/')
if appname:
path = os.path.join(path, appname)
else:
path = os.getenv('XDG_DATA_HOME', os.path.expanduser("~/.local/share"))
if appname:
path = os.path.join(path, appname)
if appname and version:
path = os.path.join(path, version)
return path
def site_data_dir(appname=None, appauthor=None, version=None, multipath=False):
"""Return full path to the user-shared data dir for this application.
"appname" is the name of application.
If None, just the system directory is returned.
"appauthor" (only used on Windows) is the name of the
appauthor or distributing body for this application. Typically
it is the owning company name. This falls back to appname. You may
pass False to disable it.
"version" is an optional version path element to append to the
path. You might want to use this if you want multiple versions
of your app to be able to run independently. If used, this
would typically be "<major>.<minor>".
Only applied when appname is present.
"multipath" is an optional parameter only applicable to *nix
which indicates that the entire list of data dirs should be
returned. By default, the first item from XDG_DATA_DIRS is
returned, or '/usr/local/share/<AppName>',
if XDG_DATA_DIRS is not set
Typical user data directories are:
Mac OS X: /Library/Application Support/<AppName>
Unix: /usr/local/share/<AppName> or /usr/share/<AppName>
Win XP: C:\Documents and Settings\All Users\Application Data\<AppAuthor>\<AppName>
Vista: (Fail! "C:\ProgramData" is a hidden *system* directory on Vista.)
Win 7: C:\ProgramData\<AppAuthor>\<AppName> # Hidden, but writeable on Win 7.
For Unix, this is using the $XDG_DATA_DIRS[0] default.
WARNING: Do not use this on Windows. See the Vista-Fail note above for why.
"""
if system == "win32":
if appauthor is None:
appauthor = appname
path = os.path.normpath(_get_win_folder("CSIDL_COMMON_APPDATA"))
if appname:
if appauthor is not False:
path = os.path.join(path, appauthor, appname)
else:
path = os.path.join(path, appname)
elif system == 'darwin':
path = os.path.expanduser('/Library/Application Support')
if appname:
path = os.path.join(path, appname)
else:
# XDG default for $XDG_DATA_DIRS
# only first, if multipath is False
path = os.getenv('XDG_DATA_DIRS',
os.pathsep.join(['/usr/local/share', '/usr/share']))
pathlist = [os.path.expanduser(x.rstrip(os.sep)) for x in path.split(os.pathsep)]
if appname:
if version:
appname = os.path.join(appname, version)
pathlist = [os.sep.join([x, appname]) for x in pathlist]
if multipath:
path = os.pathsep.join(pathlist)
else:
path = pathlist[0]
return path
if appname and version:
path = os.path.join(path, version)
return path
def user_config_dir(appname=None, appauthor=None, version=None, roaming=False):
r"""Return full path to the user-specific config dir for this application.
"appname" is the name of application.
If None, just the system directory is returned.
"appauthor" (only used on Windows) is the name of the
appauthor or distributing body for this application. Typically
it is the owning company name. This falls back to appname. You may
pass False to disable it.
"version" is an optional version path element to append to the
path. You might want to use this if you want multiple versions
of your app to be able to run independently. If used, this
would typically be "<major>.<minor>".
Only applied when appname is present.
"roaming" (boolean, default False) can be set True to use the Windows
roaming appdata directory. That means that for users on a Windows
network setup for roaming profiles, this user data will be
sync'd on login. See
<http://technet.microsoft.com/en-us/library/cc766489(WS.10).aspx>
for a discussion of issues.
Typical user data directories are:
Mac OS X: same as user_data_dir
Unix: ~/.config/<AppName> # or in $XDG_CONFIG_HOME, if defined
Win *: same as user_data_dir
For Unix, we follow the XDG spec and support $XDG_CONFIG_HOME.
That means, by deafult "~/.config/<AppName>".
"""
if system in ["win32", "darwin"]:
path = user_data_dir(appname, appauthor, None, roaming)
else:
path = os.getenv('XDG_CONFIG_HOME', os.path.expanduser("~/.config"))
if appname:
path = os.path.join(path, appname)
if appname and version:
path = os.path.join(path, version)
return path
def site_config_dir(appname=None, appauthor=None, version=None, multipath=False):
"""Return full path to the user-shared data dir for this application.
"appname" is the name of application.
If None, just the system directory is returned.
"appauthor" (only used on Windows) is the name of the
appauthor or distributing body for this application. Typically
it is the owning company name. This falls back to appname. You may
pass False to disable it.
"version" is an optional version path element to append to the
path. You might want to use this if you want multiple versions
of your app to be able to run independently. If used, this
would typically be "<major>.<minor>".
Only applied when appname is present.
"multipath" is an optional parameter only applicable to *nix
which indicates that the entire list of config dirs should be
returned. By default, the first item from XDG_CONFIG_DIRS is
returned, or '/etc/xdg/<AppName>', if XDG_CONFIG_DIRS is not set
Typical user data directories are:
Mac OS X: same as site_data_dir
Unix: /etc/xdg/<AppName> or $XDG_CONFIG_DIRS[i]/<AppName> for each value in
$XDG_CONFIG_DIRS
Win *: same as site_data_dir
Vista: (Fail! "C:\ProgramData" is a hidden *system* directory on Vista.)
For Unix, this is using the $XDG_CONFIG_DIRS[0] default, if multipath=False
WARNING: Do not use this on Windows. See the Vista-Fail note above for why.
"""
if system in ["win32", "darwin"]:
path = site_data_dir(appname, appauthor)
if appname and version:
path = os.path.join(path, version)
else:
# XDG default for $XDG_CONFIG_DIRS
# only first, if multipath is False
path = os.getenv('XDG_CONFIG_DIRS', '/etc/xdg')
pathlist = [os.path.expanduser(x.rstrip(os.sep)) for x in path.split(os.pathsep)]
if appname:
if version:
appname = os.path.join(appname, version)
pathlist = [os.sep.join([x, appname]) for x in pathlist]
if multipath:
path = os.pathsep.join(pathlist)
else:
path = pathlist[0]
return path
def user_cache_dir(appname=None, appauthor=None, version=None, opinion=True):
r"""Return full path to the user-specific cache dir for this application.
"appname" is the name of application.
If None, just the system directory is returned.
"appauthor" (only used on Windows) is the name of the
appauthor or distributing body for this application. Typically
it is the owning company name. This falls back to appname. You may
pass False to disable it.
"version" is an optional version path element to append to the
path. You might want to use this if you want multiple versions
of your app to be able to run independently. If used, this
would typically be "<major>.<minor>".
Only applied when appname is present.
"opinion" (boolean) can be False to disable the appending of
"Cache" to the base app data dir for Windows. See
discussion below.
Typical user cache directories are:
Mac OS X: ~/Library/Caches/<AppName>
Unix: ~/.cache/<AppName> (XDG default)
Win XP: C:\Documents and Settings\<username>\Local Settings\Application Data\<AppAuthor>\<AppName>\Cache
Vista: C:\Users\<username>\AppData\Local\<AppAuthor>\<AppName>\Cache
On Windows the only suggestion in the MSDN docs is that local settings go in
the `CSIDL_LOCAL_APPDATA` directory. This is identical to the non-roaming
app data dir (the default returned by `user_data_dir` above). Apps typically
put cache data somewhere *under* the given dir here. Some examples:
...\Mozilla\Firefox\Profiles\<ProfileName>\Cache
...\Acme\SuperApp\Cache\1.0
OPINION: This function appends "Cache" to the `CSIDL_LOCAL_APPDATA` value.
This can be disabled with the `opinion=False` option.
"""
if system == "win32":
if appauthor is None:
appauthor = appname
path = os.path.normpath(_get_win_folder("CSIDL_LOCAL_APPDATA"))
if appname:
if appauthor is not False:
path = os.path.join(path, appauthor, appname)
else:
path = os.path.join(path, appname)
if opinion:
path = os.path.join(path, "Cache")
elif system == 'darwin':
path = os.path.expanduser('~/Library/Caches')
if appname:
path = os.path.join(path, appname)
else:
path = os.getenv('XDG_CACHE_HOME', os.path.expanduser('~/.cache'))
if appname:
path = os.path.join(path, appname.lower().replace(' ', '-'))
if appname and version:
path = os.path.join(path, version)
return path
def user_log_dir(appname=None, appauthor=None, version=None, opinion=True):
r"""Return full path to the user-specific log dir for this application.
"appname" is the name of application.
If None, just the system directory is returned.
"appauthor" (only used on Windows) is the name of the
appauthor or distributing body for this application. Typically
it is the owning company name. This falls back to appname. You may
pass False to disable it.
"version" is an optional version path element to append to the
path. You might want to use this if you want multiple versions
of your app to be able to run independently. If used, this
would typically be "<major>.<minor>".
Only applied when appname is present.
"opinion" (boolean) can be False to disable the appending of
"Logs" to the base app data dir for Windows, and "log" to the
base cache dir for Unix. See discussion below.
Typical user cache directories are:
Mac OS X: ~/Library/Logs/<AppName>
Unix: ~/.cache/<AppName>/log # or under $XDG_CACHE_HOME if defined
Win XP: C:\Documents and Settings\<username>\Local Settings\Application Data\<AppAuthor>\<AppName>\Logs
Vista: C:\Users\<username>\AppData\Local\<AppAuthor>\<AppName>\Logs
On Windows the only suggestion in the MSDN docs is that local settings
go in the `CSIDL_LOCAL_APPDATA` directory. (Note: I'm interested in
examples of what some windows apps use for a logs dir.)
OPINION: This function appends "Logs" to the `CSIDL_LOCAL_APPDATA`
value for Windows and appends "log" to the user cache dir for Unix.
This can be disabled with the `opinion=False` option.
"""
if system == "darwin":
path = os.path.join(
os.path.expanduser('~/Library/Logs'),
appname)
elif system == "win32":
path = user_data_dir(appname, appauthor, version)
version = False
if opinion:
path = os.path.join(path, "Logs")
else:
path = user_cache_dir(appname, appauthor, version)
version = False
if opinion:
path = os.path.join(path, "log")
if appname and version:
path = os.path.join(path, version)
return path
class AppDirs(object):
"""Convenience wrapper for getting application dirs."""
def __init__(self, appname, appauthor=None, version=None, roaming=False,
multipath=False):
self.appname = appname
self.appauthor = appauthor
self.version = version
self.roaming = roaming
self.multipath = multipath
@property
def user_data_dir(self):
return user_data_dir(self.appname, self.appauthor,
version=self.version, roaming=self.roaming)
@property
def site_data_dir(self):
return site_data_dir(self.appname, self.appauthor,
version=self.version, multipath=self.multipath)
@property
def user_config_dir(self):
return user_config_dir(self.appname, self.appauthor,
version=self.version, roaming=self.roaming)
@property
def site_config_dir(self):
return site_config_dir(self.appname, self.appauthor,
version=self.version, multipath=self.multipath)
@property
def user_cache_dir(self):
return user_cache_dir(self.appname, self.appauthor,
version=self.version)
@property
def user_log_dir(self):
return user_log_dir(self.appname, self.appauthor,
version=self.version)
#---- internal support stuff
def _get_win_folder_from_registry(csidl_name):
"""This is a fallback technique at best. I'm not sure if using the
registry for this guarantees us the correct answer for all CSIDL_*
names.
"""
import _winreg
shell_folder_name = {
"CSIDL_APPDATA": "AppData",
"CSIDL_COMMON_APPDATA": "Common AppData",
"CSIDL_LOCAL_APPDATA": "Local AppData",
}[csidl_name]
key = _winreg.OpenKey(
_winreg.HKEY_CURRENT_USER,
r"Software\Microsoft\Windows\CurrentVersion\Explorer\Shell Folders"
)
dir, type = _winreg.QueryValueEx(key, shell_folder_name)
return dir
def _get_win_folder_with_pywin32(csidl_name):
from win32com.shell import shellcon, shell
dir = shell.SHGetFolderPath(0, getattr(shellcon, csidl_name), 0, 0)
# Try to make this a unicode path because SHGetFolderPath does
# not return unicode strings when there is unicode data in the
# path.
try:
dir = unicode(dir)
# Downgrade to short path name if have highbit chars. See
# <http://bugs.activestate.com/show_bug.cgi?id=85099>.
has_high_char = False
for c in dir:
if ord(c) > 255:
has_high_char = True
break
if has_high_char:
try:
import win32api
dir = win32api.GetShortPathName(dir)
except ImportError:
pass
except UnicodeError:
pass
return dir
def _get_win_folder_with_ctypes(csidl_name):
import ctypes
csidl_const = {
"CSIDL_APPDATA": 26,
"CSIDL_COMMON_APPDATA": 35,
"CSIDL_LOCAL_APPDATA": 28,
}[csidl_name]
buf = ctypes.create_unicode_buffer(1024)
ctypes.windll.shell32.SHGetFolderPathW(None, csidl_const, None, 0, buf)
# Downgrade to short path name if have highbit chars. See
# <http://bugs.activestate.com/show_bug.cgi?id=85099>.
has_high_char = False
for c in buf:
if ord(c) > 255:
has_high_char = True
break
if has_high_char:
buf2 = ctypes.create_unicode_buffer(1024)
if ctypes.windll.kernel32.GetShortPathNameW(buf.value, buf2, 1024):
buf = buf2
return buf.value
def _get_win_folder_with_jna(csidl_name):
import array
from com.sun import jna
from com.sun.jna.platform import win32
buf_size = win32.WinDef.MAX_PATH * 2
buf = array.zeros('c', buf_size)
shell = win32.Shell32.INSTANCE
shell.SHGetFolderPath(None, getattr(win32.ShlObj, csidl_name), None, win32.ShlObj.SHGFP_TYPE_CURRENT, buf)
dir = jna.Native.toString(buf.tostring()).rstrip("\0")
# Downgrade to short path name if have highbit chars. See
# <http://bugs.activestate.com/show_bug.cgi?id=85099>.
has_high_char = False
for c in dir:
if ord(c) > 255:
has_high_char = True
break
if has_high_char:
buf = array.zeros('c', buf_size)
kernel = win32.Kernel32.INSTANCE
if kernal.GetShortPathName(dir, buf, buf_size):
dir = jna.Native.toString(buf.tostring()).rstrip("\0")
return dir
if system == "win32":
try:
import win32com.shell
_get_win_folder = _get_win_folder_with_pywin32
except ImportError:
try:
from ctypes import windll
_get_win_folder = _get_win_folder_with_ctypes
except ImportError:
try:
import com.sun.jna
_get_win_folder = _get_win_folder_with_jna
except ImportError:
_get_win_folder = _get_win_folder_from_registry
#---- self test code
if __name__ == "__main__":
appname = "MyApp"
appauthor = "MyCompany"
props = ("user_data_dir", "site_data_dir",
"user_config_dir", "site_config_dir",
"user_cache_dir", "user_log_dir")
print("-- app dirs (with optional 'version')")
dirs = AppDirs(appname, appauthor, version="1.0")
for prop in props:
print("%s: %s" % (prop, getattr(dirs, prop)))
print("\n-- app dirs (without optional 'version')")
dirs = AppDirs(appname, appauthor)
for prop in props:
print("%s: %s" % (prop, getattr(dirs, prop)))
print("\n-- app dirs (without optional 'appauthor')")
dirs = AppDirs(appname)
for prop in props:
print("%s: %s" % (prop, getattr(dirs, prop)))
print("\n-- app dirs (with disabled 'appauthor')")
dirs = AppDirs(appname, appauthor=False)
for prop in props:
print("%s: %s" % (prop, getattr(dirs, prop)))

49
bpkg/cache.py Normal file
View File

@@ -0,0 +1,49 @@
# ##### BEGIN GPL LICENSE BLOCK #####
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# ##### END GPL LICENSE BLOCK #####
import os
import logging
import pathlib
from . import appdirs
log = logging.getLogger(__name__)
def cache_directory(*subdirs) -> pathlib.Path:
"""Returns an OS-specifc cache location, and ensures it exists.
Should be replaced with a call to bpy.utils.user_resource('CACHE', ...)
once https://developer.blender.org/T47684 is finished.
:param subdirs: extra subdirectories inside the cache directory.
>>> cache_directory()
'.../blender_cloud/your_username'
>>> cache_directory('sub1', 'sub2')
'.../blender_cloud/your_username/sub1/sub2'
"""
# TODO: use bpy.utils.user_resource('CACHE', ...)
# once https://developer.blender.org/T47684 is finished.
user_cache_dir = appdirs.user_cache_dir(appname='Blender', appauthor=False)
cache_dir = pathlib.Path(user_cache_dir) / 'blender_package_manager' / pathlib.Path(*subdirs)
cache_dir.mkdir(mode=0o700, parents=True, exist_ok=True)
return cache_dir

514
bpkg/subproc.py Normal file
View File

@@ -0,0 +1,514 @@
"""
All the stuff that needs to run in a subprocess.
"""
import logging
import pathlib
import shutil
import json
class Message:
"""Superclass for all message sent over pipes."""
class BlenderMessage(Message):
"""Superclass for all messages sent from Blender to the subprocess."""
class SubprocMessage(Message):
"""Superclass for all messages sent from the subprocess to Blender."""
class Abort(BlenderMessage):
"""Sent when the user requests abortion of a task."""
class Progress(SubprocMessage):
"""Send from subprocess to Blender to report progress.
:ivar progress: the progress percentage, from 0-1.
"""
def __init__(self, progress: float):
self.progress = progress
class SubprocError(SubprocMessage):
"""Superclass for all fatal error messages sent from the subprocess."""
def __init__(self, message: str):
self.message = message
class SubprocWarning(SubprocMessage):
"""Superclass for all non-fatal warning messages sent from the subprocess."""
def __init__(self, message: str):
self.message = message
class InstallError(SubprocError):
"""Sent when there was an error installing something."""
class FileConflictError(InstallError):
"""Sent when installation would overwrite existing files."""
def __init__(self, message: str, conflicts: list):
self.message = message
self.conflicts = conflicts
class DownloadError(SubprocMessage):
"""Sent when there was an error downloading something."""
def __init__(self, status_code: int, description: str):
self.status_code = status_code
self.description = description
class Success(SubprocMessage):
"""Sent when an operation finished sucessfully."""
class Result(SubprocMessage):
"""Sent when an operation returns data to be used on the parent process."""
def __init__(self, data):
self.data = data
class Aborted(SubprocMessage):
"""Sent as response to Abort message."""
class InstallException(Exception):
"""Raised when there is an error during installation"""
class DownloadException(Exception):
"""Raised when there is an error downloading something"""
def __init__(self, status_code: int, message: str):
self.status_code = status_code
self.message = message
class BadRepository(Exception):
"""Raised when reading a repository results in an error"""
class InplaceBackup:
"""Utility for moving a file out of the way by appending a '~'"""
log = logging.getLogger('%s.inplace-backup' % __name__)
def __init__(self, path: pathlib.Path):
self.path = path
self.backup()
def backup(self):
"""Move 'path' to 'path~'"""
self.backup_path = pathlib.Path(str(self.path) + '~')
if self.backup_path.exists():
self.log.warning("Overwriting existing backup '{}'".format(self.backup_path))
self._rm(self.backup_path)
shutil.move(str(self.path), str(self.backup_path))
def restore(self):
"""Move 'path~' to 'path'"""
if not self.backup_path:
raise RuntimeError("Can't restore file before backing it up")
if self.path.exists():
self.log.warning("Overwriting '{0}' with backup file".format(self.path))
self._rm(self.backup_path)
shutil.move(str(self.backup_path), str(self.path))
def remove(self):
"""Remove 'path~'"""
self._rm(self.backup_path)
def _rm(self, path: pathlib.Path):
"""Just delete whatever is specified by `path`"""
if path.is_file():
path.unlink()
else:
shutil.rmtree(str(path))
class Package:
"""
Stores package methods and metadata
"""
log = logging.getLogger(__name__ + ".Repository")
def __init__(self, package_dict:dict = None):
self.from_dict(package_dict)
def to_dict(self) -> dict:
"""
Return a dict representation of the package
"""
return {
'bl_info': self.bl_info,
'url': self.url,
}
def from_dict(self, package_dict: dict):
"""
Get attributes from a dict such as produced by `to_dict`
"""
if package_dict is None:
package_dict = {}
for attr in ('name', 'url', 'bl_info'):
setattr(self, attr, package_dict.get(attr))
class Repository:
"""
Stores repository metadata (including packages)
"""
log = logging.getLogger(__name__ + ".Repository")
def __init__(self, url=None):
self.set_from_dict({'url': url})
self.log.debug("Initializing repository: %s", self.to_dict())
# def cleanse_packagelist(self):
# """Remove empty packages (no bl_info), packages with no name"""
def refresh(self):
"""
Requests repo.json from URL and embeds etag/last-modification headers
"""
import requests
if self.url is None:
raise ValueError("Cannot refresh repository without a URL")
self.log.debug("Refreshing repository from %s", self.url)
req_headers = {}
# Do things this way to avoid adding empty objects/None to the req_headers dict
if self._headers:
try:
req_headers['If-None-Match'] = self._headers['etag']
except KeyError:
pass
try:
req_headers['If-Modified-Since'] = self._headers['last-modified']
except KeyError:
pass
resp = requests.get(self.url, headers=req_headers)
try:
resp.raise_for_status()
except requests.HTTPError as err:
self.log.error('Error downloading %s: %s', self.url, err)
raise DownloadException(resp.status_code, resp.reason) from err
if resp.status_code == requests.codes.not_modified:
self.log.debug("Packagelist not modified")
return
resp_headers = {}
try:
resp_headers['etag'] = resp.headers['etag']
except KeyError:
pass
try:
resp_headers['last-modified'] = resp.headers['last-modified']
except KeyError:
pass
self.log.debug("Found headers: %s", resp_headers)
repodict = resp.json()
repodict['_headers'] = resp_headers
self.set_from_dict(repodict)
def to_dict(self, sort=False) -> dict:
"""
Return a dict representation of the repository
"""
if self.packages:
packages = [p.to_dict() for p in self.packages]
if sort:
packages.sort(key=lambda p: p['bl_info']['name'].lower())
else:
packages = []
return {
'name': self.name,
'packages': packages,
'url': self.url,
'_headers': self._headers,
}
def set_from_dict(self, repodict: dict):
"""
Get repository attributes from a dict such as produced by `to_dict`
"""
if repodict is None:
repodict = {}
for attr in ('name', 'url', 'packages', '_headers'):
if attr == 'packages':
value = set(Package(pkg) for pkg in repodict.get('packages', []))
else:
value = repodict.get(attr)
if value is None:
try:
value = getattr(self, attr)
except AttributeError:
pass
setattr(self, attr, value)
@classmethod
def from_dict(cls, repodict: dict):
"""
Like `set_from_dict`, but immutable
"""
repo = cls()
repo.set_from_dict(repodict)
return repo
def to_file(self, path: pathlib.Path):
"""
Dump repository to a json file at `path`.
"""
if self.packages is None:
self.log.warning("Writing an empty repository")
with path.open('w', encoding='utf-8') as repo_file:
json.dump(self.to_dict(), repo_file, indent=4, sort_keys=True)
self.log.debug("Repository written to %s" % path)
@classmethod
def from_file(cls, path: pathlib.Path):
"""
Read repository from a json file at `path`.
"""
try:
repo_file = path.open('r', encoding='utf-8')
except IOError as err:
raise BadRepository from err
with repo_file:
try:
repo = cls.from_dict(json.load(repo_file))
except Exception as err:
raise BadRepository from err
cls.log.debug("Repository read from %s", path)
return repo
def _download(pipe_to_blender, package_url: str, download_dir: pathlib.Path) -> pathlib.Path:
"""Downloads the given package
:returns: path to the downloaded file, or None in case of error.
"""
import requests
log = logging.getLogger('%s.download' % __name__)
log.info('Going to download %s to %s', package_url, download_dir)
pipe_to_blender.send(Progress(0.0))
log.info('Downloading %s', package_url)
resp = requests.get(package_url, stream=True, verify=True)
try:
resp.raise_for_status()
except requests.HTTPError as ex:
log.error('Error downloading %s: %s', package_url, ex)
pipe_to_blender.send(DownloadError(resp.status_code, str(ex)))
return None
try:
# Use float so that we can also use infinity
content_length = float(resp.headers['content-length'])
except KeyError:
log.warning('Server did not send content length, cannot report progress.')
content_length = float('inf')
# TODO: check if there's enough disk space.
# TODO: get filename from Content-Disposition header, if available.
# TODO: use urllib.parse to parse the URL.
local_filename = package_url.split('/')[-1] or 'download.tmp'
local_fpath = download_dir / local_filename
downloaded_length = 0
with local_fpath.open('wb') as outfile:
for chunk in resp.iter_content(chunk_size=1024 ** 2):
# Handle abort messages from Blender
while pipe_to_blender.poll():
recvd = pipe_to_blender.recv()
if isinstance(recvd, Abort):
log.warning('Aborting download of %s by request', package_url)
pipe_to_blender.send(Aborted())
return None
log.warning('Unknown message %s received, ignoring', recvd)
if not chunk: # filter out keep-alive new chunks
continue
outfile.write(chunk)
downloaded_length += len(chunk)
# TODO: use multiplier for progress, so that we can count up to 70% and
# leave 30% "progress" for installation of the package.
pipe_to_blender.send(Progress(downloaded_length / content_length))
return local_fpath
def _install(pipe_to_blender, pkgpath: pathlib.Path, dest: pathlib.Path, searchpaths: list):
"""Extracts/moves package at `pkgpath` to `dest`"""
import zipfile
log = logging.getLogger('%s.install' % __name__)
log.debug("Starting installation")
pipe_to_blender.send(Progress(0.0))
if not pkgpath.is_file():
log.error("File to install isn't a file: %s", pkgpath)
pipe_to_blender.send(InstallError("Package is not a file"))
raise InstallException
if not dest.is_dir():
log.error("Destination for install isn't a directory: %s", dest)
pipe_to_blender.send(InstallError("Destination is not a directory"))
raise InstallException
# TODO: check to make sure addon/package isn't already installed elsewhere
# The following is adapted from `addon_install` in bl_operators/wm.py
# check to see if the file is in compressed format (.zip)
if zipfile.is_zipfile(pkgpath):
log.debug("Package is zipfile")
try:
file_to_extract = zipfile.ZipFile(str(pkgpath), 'r')
except Exception as err:
# TODO HACK: see if it makes sense to make a single InstallError class which inherits from both Exception and SubprocError
# It sounds weird, but it could avoid the need for duplication here; create a new InstallError with the right message,
# then send it to blender and also raise it.
pipe_to_blender.send(InstallError("Failed to read zip file: %s" % err))
raise InstallException from err
def root_files(filelist: list) -> list:
"""Some string parsing to get a list of the root contents of a zip from its namelist"""
rootlist = []
for f in filelist:
# Get all names which have no path separators (root level files)
# or have a single path separator at the end (root level directories).
if len(f.rstrip('/').split('/')) == 1:
rootlist.append(f)
return rootlist
conflicts = [dest / f for f in root_files(file_to_extract.namelist()) if (dest / f).exists()]
backups = []
for conflict in conflicts:
backups.append(InplaceBackup(conflict))
try:
file_to_extract.extractall(str(dest))
except Exception as err:
for backup in backups:
backup.restore()
pipe_to_blender.send(InstallError("Failed to extract zip file to '%s': %s" % (dest, err)))
raise InstallException from err
for backup in backups:
backup.remove()
else:
log.debug("Package is pyfile")
dest_file = (dest / pkgpath.name)
if dest_file.exists():
backup = InplaceBackup(dest_file)
try:
shutil.copyfile(str(pkgpath), str(dest_file))
except Exception as err:
backup.restore()
pipe_to_blender.send(InstallError("Failed to copy file to '%s': %s" % (dest, err)))
raise InstallException from err
try:
pkgpath.unlink()
log.debug("Removed cached package: %s", pkgpath)
except Exception as err:
pipe_to_blender.send(SubprocWarning("Failed to remove package from cache: %s" % err))
raise InstallException from err
pipe_to_blender.send(Progress(1.0))
return
def download_and_install(pipe_to_blender, package_url: str, install_path: pathlib.Path, search_paths: list):
"""Downloads and installs the given package."""
from . import cache
log = logging.getLogger('%s.download_and_install' % __name__)
# log.debug(utils.user_resource('SCRIPTS', 'blorp'))
cache_dir = cache.cache_directory('downloads')
downloaded = _download(pipe_to_blender, package_url, cache_dir)
if not downloaded:
log.debug('Download failed/aborted, not going to install anything.')
return
# Only send success if _install doesn't throw an exception
# Maybe not the best way to do this? (will also catch exceptions which occur during message sending)
try:
_install(pipe_to_blender, downloaded, install_path, search_paths)
pipe_to_blender.send(Success())
except InstallException as err:
#TODO
log.error("Failed to install package: %s", err)
pipe_to_blender.send(InstallError(err))
def refresh(pipe_to_blender, storage_path: pathlib.Path, repository_url: str):
"""Retrieves and stores the given repository"""
log = logging.getLogger(__name__ + '.refresh')
repo_path = storage_path / 'repo.json'
try:
repo = Repository.from_file(repo_path)
except BadRepository as err:
log.warning("Failed to read existing repository: %s. Continuing download.", err)
repo = Repository(repository_url)
if repo.url != repository_url:
# We're getting a new repository
repo = Repository(repository_url)
try:
repo.refresh()
except DownloadException as err:
pipe_to_blender.send(DownloadError(err.status_code, err.message))
repo.to_file(repo_path) # TODO: this always writes even if repo wasn't changed
pipe_to_blender.send(Result(repo.to_dict(sort=True)))
def debug_hang():
"""Hangs for an hour. For testing purposes only."""
import time
time.sleep(3600)