Moving blobs between nodes now uses storage API
Instead of only being GCS-specific, it now works for all storage backends.
This commit is contained in:
parent
eb18e5b933
commit
cd42ce6cba
@ -12,7 +12,7 @@ from flask import current_app
|
|||||||
from pillar.api import utils
|
from pillar.api import utils
|
||||||
from . import stream_to_gcs, generate_all_links, ensure_valid_link
|
from . import stream_to_gcs, generate_all_links, ensure_valid_link
|
||||||
|
|
||||||
__all__ = ['PrerequisiteNotMetError', 'change_file_storage_backend']
|
__all__ = ['PrerequisiteNotMetError', 'change_file_storage_backend', 'move_to_bucket']
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
@ -150,31 +150,39 @@ def fetch_file_from_local(file_doc):
|
|||||||
return local_finfo
|
return local_finfo
|
||||||
|
|
||||||
|
|
||||||
def gcs_move_to_bucket(file_id, dest_project_id, skip_gcs=False):
|
def move_to_bucket(file_id: ObjectId, dest_project_id: ObjectId, *, skip_storage=False):
|
||||||
"""Moves a file from its own bucket to the new project_id bucket."""
|
"""Move a file + variations from its own bucket to the new project_id bucket.
|
||||||
|
|
||||||
files_coll = current_app.db()['files']
|
:param file_id: ID of the file to move.
|
||||||
|
:param dest_project_id: Project to move to.
|
||||||
|
:param skip_storage: If True, the storage bucket will not be touched.
|
||||||
|
Only use this when you know what you're doing.
|
||||||
|
"""
|
||||||
|
|
||||||
|
files_coll = current_app.db('files')
|
||||||
f = files_coll.find_one(file_id)
|
f = files_coll.find_one(file_id)
|
||||||
if f is None:
|
if f is None:
|
||||||
raise ValueError('File with _id: {} not found'.format(file_id))
|
raise ValueError(f'File with _id: {file_id} not found')
|
||||||
|
|
||||||
# Check that new backend differs from current one
|
|
||||||
if f['backend'] != 'gcs':
|
|
||||||
raise ValueError('Only Google Cloud Storage is supported for now.')
|
|
||||||
|
|
||||||
# Move file and variations to the new bucket.
|
# Move file and variations to the new bucket.
|
||||||
if skip_gcs:
|
if skip_storage:
|
||||||
log.warning('NOT ACTUALLY MOVING file %s on GCS, just updating MongoDB', file_id)
|
log.warning('NOT ACTUALLY MOVING file %s on storage, just updating MongoDB', file_id)
|
||||||
else:
|
else:
|
||||||
from pillar.api.file_storage_backends.gcs import GoogleCloudStorageBucket
|
from pillar.api.file_storage_backends import Bucket
|
||||||
|
bucket_class = Bucket.for_backend(f['backend'])
|
||||||
|
src_bucket = bucket_class(str(f['project']))
|
||||||
|
dst_bucket = bucket_class(str(dest_project_id))
|
||||||
|
|
||||||
|
src_blob = src_bucket.get_blob(f['file_path'])
|
||||||
|
src_bucket.copy_blob(src_blob, dst_bucket)
|
||||||
|
|
||||||
src_project = f['project']
|
|
||||||
GoogleCloudStorageBucket.copy_to_bucket(f['file_path'], src_project, dest_project_id)
|
|
||||||
for var in f.get('variations', []):
|
for var in f.get('variations', []):
|
||||||
GoogleCloudStorageBucket.copy_to_bucket(var['file_path'], src_project, dest_project_id)
|
src_blob = src_bucket.get_blob(var['file_path'])
|
||||||
|
src_bucket.copy_blob(src_blob, dst_bucket)
|
||||||
|
|
||||||
# Update the file document after moving was successful.
|
# Update the file document after moving was successful.
|
||||||
|
# No need to update _etag or _updated, since that'll be done when
|
||||||
|
# the links are regenerated at the end of this function.
|
||||||
log.info('Switching file %s to project %s', file_id, dest_project_id)
|
log.info('Switching file %s to project %s', file_id, dest_project_id)
|
||||||
update_result = files_coll.update_one({'_id': file_id},
|
update_result = files_coll.update_one({'_id': file_id},
|
||||||
{'$set': {'project': dest_project_id}})
|
{'$set': {'project': dest_project_id}})
|
||||||
|
@ -12,8 +12,8 @@ __all__ = ['Bucket', 'Blob', 'Path', 'FileType']
|
|||||||
Path = pathlib.PurePosixPath
|
Path = pathlib.PurePosixPath
|
||||||
|
|
||||||
# This is a mess: typing.IO keeps mypy-0.501 happy, but not in all cases,
|
# This is a mess: typing.IO keeps mypy-0.501 happy, but not in all cases,
|
||||||
# and io.FileIO keeps PyCharm-2017.1 happy.
|
# and io.FileIO + io.BytesIO keeps PyCharm-2017.1 happy.
|
||||||
FileType = typing.Union[typing.IO, io.FileIO]
|
FileType = typing.Union[typing.IO, io.FileIO, io.BytesIO]
|
||||||
|
|
||||||
|
|
||||||
class Bucket(metaclass=abc.ABCMeta):
|
class Bucket(metaclass=abc.ABCMeta):
|
||||||
@ -31,7 +31,7 @@ class Bucket(metaclass=abc.ABCMeta):
|
|||||||
backend_name: str = None # define in subclass.
|
backend_name: str = None # define in subclass.
|
||||||
|
|
||||||
def __init__(self, name: str) -> None:
|
def __init__(self, name: str) -> None:
|
||||||
self.name = name
|
self.name = str(name)
|
||||||
|
|
||||||
def __init_subclass__(cls):
|
def __init_subclass__(cls):
|
||||||
assert cls.backend_name, '%s.backend_name must be non-empty string' % cls
|
assert cls.backend_name, '%s.backend_name must be non-empty string' % cls
|
||||||
|
@ -1,11 +1,9 @@
|
|||||||
import logging
|
import logging
|
||||||
|
import pathlib
|
||||||
import typing
|
import typing
|
||||||
|
|
||||||
import pathlib
|
|
||||||
from flask import current_app
|
from flask import current_app
|
||||||
|
|
||||||
from pillar.api.utils.imaging import generate_local_thumbnails
|
|
||||||
|
|
||||||
__all__ = ['LocalBucket', 'LocalBlob']
|
__all__ = ['LocalBucket', 'LocalBlob']
|
||||||
|
|
||||||
from .abstract import Bucket, Blob, FileType, Path
|
from .abstract import Bucket, Blob, FileType, Path
|
||||||
@ -22,7 +20,7 @@ class LocalBucket(Bucket):
|
|||||||
# For local storage, the name is actually a partial path, relative
|
# For local storage, the name is actually a partial path, relative
|
||||||
# to the local storage root.
|
# to the local storage root.
|
||||||
self.root = pathlib.Path(current_app.config['STORAGE_DIR'])
|
self.root = pathlib.Path(current_app.config['STORAGE_DIR'])
|
||||||
self.bucket_path = pathlib.PurePosixPath(name[:2]) / name
|
self.bucket_path = pathlib.PurePosixPath(self.name[:2]) / self.name
|
||||||
self.abspath = self.root / self.bucket_path
|
self.abspath = self.root / self.bucket_path
|
||||||
|
|
||||||
def blob(self, blob_name: str) -> 'LocalBlob':
|
def blob(self, blob_name: str) -> 'LocalBlob':
|
||||||
@ -49,7 +47,14 @@ class LocalBucket(Bucket):
|
|||||||
# TODO: implement content type handling for local storage.
|
# TODO: implement content type handling for local storage.
|
||||||
self._log.warning('Unable to set correct file content type for %s', dest_blob)
|
self._log.warning('Unable to set correct file content type for %s', dest_blob)
|
||||||
|
|
||||||
with open(blob.abspath(), 'rb') as src_file:
|
fpath = blob.abspath()
|
||||||
|
if not fpath.exists():
|
||||||
|
if not fpath.parent.exists():
|
||||||
|
raise FileNotFoundError(f'File {fpath} does not exist, and neither does its parent,'
|
||||||
|
f' unable to copy to {to_bucket}')
|
||||||
|
raise FileNotFoundError(f'File {fpath} does not exist, unable to copy to {to_bucket}')
|
||||||
|
|
||||||
|
with open(fpath, 'rb') as src_file:
|
||||||
dest_blob.create_from_file(src_file, content_type='application/x-octet-stream')
|
dest_blob.create_from_file(src_file, content_type='application/x-octet-stream')
|
||||||
|
|
||||||
def rename_blob(self, blob: 'LocalBlob', new_name: str) -> 'LocalBlob':
|
def rename_blob(self, blob: 'LocalBlob', new_name: str) -> 'LocalBlob':
|
||||||
|
@ -61,8 +61,8 @@ class NodeMover(object):
|
|||||||
"""Moves a single file to another project"""
|
"""Moves a single file to another project"""
|
||||||
|
|
||||||
self._log.info('Moving file %s to project %s', file_id, dest_proj['_id'])
|
self._log.info('Moving file %s to project %s', file_id, dest_proj['_id'])
|
||||||
pillar.api.file_storage.moving.gcs_move_to_bucket(file_id, dest_proj['_id'],
|
pillar.api.file_storage.moving.move_to_bucket(file_id, dest_proj['_id'],
|
||||||
skip_gcs=self.skip_gcs)
|
skip_storage=self.skip_gcs)
|
||||||
|
|
||||||
def _files(self, file_ref, *properties):
|
def _files(self, file_ref, *properties):
|
||||||
"""Yields file ObjectIDs."""
|
"""Yields file ObjectIDs."""
|
||||||
|
@ -106,11 +106,11 @@ class NodeMoverTest(unittest.TestCase):
|
|||||||
self.db['nodes'].update_one.return_value = update_res
|
self.db['nodes'].update_one.return_value = update_res
|
||||||
self.mover.change_project(node, new_project)
|
self.mover.change_project(node, new_project)
|
||||||
|
|
||||||
mock_fsmoving.gcs_move_to_bucket.assert_has_calls([
|
mock_fsmoving.move_to_bucket.assert_has_calls([
|
||||||
mock.call(ObjectId(24 * 'b'), prid, skip_gcs=False),
|
mock.call(ObjectId(24 * 'b'), prid, skip_storage=False),
|
||||||
mock.call(ObjectId(24 * 'e'), prid, skip_gcs=False),
|
mock.call(ObjectId(24 * 'e'), prid, skip_storage=False),
|
||||||
mock.call(ObjectId(24 * 'c'), prid, skip_gcs=False),
|
mock.call(ObjectId(24 * 'c'), prid, skip_storage=False),
|
||||||
mock.call(ObjectId(24 * 'd'), prid, skip_gcs=False),
|
mock.call(ObjectId(24 * 'd'), prid, skip_storage=False),
|
||||||
])
|
])
|
||||||
|
|
||||||
@mock.patch('pillar.api.file_storage.moving', autospec=True)
|
@mock.patch('pillar.api.file_storage.moving', autospec=True)
|
||||||
@ -135,7 +135,7 @@ class NodeMoverTest(unittest.TestCase):
|
|||||||
self.db['nodes'].update_one.return_value = update_res
|
self.db['nodes'].update_one.return_value = update_res
|
||||||
self.mover.change_project(node, new_project)
|
self.mover.change_project(node, new_project)
|
||||||
|
|
||||||
mock_fsmoving.gcs_move_to_bucket.assert_has_calls([
|
mock_fsmoving.move_to_bucket.assert_has_calls([
|
||||||
mock.call(ObjectId(24 * 'e'), prid, skip_gcs=False),
|
mock.call(ObjectId(24 * 'e'), prid, skip_storage=False),
|
||||||
mock.call(ObjectId(24 * 'b'), prid, skip_gcs=False),
|
mock.call(ObjectId(24 * 'b'), prid, skip_storage=False),
|
||||||
])
|
])
|
||||||
|
Loading…
x
Reference in New Issue
Block a user