diff --git a/pillar/api/file_storage/__init__.py b/pillar/api/file_storage/__init__.py
index 3d335178..432c1ce1 100644
--- a/pillar/api/file_storage/__init__.py
+++ b/pillar/api/file_storage/__init__.py
@@ -383,10 +383,10 @@ def ensure_valid_link(response):
     else:
         log_link.debug('No expiry date for link; generating new link')
 
-    _generate_all_links(response, now)
+    generate_all_links(response, now)
 
 
-def _generate_all_links(response, now):
+def generate_all_links(response, now):
     """Generate a new link for the file and all its variations.
 
     :param response: the file document that should be updated.
@@ -445,7 +445,7 @@ def on_pre_get_files(_, lookup):
     cursor = current_app.data.find('files', parsed_req, lookup_expired)
     for file_doc in cursor:
         # log.debug('Updating expired links for file %r.', file_doc['_id'])
-        _generate_all_links(file_doc, now)
+        generate_all_links(file_doc, now)
 
 
 def refresh_links_for_project(project_uuid, chunk_size, expiry_seconds):
@@ -473,7 +473,7 @@ def refresh_links_for_project(project_uuid, chunk_size, expiry_seconds):
 
     for file_doc in to_refresh:
         log.debug('Refreshing links for file %s', file_doc['_id'])
-        _generate_all_links(file_doc, now)
+        generate_all_links(file_doc, now)
 
     log.info('Refreshed %i links', min(chunk_size, to_refresh.count()))
 
@@ -528,7 +528,7 @@ def refresh_links_for_backend(backend_name, chunk_size, expiry_seconds):
 
             log.debug('Refreshing links for file %s', file_id)
             try:
-                _generate_all_links(file_doc, now)
+                generate_all_links(file_doc, now)
             except gcloud.exceptions.Forbidden:
                 log.warning('Skipping file %s, GCS forbids us access to '
                             'project %s bucket.', file_id, project_id)
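Note: the rename of `_generate_all_links` to `generate_all_links` makes the helper part of the module's public interface, so that `moving.py` below can import it. A minimal sketch of an external caller, mirroring how the `refresh_*` functions above use it (`refresh_file_links` is a hypothetical helper name, and a Flask application context is assumed):

```python
# Sketch only: regenerate the links of a single file document, the same
# way refresh_links_for_project() does. Assumes a Flask app context.
import datetime

import bson.tz_util
from flask import current_app

from pillar.api.file_storage import generate_all_links


def refresh_file_links(file_id):
    """Hypothetical helper: re-links one file and all its variations."""
    files_coll = current_app.data.driver.db['files']
    file_doc = files_coll.find_one(file_id)

    # moving.py below passes a timezone-aware timestamp; we do the same.
    now = datetime.datetime.now(tz=bson.tz_util.utc)
    generate_all_links(file_doc, now)
```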
diff --git a/pillar/api/file_storage/moving.py b/pillar/api/file_storage/moving.py
index ee5b2ba9..ce611c0e 100644
--- a/pillar/api/file_storage/moving.py
+++ b/pillar/api/file_storage/moving.py
@@ -1,7 +1,22 @@
 """Code for moving files between backends."""
 
+import datetime
+import logging
+import os
+import tempfile
+
+from bson import ObjectId
+import bson.tz_util
+from flask import current_app
+import requests
+import requests.exceptions
+
+from . import stream_to_gcs, generate_all_links, ensure_valid_link
+
 __all__ = ['PrerequisiteNotMetError', 'change_file_storage_backend']
 
+log = logging.getLogger(__name__)
+
 
 class PrerequisiteNotMetError(RuntimeError):
     """Raised when a file cannot be moved due to unmet prerequisites."""
@@ -13,47 +28,83 @@ def change_file_storage_backend(file_id, dest_backend):
 
     Files on the original backend are not deleted automatically.
     """
 
+    dest_backend = unicode(dest_backend)
+    file_id = ObjectId(file_id)
+
     # Fetch file document
     files_collection = current_app.data.driver.db['files']
-    f = files_collection.find_one(ObjectId(file_id))
+    f = files_collection.find_one(file_id)
     if f is None:
         raise ValueError('File with _id: {} not found'.format(file_id))
 
     # Check that new backend differs from current one
     if dest_backend == f['backend']:
-        log.warning('Destination backend ({}) matches the current backend, we '
-                    'are not moving the file'.format(dest_backend))
-        return
+        raise PrerequisiteNotMetError('Destination backend ({}) matches the current backend; '
+                                      'not moving the file'.format(dest_backend))
 
     # TODO Check that new backend is allowed (make conf var)
 
     # Check that the file has a project; without project, we don't know
     # which bucket to store the file into.
-    if 'project' not in f:
-        raise PrerequisiteNotMetError('File document {} does not have a project'.format(file_id))
+    try:
+        project_id = f['project']
+    except KeyError:
+        raise PrerequisiteNotMetError('File document does not have a project')
 
-    # Upload file (TODO: and variations) to the new backend
-    move_file_to_backend(f, dest_backend)
+    # Ensure that all links are up to date before we even attempt a download.
+    ensure_valid_link(f)
 
-    # Update document to reflect the changes
+    # Upload file and variations to the new backend
+    variations = f.get('variations', ())
+
+    try:
+        copy_file_to_backend(file_id, project_id, f, f['backend'], dest_backend)
+    except requests.exceptions.HTTPError as ex:
+        # A missing main file (404/410) is acceptable, as long as variations exist.
+        if ex.response.status_code not in {404, 410}:
+            raise
+        if not variations:
+            raise PrerequisiteNotMetError('Main file ({link}) does not exist on server, '
+                                          'and no variations exist either'.format(**f))
+        log.warning('Main file %s does not exist; skipping it and copying its variations', f['link'])
+
+    for var in variations:
+        copy_file_to_backend(file_id, project_id, var, f['backend'], dest_backend)
+
+    # Generate new links for the file & all variations. This also saves
+    # the new backend we set here.
+    f['backend'] = dest_backend
+    now = datetime.datetime.now(tz=bson.tz_util.utc)
+    generate_all_links(f, now)
 
 
-def move_file_to_backend(file_doc, dest_backend):
+def copy_file_to_backend(file_id, project_id, file_or_var, src_backend, dest_backend):
+    # Filenames on GCS do not contain paths, by our convention
+    internal_fname = os.path.basename(file_or_var['file_path'])
+    file_or_var['file_path'] = internal_fname
+
     # If the file is not local already, fetch it
-    if file_doc['backend'] != 'local':
-        # TODO ensure that file['link'] is up to date
-        local_file = fetch_file_from_link(file_doc['link'])
+    if src_backend == 'pillar':
+        local_finfo = fetch_file_from_local(file_or_var)
+    else:
+        local_finfo = fetch_file_from_link(file_or_var['link'])
 
     # Upload to GCS
-    if dest_backend == 'gcs':
-        # Filenames on GCS do not contain paths, by our convention
-        internal_fname = os.path.basename(file_doc['file_path'])
+    if dest_backend != 'gcs':
+        raise ValueError('Only dest_backend="gcs" is supported for now.')
+
+    if current_app.config['TESTING']:
+        log.warning('Skipping actual upload to GCS due to TESTING')
+    else:
         # TODO check for name collisions
-        stream_to_gcs(file_doc['_id'], local_file['file_size'],
+        stream_to_gcs(file_id, local_finfo['file_size'],
                       internal_fname=internal_fname,
-                      project_id=str(file_doc['project']),
-                      stream_for_gcs=local_file['local_file'],
-                      content_type=local_file['content_type'])
+                      project_id=str(project_id),
+                      stream_for_gcs=local_finfo['local_file'],
+                      content_type=local_finfo['content_type'])
+
+    # No longer needed, so it can be closed & disposed of.
+    local_finfo['local_file'].close()
 
 
 def fetch_file_from_link(link):
@@ -61,22 +112,39 @@ def fetch_file_from_link(link):
     additional info (for upload to a different storage backend).
     """
 
+    log.info('Downloading %s', link)
     r = requests.get(link, stream=True)
     r.raise_for_status()
-
-    # If the file is not found we will use one from the variations. Original
-    # files might not exists because they were too large to keep.
-    if r.status_code == 404:
-        pass
-
-    local_file = tempfile.NamedTemporaryFile(
-        dir=current_app.config['STORAGE_DIR'])
-    with open(local_file, 'wb') as f:
-        for chunk in r.iter_content(chunk_size=1024):
-            if chunk:
-                f.write(chunk)
+
+    local_file = tempfile.NamedTemporaryFile(dir=current_app.config['STORAGE_DIR'])
+    log.info('Downloading to %s', local_file.name)
+
+    for chunk in r.iter_content(chunk_size=1024):
+        if chunk:
+            local_file.write(chunk)
+    local_file.seek(0)
+
     file_dict = {
         'file_size': os.fstat(local_file.fileno()).st_size,
         'content_type': r.headers['content-type'],
         'local_file': local_file
     }
     return file_dict
+
+
+def fetch_file_from_local(file_doc):
+    """Mimics fetch_file_from_link(), but just returns the local file.
+
+    :param file_doc: dict with 'file_path' key pointing to a path in STORAGE_DIR,
+        and 'content_type' key.
+    :type file_doc: dict
+    :rtype: dict
+    """
+
+    local_file = open(os.path.join(current_app.config['STORAGE_DIR'], file_doc['file_path']), 'rb')
+    local_finfo = {
+        'file_size': os.fstat(local_file.fileno()).st_size,
+        'content_type': file_doc['content_type'],
+        'local_file': local_file
+    }
+    return local_finfo
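For illustration, this is roughly how `change_file_storage_backend()` could be driven from a one-off script. The `_id` value and the `app` object are hypothetical; the function accepts both `str` and `ObjectId` (it coerces via `ObjectId(file_id)`), and it needs an application context so `current_app` resolves:

```python
# Hypothetical one-off migration of a single file to GCS.
from pillar.api.file_storage.moving import (
    PrerequisiteNotMetError, change_file_storage_backend)

with app.app_context():  # 'app' is assumed to be a configured Pillar app
    try:
        change_file_storage_backend('5681551a1f0b51b1b5b2a75f', 'gcs')
    except PrerequisiteNotMetError as ex:
        # Raised when e.g. the backends match or the file has no project.
        print('Cannot move file: %s' % ex)
```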
diff --git a/pillar/cli.py b/pillar/cli.py
index 523ad54a..9c017e0e 100644
--- a/pillar/cli.py
+++ b/pillar/cli.py
@@ -399,4 +399,42 @@ def file_change_backend(file_id, dest_backend='gcs'):
     """
 
     from pillar.api.file_storage.moving import change_file_storage_backend
-    change_file_storage_backend(file_id, dest_backend)
\ No newline at end of file
+    change_file_storage_backend(file_id, dest_backend)
+
+
+@manager.command
+def mass_copy_between_backends(src_backend='cdnsun', dest_backend='gcs'):
+    """Copies all files from one backend to the other, updating them in Mongo.
+
+    Files on the original backend are not deleted.
+    """
+
+    import requests.exceptions
+
+    from pillar.api.file_storage import moving
+
+    logging.getLogger('pillar').setLevel(logging.INFO)
+    log.info('Mass-moving all files from backend %r to %r',
+             src_backend, dest_backend)
+
+    files_coll = current_app.data.driver.db['files']
+
+    fdocs = files_coll.find({'backend': src_backend},
+                            projection={'_id': True})
+    copied_ok = 0
+    copy_errs = 0
+    for fdoc in fdocs:
+        try:
+            moving.change_file_storage_backend(fdoc['_id'], dest_backend)
+        except moving.PrerequisiteNotMetError as ex:
+            log.error('Error copying %s: %s', fdoc['_id'], ex)
+            copy_errs += 1
+        except requests.exceptions.HTTPError as ex:
+            log.error('Error copying %s (%s): %s',
+                      fdoc['_id'], ex.response.url, ex)
+            copy_errs += 1
+        else:
+            copied_ok += 1
+
+    log.info('%i files copied ok', copied_ok)
+    log.info('%i files not copied due to errors', copy_errs)
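The new management command can then be invoked as, presumably, `python manage.py mass_copy_between_backends --src_backend cdnsun --dest_backend gcs` (the keyword arguments become command-line options). Files that fail with `PrerequisiteNotMetError` or an HTTP error are logged and counted rather than aborting the loop, so a partially failing run still migrates everything it can.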
diff --git a/tests/test_api/test_file_storage_moving.py b/tests/test_api/test_file_storage_moving.py
new file mode 100644
index 00000000..3cf8a7ea
--- /dev/null
+++ b/tests/test_api/test_file_storage_moving.py
@@ -0,0 +1,152 @@
+from __future__ import absolute_import
+
+import datetime
+
+import responses
+
+from pillar.tests import AbstractPillarTest
+
+# Always do a final test run (and commit) with assert_all_requests_are_fired=True.
+# Setting it to False can help track down issues that would otherwise be masked
+# by the RequestsMock error.
+mock = responses.RequestsMock(
+    assert_all_requests_are_fired=True
+    # assert_all_requests_are_fired=False
+)
+
+
+class ChangeBackendTest(AbstractPillarTest):
+    def setUp(self, **kwargs):
+        AbstractPillarTest.setUp(self, **kwargs)
+
+        self.project_id, self.project = self.ensure_project_exists()
+        responses.assert_all_requests_are_fired = True
+
+    @mock.activate
+    def test_file_and_variations(self):
+        from pillar.api.file_storage import moving, generate_link
+
+        image_file_id, fdoc = self._create_image_file_doc()
+
+        # Expect GETs on regenerated links.
+        mock.add(mock.GET,
+                 generate_link('unittest', fdoc['file_path']),
+                 body='file-content',
+                 content_type='image/jpeg')
+
+        for variation in fdoc['variations']:
+            mock.add(mock.GET,
+                     generate_link('unittest', variation['file_path']),
+                     body='file-content',
+                     content_type='image/jpeg')
+
+        with self.app.test_request_context():
+            moving.change_file_storage_backend(image_file_id, 'gcs')
+
+        # Check that the file document has been updated correctly
+        files_coll = self.app.data.driver.db['files']
+        fdoc = files_coll.find_one(image_file_id)
+
+        self.assertEqual(u'gcs', fdoc['backend'])
+        self.assertIn(u'/path/to/testing/gcs/', fdoc['link'])
+
+        for variation in fdoc['variations']:
+            self.assertIn(u'/path/to/testing/gcs/', variation['link'])
+
+    @mock.activate
+    def test_only_variations(self):
+        from pillar.api.file_storage import moving, generate_link
+
+        image_file_id, fdoc = self._create_image_file_doc()
+
+        # Expect GETs on regenerated links.
+        mock.add(mock.GET,
+                 generate_link('unittest', fdoc['file_path']),
+                 status=404)
+
+        for variation in fdoc['variations']:
+            mock.add(mock.GET,
+                     generate_link('unittest', variation['file_path']),
+                     body='file-content',
+                     content_type='image/jpeg')
+
+        with self.app.test_request_context():
+            moving.change_file_storage_backend(image_file_id, 'gcs')
+
+        # Check that the file document has been updated correctly
+        files_coll = self.app.data.driver.db['files']
+        fdoc = files_coll.find_one(image_file_id)
+
+        self.assertEqual(u'gcs', fdoc['backend'])
+        self.assertIn(u'/path/to/testing/gcs/', fdoc['link'])
+
+        for variation in fdoc['variations']:
+            self.assertIn(u'/path/to/testing/gcs/', variation['link'])
+
+    @mock.activate
+    def test_no_variations(self):
+        from pillar.api.file_storage import moving, generate_link
+
+        image_file_id, fdoc = self._create_image_file_doc(variations=False)
+
+        # Expect GETs on regenerated links.
+        mock.add(mock.GET,
+                 generate_link('unittest', fdoc['file_path']),
+                 body='file-content',
+                 content_type='image/jpeg')
+
+        with self.app.test_request_context():
+            moving.change_file_storage_backend(image_file_id, 'gcs')
+
+        # Check that the file document has been updated correctly
+        files_coll = self.app.data.driver.db['files']
+        fdoc = files_coll.find_one(image_file_id)
+
+        self.assertEqual(u'gcs', fdoc['backend'])
+        self.assertIn(u'/path/to/testing/gcs/', fdoc['link'])
+
+    def _create_image_file_doc(self, variations=True):
+        fdoc = {'status': 'complete', 'name': 'some-hash.jpg', 'backend': 'unittest',
+                'format': 'jpeg',
+                'filename': 'image-micak.jpg', 'project': self.project_id, 'length': 2708160,
+                'content_type': 'image/jpeg', 'file_path': '3c61e953ee644786b98027e043fd3af3.jpg',
+                'length_aggregate_in_bytes': 3196056,
+                'link': 'https://server.cdnsun/projid/_%2Fsome-hash.jpg',
+                'link_expires': datetime.datetime(2016, 8, 23, 15, 23, 48), 'md5': ''}
+
+        if variations:
+            fdoc['variations'] = [
+                {'length': 3312, 'link': 'https://server.cdnsun/projid/_%2Fsome-hash-b.jpg',
+                 'content_type': 'image/jpeg',
+                 'file_path': '3c61e953ee644786b98027e043fd3af3-b.jpg', 'size': 'b', 'md5': ''},
+                {'height': 2048, 'width': 2048, 'length': 381736,
+                 'link': 'https://server.cdnsun/projid/_%2Fsome-hash-h.jpg',
+                 'content_type': 'image/jpeg', 'md5': '',
+                 'file_path': '3c61e953ee644786b98027e043fd3af3-h.jpg', 'size': 'h'},
+                {'height': 320, 'width': 320, 'length': 8818,
+                 'link': 'https://server.cdnsun/projid/_%2Fsome-hash-m.jpg',
+                 'content_type': 'image/jpeg', 'md5': '',
+                 'file_path': '3c61e953ee644786b98027e043fd3af3-m.jpg', 'size': 'm'},
+                {'height': 1024, 'width': 1024, 'length': 89012,
+                 'link': 'https://server.cdnsun/projid/_%2Fsome-hash-l.jpg',
+                 'content_type': 'image/jpeg', 'md5': '',
+                 'file_path': '3c61e953ee644786b98027e043fd3af3-l.jpg', 'size': 'l'},
+                {'height': 90, 'width': 90, 'length': 1774,
+                 'link': 'https://server.cdnsun/projid/_%2Fsome-hash-s.jpg',
+                 'content_type': 'image/jpeg', 'md5': '',
+                 'file_path': '3c61e953ee644786b98027e043fd3af3-s.jpg', 'size': 's'},
+                {'height': 160, 'width': 160, 'length': 3244,
+                 'link': 'https://server.cdnsun/projid/_%2Fsome-hash-t.jpg',
+                 'content_type': 'image/jpeg', 'is_public': True, 'md5': '',
+                 'file_path': '3c61e953ee644786b98027e043fd3af3-t.jpg', 'size': 't'}]
+
+        with self.app.test_request_context():
+            files_coll = self.app.data.driver.db['files']
+
+            result = files_coll.insert_one(fdoc)
+            file_id = result.inserted_id
+
+            # Re-fetch from the database, so that we're sure we return the same as is stored.
+            # This is necessary as datetimes are rounded by MongoDB.
+            from_db = files_coll.find_one(file_id)
+            return file_id, from_db