Copying files to other backend now works
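The new flow must run inside a Flask application/request context, as the new tests do. A minimal sketch of calling it directly (assuming `app` is the Pillar Flask application and `some_file_id` is an existing file ObjectId; both names are placeholders, not part of this commit):

    from pillar.api.file_storage import moving

    # Copies the file document and all its variations to the destination
    # backend, then regenerates the links. Raises moving.PrerequisiteNotMetError
    # if the document has no project or is already on that backend.
    with app.test_request_context():
        moving.change_file_storage_backend(some_file_id, 'gcs')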
@@ -383,10 +383,10 @@ def ensure_valid_link(response):
     else:
         log_link.debug('No expiry date for link; generating new link')

-    _generate_all_links(response, now)
+    generate_all_links(response, now)


-def _generate_all_links(response, now):
+def generate_all_links(response, now):
     """Generate a new link for the file and all its variations.

     :param response: the file document that should be updated.
@@ -445,7 +445,7 @@ def on_pre_get_files(_, lookup):
     cursor = current_app.data.find('files', parsed_req, lookup_expired)
     for file_doc in cursor:
         # log.debug('Updating expired links for file %r.', file_doc['_id'])
-        _generate_all_links(file_doc, now)
+        generate_all_links(file_doc, now)


 def refresh_links_for_project(project_uuid, chunk_size, expiry_seconds):
@@ -473,7 +473,7 @@ def refresh_links_for_project(project_uuid, chunk_size, expiry_seconds):
 
     for file_doc in to_refresh:
         log.debug('Refreshing links for file %s', file_doc['_id'])
-        _generate_all_links(file_doc, now)
+        generate_all_links(file_doc, now)

     log.info('Refreshed %i links', min(chunk_size, to_refresh.count()))
 
@@ -528,7 +528,7 @@ def refresh_links_for_backend(backend_name, chunk_size, expiry_seconds):
         log.debug('Refreshing links for file %s', file_id)

         try:
-            _generate_all_links(file_doc, now)
+            generate_all_links(file_doc, now)
         except gcloud.exceptions.Forbidden:
             log.warning('Skipping file %s, GCS forbids us access to '
                         'project %s bucket.', file_id, project_id)
@@ -1,7 +1,22 @@
 """Code for moving files between backends."""

+import datetime
+import logging
+import os
+import tempfile
+
+from bson import ObjectId
+import bson.tz_util
+from flask import current_app
+import requests
+import requests.exceptions
+
+from . import stream_to_gcs, generate_all_links, ensure_valid_link
+
 __all__ = ['PrerequisiteNotMetError', 'change_file_storage_backend']

+log = logging.getLogger(__name__)
+

 class PrerequisiteNotMetError(RuntimeError):
     """Raised when a file cannot be moved due to unmet prerequisites."""
@@ -13,47 +28,83 @@ def change_file_storage_backend(file_id, dest_backend):
     Files on the original backend are not deleted automatically.
     """

+    dest_backend = unicode(dest_backend)
+    file_id = ObjectId(file_id)
+
     # Fetch file document
     files_collection = current_app.data.driver.db['files']
-    f = files_collection.find_one(ObjectId(file_id))
+    f = files_collection.find_one(file_id)
     if f is None:
         raise ValueError('File with _id: {} not found'.format(file_id))

     # Check that new backend differs from current one
     if dest_backend == f['backend']:
-        log.warning('Destination backend ({}) matches the current backend, we '
-                    'are not moving the file'.format(dest_backend))
-        return
+        raise PrerequisiteNotMetError('Destination backend ({}) matches the current backend, we '
+                                      'are not moving the file'.format(dest_backend))

     # TODO Check that new backend is allowed (make conf var)

     # Check that the file has a project; without project, we don't know
     # which bucket to store the file into.
-    if 'project' not in f:
-        raise PrerequisiteNotMetError('File document {} does not have a project'.format(file_id))
+    try:
+        project_id = f['project']
+    except KeyError:
+        raise PrerequisiteNotMetError('File document does not have a project')

-    # Upload file (TODO: and variations) to the new backend
-    move_file_to_backend(f, dest_backend)
+    # Ensure that all links are up to date before we even attempt a download.
+    ensure_valid_link(f)

-    # Update document to reflect the changes
+    # Upload file and variations to the new backend
+    variations = f.get('variations', ())
+
+    try:
+        copy_file_to_backend(file_id, project_id, f, f['backend'], dest_backend)
+    except requests.exceptions.HTTPError as ex:
+        # allow the main file to be removed from storage.
+        if ex.response.status_code not in {404, 410}:
+            raise
+        if not variations:
+            raise PrerequisiteNotMetError('Main file ({link}) does not exist on server, '
+                                          'and no variations exist either'.format(**f))
+        log.warning('Main file %s does not exist; skipping main and visiting variations', f['link'])
+
+    for var in variations:
+        copy_file_to_backend(file_id, project_id, var, f['backend'], dest_backend)
+
+    # Generate new links for the file & all variations. This also saves
+    # the new backend we set here.
+    f['backend'] = dest_backend
+    now = datetime.datetime.now(tz=bson.tz_util.utc)
+    generate_all_links(f, now)


-def move_file_to_backend(file_doc, dest_backend):
+def copy_file_to_backend(file_id, project_id, file_or_var, src_backend, dest_backend):
+    # Filenames on GCS do not contain paths, by our convention
+    internal_fname = os.path.basename(file_or_var['file_path'])
+    file_or_var['file_path'] = internal_fname
+
     # If the file is not local already, fetch it
-    if file_doc['backend'] != 'local':
-        # TODO ensure that file['link'] is up to date
-        local_file = fetch_file_from_link(file_doc['link'])
+    if src_backend == 'pillar':
+        local_finfo = fetch_file_from_local(file_or_var)
+    else:
+        local_finfo = fetch_file_from_link(file_or_var['link'])

     # Upload to GCS
-    if dest_backend == 'gcs':
-        # Filenames on GCS do not contain paths, by our convention
-        internal_fname = os.path.basename(file_doc['file_path'])
+    if dest_backend != 'gcs':
+        raise ValueError('Only dest_backend="gcs" is supported now.')
+
+    if current_app.config['TESTING']:
+        log.warning('Skipping actual upload to GCS due to TESTING')
+    else:
         # TODO check for name collisions
-        stream_to_gcs(file_doc['_id'], local_file['file_size'],
+        stream_to_gcs(file_id, local_finfo['file_size'],
                       internal_fname=internal_fname,
-                      project_id=str(file_doc['project']),
-                      stream_for_gcs=local_file['local_file'],
-                      content_type=local_file['content_type'])
+                      project_id=str(project_id),
+                      stream_for_gcs=local_finfo['local_file'],
+                      content_type=local_finfo['content_type'])
+
+    # No longer needed, so it can be closed & disposed of.
+    local_finfo['local_file'].close()


 def fetch_file_from_link(link):
@@ -61,22 +112,39 @@ def fetch_file_from_link(link):
     additional info (for upload to a different storage backend).
     """

+    log.info('Downloading %s', link)
     r = requests.get(link, stream=True)
     r.raise_for_status()

-    # If the file is not found we will use one from the variations. Original
-    # files might not exists because they were too large to keep.
-    if r.status_code == 404:
-        pass
-    local_file = tempfile.NamedTemporaryFile(
-        dir=current_app.config['STORAGE_DIR'])
-    with open(local_file, 'wb') as f:
-        for chunk in r.iter_content(chunk_size=1024):
-            if chunk:
-                f.write(chunk)
+    local_file = tempfile.NamedTemporaryFile(dir=current_app.config['STORAGE_DIR'])
+    log.info('Downloading to %s', local_file.name)
+
+    for chunk in r.iter_content(chunk_size=1024):
+        if chunk:
+            local_file.write(chunk)
+    local_file.seek(0)

     file_dict = {
         'file_size': os.fstat(local_file.fileno()).st_size,
         'content_type': r.headers['content-type'],
         'local_file': local_file
     }
     return file_dict
+
+
+def fetch_file_from_local(file_doc):
+    """Mimics fetch_file_from_link(), but just returns the local file.
+
+    :param file_doc: dict with 'link' key pointing to a path in STORAGE_DIR, and
+        'content_type' key.
+    :type file_doc: dict
+    :rtype: dict
+    """
+
+    local_file = open(os.path.join(current_app.config['STORAGE_DIR'], file_doc['file_path']), 'rb')
+    local_finfo = {
+        'file_size': os.fstat(local_file.fileno()).st_size,
+        'content_type': file_doc['content_type'],
+        'local_file': local_file
+    }
+    return local_finfo
@@ -400,3 +400,41 @@ def file_change_backend(file_id, dest_backend='gcs'):
 
     from pillar.api.file_storage.moving import change_file_storage_backend
     change_file_storage_backend(file_id, dest_backend)
+
+
+@manager.command
+def mass_copy_between_backends(src_backend='cdnsun', dest_backend='gcs'):
+    """Copies all files from one backend to the other, updating them in Mongo.
+
+    Files on the original backend are not deleted.
+    """
+
+    import requests.exceptions
+
+    from pillar.api.file_storage import moving
+
+    logging.getLogger('pillar').setLevel(logging.INFO)
+    log.info('Mass-moving all files from backend %r to %r',
+             src_backend, dest_backend)
+
+    files_coll = current_app.data.driver.db['files']
+
+    fdocs = files_coll.find({'backend': src_backend},
+                            projection={'_id': True})
+    copied_ok = 0
+    copy_errs = 0
+    for fdoc in fdocs:
+        try:
+            moving.change_file_storage_backend(fdoc['_id'], dest_backend)
+        except moving.PrerequisiteNotMetError as ex:
+            log.error('Error copying %s: %s', fdoc['_id'], ex)
+            copy_errs += 1
+        except requests.exceptions.HTTPError as ex:
+            log.error('Error copying %s (%s): %s',
+                      fdoc['_id'], ex.response.url, ex)
+            copy_errs += 1
+        else:
+            copied_ok += 1
+
+    log.info('%i files copied ok', copied_ok)
+    log.info('%i files we did not copy', copy_errs)
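Callers other than this manage command can reuse the same per-file error handling; a small illustrative sketch (the copy_one helper and its logger are not part of this commit):

    import logging

    from pillar.api.file_storage import moving

    log = logging.getLogger(__name__)

    def copy_one(file_id, dest_backend='gcs'):
        # Mirrors the per-file handling of mass_copy_between_backends above:
        # a PrerequisiteNotMetError skips the file instead of aborting the batch.
        try:
            moving.change_file_storage_backend(file_id, dest_backend)
        except moving.PrerequisiteNotMetError as ex:
            log.error('Not copying %s: %s', file_id, ex)
            return False
        return True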
tests/test_api/test_file_storage_moving.py (new file, 152 lines)
@@ -0,0 +1,152 @@
+from __future__ import absolute_import
+
+import datetime
+
+import responses
+
+from pillar.tests import AbstractPillarTest
+
+# Always do a final test run (and commit with) assert_all_requests_are_fired=True.
+# Setting it to False can help track down other issues, though, that can be masked
+# by the error of RequestsMock.
+mock = responses.RequestsMock(
+    assert_all_requests_are_fired=True
+    # assert_all_requests_are_fired=False
+)
+
+
+class ChangeBackendTest(AbstractPillarTest):
+    def setUp(self, **kwargs):
+        AbstractPillarTest.setUp(self, **kwargs)
+
+        self.project_id, self.project = self.ensure_project_exists()
+        responses.assert_all_requests_are_fired = True
+
+    @mock.activate
+    def test_file_and_variations(self):
+        from pillar.api.file_storage import moving, generate_link
+
+        image_file_id, fdoc = self._create_image_file_doc()
+
+        # Expect GETs on regenerated links.
+        mock.add(mock.GET,
+                 generate_link('unittest', fdoc['file_path']),
+                 body='file-content',
+                 content_type='image/jpeg')
+
+        for variation in fdoc['variations']:
+            mock.add(mock.GET,
+                     generate_link('unittest', variation['file_path']),
+                     body='file-content',
+                     content_type='image/jpeg')
+
+        with self.app.test_request_context():
+            moving.change_file_storage_backend(image_file_id, 'gcs')
+
+        # Check that the file document has been updated correctly
+        files_coll = self.app.data.driver.db['files']
+        fdoc = files_coll.find_one(image_file_id)
+
+        self.assertEqual(u'gcs', fdoc['backend'])
+        self.assertIn(u'/path/to/testing/gcs/', fdoc['link'])
+
+        for variation in fdoc['variations']:
+            self.assertIn(u'/path/to/testing/gcs/', variation['link'])
+
+    @mock.activate
+    def test_only_variations(self):
+        from pillar.api.file_storage import moving, generate_link
+
+        image_file_id, fdoc = self._create_image_file_doc()
+
+        # Expect GETs on regenerated links.
+        mock.add(mock.GET,
+                 generate_link('unittest', fdoc['file_path']),
+                 status=404)
+
+        for variation in fdoc['variations']:
+            mock.add(mock.GET,
+                     generate_link('unittest', variation['file_path']),
+                     body='file-content',
+                     content_type='image/jpeg')
+
+        with self.app.test_request_context():
+            moving.change_file_storage_backend(image_file_id, 'gcs')
+
+        # Check that the file document has been updated correctly
+        files_coll = self.app.data.driver.db['files']
+        fdoc = files_coll.find_one(image_file_id)
+
+        self.assertEqual(u'gcs', fdoc['backend'])
+        self.assertIn(u'/path/to/testing/gcs/', fdoc['link'])
+
+        for variation in fdoc['variations']:
+            self.assertIn(u'/path/to/testing/gcs/', variation['link'])
+
+    @mock.activate
+    def test_no_variations(self):
+        from pillar.api.file_storage import moving, generate_link
+
+        image_file_id, fdoc = self._create_image_file_doc(variations=False)
+
+        # Expect GETs on regenerated links.
+        mock.add(mock.GET,
+                 generate_link('unittest', fdoc['file_path']),
+                 body='file-content',
+                 content_type='image/jpeg')
+
+        with self.app.test_request_context():
+            moving.change_file_storage_backend(image_file_id, 'gcs')
+
+        # Check that the file document has been updated correctly
+        files_coll = self.app.data.driver.db['files']
+        fdoc = files_coll.find_one(image_file_id)
+
+        self.assertEqual(u'gcs', fdoc['backend'])
+        self.assertIn(u'/path/to/testing/gcs/', fdoc['link'])
+
+    def _create_image_file_doc(self, variations=True):
+        fdoc = {'status': 'complete', 'name': 'some-hash.jpg', 'backend': 'unittest',
+                'format': 'jpeg',
+                'filename': 'image-micak.jpg', 'project': self.project_id, 'length': 2708160,
+                'content_type': 'image/jpeg', 'file_path': '3c61e953ee644786b98027e043fd3af3.jpg',
+                'length_aggregate_in_bytes': 3196056,
+                'link': 'https://server.cdnsun/projid/_%2Fsome-hash.jpg',
+                'link_expires': datetime.datetime(2016, 8, 23, 15, 23, 48), 'md5': ''}
+
+        if variations:
+            fdoc['variations'] = [
+                {'length': 3312, 'link': 'https://server.cdnsun/projid/_%2Fsome-hash-b.jpg',
+                 'content_type': 'image/jpeg',
+                 'file_path': '3c61e953ee644786b98027e043fd3af3-b.jpg', 'size': 'b', 'md5': ''},
+                {'height': 2048, 'width': 2048, 'length': 381736,
+                 'link': 'https://server.cdnsun/projid/_%2Fsome-hash-h.jpg',
+                 'content_type': 'image/jpeg', 'md5': '',
+                 'file_path': '3c61e953ee644786b98027e043fd3af3-h.jpg', 'size': 'h'},
+                {'height': 320, 'width': 320, 'length': 8818,
+                 'link': 'https://server.cdnsun/projid/_%2Fsome-hash-m.jpg',
+                 'content_type': 'image/jpeg', 'md5': '',
+                 'file_path': '3c61e953ee644786b98027e043fd3af3-m.jpg', 'size': 'm'},
+                {'height': 1024, 'width': 1024, 'length': 89012,
+                 'link': 'https://server.cdnsun/projid/_%2Fsome-hash-l.jpg',
+                 'content_type': 'image/jpeg', 'md5': '',
+                 'file_path': '3c61e953ee644786b98027e043fd3af3-l.jpg', 'size': 'l'},
+                {'height': 90, 'width': 90, 'length': 1774,
+                 'link': 'https://server.cdnsun/projid/_%2Fsome-hash-s.jpg',
+                 'content_type': 'image/jpeg', 'md5': '',
+                 'file_path': '3c61e953ee644786b98027e043fd3af3-s.jpg', 'size': 's'},
+                {'height': 160, 'width': 160, 'length': 3244,
+                 'link': 'https://server.cdnsun/projid/_%2Fsome-hash-t.jpg',
+                 'content_type': 'image/jpeg', 'is_public': True, 'md5': '',
+                 'file_path': '3c61e953ee644786b98027e043fd3af3-t.jpg', 'size': 't'}]
+
+        with self.app.test_request_context():
+            files_coll = self.app.data.driver.db['files']
+
+            result = files_coll.insert_one(fdoc)
+            file_id = result.inserted_id
+
+            # Re-fetch from the database, so that we're sure we return the same as is stored.
+            # This is necessary as datetimes are rounded by MongoDB.
+            from_db = files_coll.find_one(file_id)
+            return file_id, from_db