This repository has been archived on 2023-10-03. You can view files and clone it, but cannot push or open issues or pull requests.
Sybren A. Stüvel 5eaee872bf Added check for user's roles -- disallow usage by non-subscribers.
This makes it clear from the get-go that users need to subscribe. Otherwise
they'll get unexpected errors once they try to download something.
2016-05-10 14:52:51 +02:00

554 lines
21 KiB
Python

import asyncio
import json
import os
import functools
import logging
from contextlib import closing, contextmanager
import pathlib
import requests
import requests.structures
import pillarsdk
import pillarsdk.exceptions
import pillarsdk.utils
from pillarsdk.utils import sanitize_filename
from . import cache
SUBCLIENT_ID = 'PILLAR'
_pillar_api = None # will become a pillarsdk.Api object.
log = logging.getLogger(__name__)
uncached_session = requests.session()
_testing_blender_id_profile = None # Just for testing, overrides what is returned by blender_id_profile.
_downloaded_urls = set() # URLs we've downloaded this Blender session.
class UserNotLoggedInError(RuntimeError):
"""Raised when the user should be logged in on Blender ID, but isn't.
This is basically for every interaction with Pillar.
"""
def __str__(self):
return self.__class__.__name__
class CredentialsNotSyncedError(UserNotLoggedInError):
"""Raised when the user may be logged in on Blender ID, but has no Blender Cloud token."""
class NotSubscribedToCloudError(UserNotLoggedInError):
"""Raised when the user may be logged in on Blender ID, but has no Blender Cloud token."""
class PillarError(RuntimeError):
"""Raised when there is some issue with the communication with Pillar.
This is only raised for logical errors (for example nodes that should
exist but still can't be found). HTTP communication errors are signalled
with other exceptions.
"""
class CloudPath(pathlib.PurePosixPath):
"""Cloud path, in the form of /project uuid/node uuid/node uuid/...
The components are:
- the root '/'
- the project UUID
- zero or more node UUIDs.
"""
@property
def project_uuid(self) -> str:
assert self.parts[0] == '/'
return self.parts[1]
@property
def node_uuids(self) -> list:
assert self.parts[0] == '/'
return self.parts[2:]
@property
def node_uuid(self) -> str:
node_uuids = self.node_uuids
if not node_uuids:
return None
return node_uuids[-1]
@contextmanager
def with_existing_dir(filename: str, open_mode: str, encoding=None):
"""Opens a file, ensuring its directory exists."""
directory = os.path.dirname(filename)
if not os.path.exists(directory):
log.debug('Creating directory %s', directory)
os.makedirs(directory, exist_ok=True)
with open(filename, open_mode, encoding=encoding) as file_object:
yield file_object
def save_as_json(pillar_resource, json_filename):
with with_existing_dir(json_filename, 'w') as outfile:
log.debug('Saving metadata to %r' % json_filename)
json.dump(pillar_resource, outfile, sort_keys=True, cls=pillarsdk.utils.PillarJSONEncoder)
def blender_id_profile() -> 'blender_id.BlenderIdProfile':
"""Returns the Blender ID profile of the currently logged in user."""
# Allow overriding before we import the bpy module.
if _testing_blender_id_profile is not None:
return _testing_blender_id_profile
import blender_id
return blender_id.get_active_profile()
def pillar_api(pillar_endpoint: str = None) -> pillarsdk.Api:
"""Returns the Pillar SDK API object for the current user.
The user must be logged in.
:param pillar_endpoint: URL of the Pillar server, for testing purposes. If not specified,
it will use the addon preferences.
"""
global _pillar_api
# Only return the Pillar API object if the user is still logged in.
profile = blender_id_profile()
if not profile:
raise UserNotLoggedInError()
subclient = profile.subclients.get(SUBCLIENT_ID)
if not subclient:
raise CredentialsNotSyncedError()
if _pillar_api is None:
# Allow overriding the endpoint before importing Blender-specific stuff.
if pillar_endpoint is None:
from . import blender
pillar_endpoint = blender.preferences().pillar_server
pillarsdk.Api.requests_session = cache.requests_session()
_pillar_api = pillarsdk.Api(endpoint=pillar_endpoint,
username=subclient['subclient_user_id'],
password=SUBCLIENT_ID,
token=subclient['token'])
return _pillar_api
# No more than this many Pillar calls should be made simultaneously
pillar_semaphore = asyncio.Semaphore(3)
async def pillar_call(pillar_func, *args, **kwargs):
partial = functools.partial(pillar_func, *args, api=pillar_api(), **kwargs)
loop = asyncio.get_event_loop()
async with pillar_semaphore:
return await loop.run_in_executor(None, partial)
async def check_pillar_credentials():
"""Tries to obtain the user at Pillar using the user's credentials.
:raises UserNotLoggedInError: when the user is not logged in on Blender ID.
:raises CredentialsNotSyncedError: when the user is logged in on Blender ID but
doesn't have a valid subclient token for Pillar.
"""
profile = blender_id_profile()
if not profile:
raise UserNotLoggedInError()
subclient = profile.subclients.get(SUBCLIENT_ID)
if not subclient:
raise CredentialsNotSyncedError()
pillar_user_id = subclient['subclient_user_id']
if not pillar_user_id:
raise CredentialsNotSyncedError()
try:
db_user = await pillar_call(pillarsdk.User.find, pillar_user_id)
except pillarsdk.UnauthorizedAccess:
raise CredentialsNotSyncedError()
roles = db_user.roles
log.debug('User has roles %r', roles)
if not roles or not {'subscriber', 'demo'}.intersection(set(roles)):
# Delete the subclient info. This forces a re-check later, which can
# then pick up on the user's new status.
del profile.subclients[SUBCLIENT_ID]
profile.save_json()
raise NotSubscribedToCloudError()
async def refresh_pillar_credentials():
"""Refreshes the authentication token on Pillar.
:raises blender_id.BlenderIdCommError: when Blender ID refuses to send a token to Pillar.
:raises Exception: when the Pillar credential check fails.
"""
global _pillar_api
import blender_id
from . import blender
pillar_endpoint = blender.preferences().pillar_server.rstrip('/')
# Create a subclient token and send it to Pillar.
# May raise a blender_id.BlenderIdCommError
blender_id.create_subclient_token(SUBCLIENT_ID, pillar_endpoint)
# Test the new URL
_pillar_api = None
await check_pillar_credentials()
async def get_project_uuid(project_url: str) -> str:
"""Returns the UUID for the project, given its '/p/<project_url>' string."""
try:
project = await pillar_call(pillarsdk.Project.find_one, {
'where': {'url': project_url},
'projection': {'permissions': 1},
})
except pillarsdk.exceptions.ResourceNotFound:
log.error('Project with URL %r does not exist', project_url)
return None
log.info('Found project %r', project)
return project['_id']
async def get_nodes(project_uuid: str = None, parent_node_uuid: str = None,
node_type: str = None) -> list:
"""Gets nodes for either a project or given a parent node.
@param project_uuid: the UUID of the project, or None if only querying by parent_node_uuid.
@param parent_node_uuid: the UUID of the parent node. Can be the empty string if the
node should be a top-level node in the project. Can also be None to query all nodes in a
project. In both these cases the project UUID should be given.
"""
if not project_uuid and not parent_node_uuid:
raise ValueError('get_nodes(): either project_uuid or parent_node_uuid must be given.')
where = {'properties.status': 'published'}
# Build the parent node where-clause
if parent_node_uuid == '':
where['parent'] = {'$exists': False}
elif parent_node_uuid is not None:
where['parent'] = parent_node_uuid
# Build the project where-clause
if project_uuid:
where['project'] = project_uuid
if node_type:
where['node_type'] = node_type
children = await pillar_call(pillarsdk.Node.all, {
'projection': {'name': 1, 'parent': 1, 'node_type': 1,
'properties.order': 1, 'properties.status': 1,
'properties.files': 1,
'properties.content_type': 1, 'picture': 1},
'where': where,
'sort': 'properties.order',
'embed': ['parent']})
return children['_items']
async def download_to_file(url, filename, *,
header_store: str,
chunk_size=100 * 1024,
future: asyncio.Future = None):
"""Downloads a file via HTTP(S) directly to the filesystem."""
stored_headers = {}
if os.path.exists(filename) and os.path.exists(header_store):
log.debug('Loading cached headers %r', header_store)
try:
with open(header_store, 'r') as infile:
stored_headers = requests.structures.CaseInsensitiveDict(json.load(infile))
# Check file length.
expected_content_length = int(stored_headers['Content-Length'])
statinfo = os.stat(filename)
if expected_content_length == statinfo.st_size:
# File exists, and is of the correct length. Don't bother downloading again
# if we already downloaded it this session.
if url in _downloaded_urls:
log.debug('Already downloaded %s this session, skipping this request.',
url)
return
else:
log.debug('File size should be %i but is %i; ignoring cache.',
expected_content_length, statinfo.st_size)
stored_headers = {}
except Exception as ex:
log.warning('Unable to load headers from %r, ignoring cache: %s', header_store, str(ex))
loop = asyncio.get_event_loop()
# Separated doing the GET and downloading the body of the GET, so that we can cancel
# the download in between.
def perform_get_request() -> requests.Request:
headers = {}
try:
if stored_headers['Last-Modified']:
headers['If-Modified-Since'] = stored_headers['Last-Modified']
except KeyError:
pass
try:
if stored_headers['ETag']:
headers['If-None-Match'] = stored_headers['ETag']
except KeyError:
pass
if is_cancelled(future):
log.debug('Downloading was cancelled before doing the GET.')
raise asyncio.CancelledError('Downloading was cancelled')
log.debug('Performing GET request, waiting for response.')
return uncached_session.get(url, headers=headers, stream=True, verify=True)
# Download the file in a different thread.
def download_loop():
with with_existing_dir(filename, 'wb') as outfile:
with closing(response):
for block in response.iter_content(chunk_size=chunk_size):
if is_cancelled(future):
raise asyncio.CancelledError('Downloading was cancelled')
outfile.write(block)
# Check for cancellation even before we start our GET request
if is_cancelled(future):
log.debug('Downloading was cancelled before doing the GET')
raise asyncio.CancelledError('Downloading was cancelled')
log.debug('Performing GET %s', url)
response = await loop.run_in_executor(None, perform_get_request)
log.debug('Status %i from GET %s', response.status_code, url)
response.raise_for_status()
if response.status_code == 304:
# The file we have cached is still good, just use that instead.
_downloaded_urls.add(url)
return
# After we performed the GET request, we should check whether we should start
# the download at all.
if is_cancelled(future):
log.debug('Downloading was cancelled before downloading the GET response')
raise asyncio.CancelledError('Downloading was cancelled')
log.debug('Downloading response of GET %s', url)
await loop.run_in_executor(None, download_loop)
log.debug('Done downloading response of GET %s', url)
# We're done downloading, now we have something cached we can use.
log.debug('Saving header cache to %s', header_store)
_downloaded_urls.add(url)
with with_existing_dir(header_store, 'w') as outfile:
json.dump({
'ETag': str(response.headers.get('etag', '')),
'Last-Modified': response.headers.get('Last-Modified'),
'Content-Length': response.headers.get('Content-Length'),
}, outfile, sort_keys=True)
async def fetch_thumbnail_info(file: pillarsdk.File, directory: str, desired_size: str):
"""Fetches thumbnail information from Pillar.
@param file: the pillar File object that represents the image whose thumbnail to download.
@param directory: the directory to save the file to.
@param desired_size: thumbnail size
@return: (url, path), where 'url' is the URL to download the thumbnail from, and 'path' is the absolute path of the
where the thumbnail should be downloaded to. Returns None, None if the task was cancelled before downloading
finished.
"""
thumb_link = await pillar_call(file.thumbnail_file, desired_size)
if thumb_link is None:
raise ValueError("File {} has no thumbnail of size {}"
.format(file['_id'], desired_size))
root, ext = os.path.splitext(file['file_path'])
thumb_fname = sanitize_filename('{0}-{1}.jpg'.format(root, desired_size))
thumb_path = os.path.abspath(os.path.join(directory, thumb_fname))
return thumb_link, thumb_path
async def fetch_texture_thumbs(parent_node_uuid: str, desired_size: str,
thumbnail_directory: str,
*,
thumbnail_loading: callable,
thumbnail_loaded: callable,
future: asyncio.Future = None):
"""Generator, fetches all texture thumbnails in a certain parent node.
@param parent_node_uuid: the UUID of the parent node. All sub-nodes will be downloaded.
@param desired_size: size indicator, from 'sbtmlh'.
@param thumbnail_directory: directory in which to store the downloaded thumbnails.
@param thumbnail_loading: callback function that takes (pillarsdk.Node, pillarsdk.File)
parameters, which is called before a thumbnail will be downloaded. This allows you to
show a "downloading" indicator.
@param thumbnail_loaded: callback function that takes (pillarsdk.Node, pillarsdk.File object,
thumbnail path) parameters, which is called for every thumbnail after it's been downloaded.
@param future: Future that's inspected; if it is not None and cancelled, texture downloading
is aborted.
"""
# Download all texture nodes in parallel.
log.debug('Getting child nodes of node %r', parent_node_uuid)
texture_nodes = await get_nodes(parent_node_uuid=parent_node_uuid,
node_type='texture')
if is_cancelled(future):
log.warning('fetch_texture_thumbs: Texture downloading cancelled')
return
coros = (download_texture_thumbnail(texture_node, desired_size,
thumbnail_directory,
thumbnail_loading=thumbnail_loading,
thumbnail_loaded=thumbnail_loaded,
future=future)
for texture_node in texture_nodes)
# raises any exception from failed handle_texture_node() calls.
await asyncio.gather(*coros)
log.info('fetch_texture_thumbs: Done downloading texture thumbnails')
async def download_texture_thumbnail(texture_node, desired_size: str,
thumbnail_directory: str,
*,
thumbnail_loading: callable,
thumbnail_loaded: callable,
future: asyncio.Future = None):
# Skip non-texture nodes, as we can't thumbnail them anyway.
if texture_node['node_type'] != 'texture':
return
if is_cancelled(future):
log.debug('fetch_texture_thumbs cancelled before finding File for texture %r',
texture_node['_id'])
return
loop = asyncio.get_event_loop()
# Find the File that belongs to this texture node
pic_uuid = texture_node['picture']
loop.call_soon_threadsafe(thumbnail_loading, texture_node, texture_node)
file_desc = await pillar_call(pillarsdk.File.find, pic_uuid, params={
'projection': {'filename': 1, 'variations': 1, 'width': 1, 'height': 1},
})
if file_desc is None:
log.warning('Unable to find file for texture node %s', pic_uuid)
thumb_path = None
else:
if is_cancelled(future):
log.debug('fetch_texture_thumbs cancelled before downloading file %r',
file_desc['_id'])
return
# Get the thumbnail information from Pillar
thumb_url, thumb_path = await fetch_thumbnail_info(file_desc, thumbnail_directory,
desired_size)
if thumb_path is None:
# The task got cancelled, we should abort too.
log.debug('fetch_texture_thumbs cancelled while downloading file %r',
file_desc['_id'])
return
# Cached headers are stored next to thumbnails in sidecar files.
header_store = '%s.headers' % thumb_path
await download_to_file(thumb_url, thumb_path, header_store=header_store, future=future)
loop.call_soon_threadsafe(thumbnail_loaded, texture_node, file_desc, thumb_path)
async def download_file_by_uuid(file_uuid,
target_directory: str,
metadata_directory: str,
*,
map_type: str = None,
file_loading: callable,
file_loaded: callable,
future: asyncio.Future):
if is_cancelled(future):
log.debug('download_file_by_uuid(%r) cancelled.', file_uuid)
return
loop = asyncio.get_event_loop()
# Find the File document.
file_desc = await pillar_call(pillarsdk.File.find, file_uuid, params={
'projection': {'link': 1, 'filename': 1},
})
# Save the file document to disk
metadata_file = os.path.join(metadata_directory, 'files', '%s.json' % file_uuid)
save_as_json(file_desc, metadata_file)
file_path = os.path.join(target_directory,
sanitize_filename('%s-%s' % (map_type, file_desc['filename'])))
file_url = file_desc['link']
# log.debug('Texture %r:\n%s', file_uuid, pprint.pformat(file_desc.to_dict()))
loop.call_soon_threadsafe(file_loading, file_path, file_desc)
# Cached headers are stored in the project space
header_store = os.path.join(metadata_directory, 'files',
sanitize_filename('%s.headers' % file_uuid))
await download_to_file(file_url, file_path, header_store=header_store, future=future)
loop.call_soon_threadsafe(file_loaded, file_path, file_desc)
async def download_texture(texture_node,
target_directory: str,
metadata_directory: str,
*,
texture_loading: callable,
texture_loaded: callable,
future: asyncio.Future):
if texture_node['node_type'] != 'texture':
raise TypeError("Node type should be 'texture', not %r" % texture_node['node_type'])
# Download every file. Eve doesn't support embedding from a list-of-dicts.
downloaders = (download_file_by_uuid(file_info['file'],
target_directory,
metadata_directory,
map_type=file_info['map_type'],
file_loading=texture_loading,
file_loaded=texture_loaded,
future=future)
for file_info in texture_node['properties']['files'])
return await asyncio.gather(*downloaders, return_exceptions=True)
def is_cancelled(future: asyncio.Future) -> bool:
# assert future is not None # for debugging purposes.
cancelled = future is not None and future.cancelled()
return cancelled