Implement rename-after-Zencoder-is-done using our Storage API.

This means it's no longer GCS-specific, and can be tested using the local
storage implementation.

This required implementing a rename operation. To mirror Google's API, I've
implemented the renaming of a Blob as a method on the Bucket class.
To me this makes sense, as renaming requires creating a new Blob instance, which
shouldn't be done by another Blob.
This commit is contained in:
Sybren A. Stüvel 2018-01-26 14:36:46 +01:00
parent 2e2314c16b
commit 230b2c669c
6 changed files with 112 additions and 26 deletions

View File

@ -10,8 +10,7 @@ from flask import current_app
from flask import request from flask import request
from pillar.api import utils from pillar.api import utils
from pillar.api.file_storage_backends.gcs import GoogleCloudStorageBucket from pillar.api.file_storage_backends import Bucket
from pillar.api.utils import skip_when_testing
encoding = Blueprint('encoding', __name__) encoding = Blueprint('encoding', __name__)
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -45,14 +44,6 @@ def size_descriptor(width, height):
return '%ip' % height return '%ip' % height
@skip_when_testing
def rename_on_gcs(bucket_name, from_path, to_path):
    """Rename the blob at `from_path` to `to_path` in a GCS bucket.

    `bucket_name` is converted to a string before the bucket wrapper
    is constructed.
    """
    bucket = GoogleCloudStorageBucket(str(bucket_name))

    # FIXME: don't use internal property, but use our bucket/blob API.
    source = bucket._gcs_bucket.blob(from_path)
    bucket._gcs_bucket.rename_blob(source, to_path)
@encoding.route('/zencoder/notifications', methods=['POST']) @encoding.route('/zencoder/notifications', methods=['POST'])
def zencoder_notifications(): def zencoder_notifications():
""" """
@ -119,7 +110,11 @@ def zencoder_notifications():
job_state) job_state)
# For every variation encoded, try to update the file object # For every variation encoded, try to update the file object
root, _ = os.path.splitext(file_doc['file_path']) storage_name, _ = os.path.splitext(file_doc['file_path'])
nice_name, _ = os.path.splitext(file_doc['filename'])
bucket_class = Bucket.for_backend(file_doc['backend'])
bucket = bucket_class(str(file_doc['project']))
for output in data['outputs']: for output in data['outputs']:
video_format = output['format'] video_format = output['format']
@ -140,16 +135,16 @@ def zencoder_notifications():
# Rename the file to include the now-known size descriptor. # Rename the file to include the now-known size descriptor.
size = size_descriptor(output['width'], output['height']) size = size_descriptor(output['width'], output['height'])
new_fname = '{}-{}.{}'.format(root, size, video_format) new_fname = f'{storage_name}-{size}.{video_format}'
# Rename on Google Cloud Storage # Rename the file on the storage.
blob = bucket.blob(variation['file_path'])
try: try:
rename_on_gcs(file_doc['project'], new_blob = bucket.rename_blob(blob, new_fname)
'_/' + variation['file_path'], new_blob.update_filename(f'{nice_name}-{size}.{video_format}')
'_/' + new_fname)
except Exception: except Exception:
log.warning('Unable to rename GCS blob %r to %r. Keeping old name.', log.warning('Unable to rename blob %r to %r. Keeping old name.',
variation['file_path'], new_fname, exc_info=True) blob, new_fname, exc_info=True)
else: else:
variation['file_path'] = new_fname variation['file_path'] = new_fname

View File

@ -67,6 +67,10 @@ class Bucket(metaclass=abc.ABCMeta):
same storage backend. same storage backend.
""" """
@abc.abstractmethod
def rename_blob(self, blob: 'Blob', new_name: str) -> 'Blob':
    """Rename `blob` to `new_name`.

    Storage backends implement this on the bucket and return a Blob
    for the renamed object.
    """
@classmethod @classmethod
def copy_to_bucket(cls, blob_name, src_project_id: ObjectId, dest_project_id: ObjectId): def copy_to_bucket(cls, blob_name, src_project_id: ObjectId, dest_project_id: ObjectId):
"""Copies a file from one bucket to the other.""" """Copies a file from one bucket to the other."""

View File

@ -10,7 +10,7 @@ import gcloud.exceptions as gcloud_exc
from flask import current_app, g from flask import current_app, g
from werkzeug.local import LocalProxy from werkzeug.local import LocalProxy
from .abstract import Bucket, Blob, Path, FileType from .abstract import Bucket, Blob, FileType
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -132,15 +132,28 @@ class GoogleCloudStorageBucket(Bucket):
return self._gcs_bucket.copy_blob(blob.gblob, to_bucket._gcs_bucket) return self._gcs_bucket.copy_blob(blob.gblob, to_bucket._gcs_bucket)
def rename_blob(self, blob: 'GoogleCloudStorageBlob', new_name: str) \
        -> 'GoogleCloudStorageBlob':
    """Rename a blob in this bucket and wrap the result in a new blob object.

    The bucket's subdirectory is prepended to `new_name` before the name
    is handed to the underlying GCS bucket.
    """
    assert isinstance(blob, GoogleCloudStorageBlob)

    full_name = os.path.join(self.subdir, new_name)
    self._log.info('Renaming %s to %r', blob, full_name)

    # Reuse the GCS blob returned by the rename instead of fetching it again.
    renamed = self._gcs_bucket.rename_blob(blob.gblob, full_name)
    return GoogleCloudStorageBlob(renamed.name, self, gblob=renamed)
class GoogleCloudStorageBlob(Blob): class GoogleCloudStorageBlob(Blob):
"""GCS blob interface.""" """GCS blob interface."""
def __init__(self, name: str, bucket: GoogleCloudStorageBucket) -> None: def __init__(self, name: str, bucket: GoogleCloudStorageBucket,
*, gblob: gcloud.storage.blob.Blob=None) -> None:
super().__init__(name, bucket) super().__init__(name, bucket)
self._log = logging.getLogger(f'{__name__}.GoogleCloudStorageBlob') self._log = logging.getLogger(f'{__name__}.GoogleCloudStorageBlob')
self.gblob = bucket._gcs_get(name, chunk_size=256 * 1024 * 2) self.gblob = gblob or bucket._gcs_get(name, chunk_size=256 * 1024 * 2)
def create_from_file(self, file_obj: FileType, *, def create_from_file(self, file_obj: FileType, *,
content_type: str, content_type: str,

View File

@ -52,6 +52,21 @@ class LocalBucket(Bucket):
with open(blob.abspath(), 'rb') as src_file: with open(blob.abspath(), 'rb') as src_file:
dest_blob.create_from_file(src_file, content_type='application/x-octet-stream') dest_blob.create_from_file(src_file, content_type='application/x-octet-stream')
def rename_blob(self, blob: 'LocalBlob', new_name: str) -> 'LocalBlob':
    """Move the blob's file to `new_name` and return a blob for the new name.

    Missing parent directories of the destination are created first.
    """
    assert isinstance(blob, LocalBlob)
    self._log.info('Renaming %s to %r', blob, new_name)

    renamed = LocalBlob(new_name, self)
    source, target = blob.abspath(), renamed.abspath()

    target.parent.mkdir(parents=True, exist_ok=True)
    source.rename(target)
    return renamed
class LocalBlob(Blob): class LocalBlob(Blob):
"""Blob representing a local file on the filesystem.""" """Blob representing a local file on the filesystem."""
@ -103,3 +118,9 @@ class LocalBlob(Blob):
def exists(self) -> bool: def exists(self) -> bool:
return self.abspath().exists() return self.abspath().exists()
def touch(self):
    """Make sure the blob's file exists, creating missing parent directories."""
    target = self.abspath()
    target.parent.mkdir(parents=True, exist_ok=True)
    target.touch(exist_ok=True)

View File

@ -39,6 +39,7 @@ class ZencoderNotificationTest(AbstractPillarTest):
def test_good_secret_existing_file(self): def test_good_secret_existing_file(self):
file_id, _ = self.ensure_file_exists(file_overrides={ file_id, _ = self.ensure_file_exists(file_overrides={
'backend': 'local',
'processing': {'backend': 'zencoder', 'processing': {'backend': 'zencoder',
'job_id': 'koro-007', 'job_id': 'koro-007',
'status': 'processing'} 'status': 'processing'}
@ -101,7 +102,7 @@ class ZencoderNotificationTest(AbstractPillarTest):
"filename": "4. pose-library-previews.mkv", "filename": "4. pose-library-previews.mkv",
"file_path": "02a877a1d9da45509cdba97e283ef0bc.mkv", "file_path": "02a877a1d9da45509cdba97e283ef0bc.mkv",
"user": ctd.EXAMPLE_PROJECT_OWNER_ID, "user": ctd.EXAMPLE_PROJECT_OWNER_ID,
"backend": "gcs", "backend": "local",
"md5": "", "md5": "",
"content_type": "video/x-matroska", "content_type": "video/x-matroska",
"length": 39283494, "length": 39283494,
@ -170,7 +171,7 @@ class ZencoderNotificationTest(AbstractPillarTest):
"filename": "4. pose-library-previews.mkv", "filename": "4. pose-library-previews.mkv",
"file_path": "02a877a1d9da45509cdba97e283ef0bc.mkv", "file_path": "02a877a1d9da45509cdba97e283ef0bc.mkv",
"user": ctd.EXAMPLE_PROJECT_OWNER_ID, "user": ctd.EXAMPLE_PROJECT_OWNER_ID,
"backend": "gcs", "backend": "local",
"md5": "", "md5": "",
"content_type": "video/x-matroska", "content_type": "video/x-matroska",
"length": 39283494, "length": 39283494,
@ -203,6 +204,13 @@ class ZencoderNotificationTest(AbstractPillarTest):
"link": "https://storage.googleapis.com/59d69c94f4/_%2F02.mkv" "link": "https://storage.googleapis.com/59d69c94f4/_%2F02.mkv"
} }
# Make sure the to-be-renamed file exists on the local storage bucket.
from pillar.api.file_storage_backends import Bucket, local
bucket_class = Bucket.for_backend('local')
bucket = bucket_class(str(file_doc['project']))
blob: local.LocalBlob = bucket.blob('02a877a1d9da45509cdba97e283ef0bc-1080p.mp4')
blob.touch()
files_coll = self.app.db('files') files_coll = self.app.db('files')
files_coll.insert_one(file_doc) files_coll.insert_one(file_doc)
file_id = file_doc['_id'] file_id = file_doc['_id']

View File

@ -1,3 +1,4 @@
import abc
import typing import typing
from unittest import mock from unittest import mock
@ -5,6 +6,10 @@ from pillar.tests import AbstractPillarTest
class AbstractStorageBackendTest(AbstractPillarTest): class AbstractStorageBackendTest(AbstractPillarTest):
@abc.abstractmethod
def storage_backend(self):
    """Return the Bucket class of the storage backend under test."""
def create_test_file(self) -> (typing.IO, bytes): def create_test_file(self) -> (typing.IO, bytes):
import io import io
import secrets import secrets
@ -21,6 +26,20 @@ class AbstractStorageBackendTest(AbstractPillarTest):
self.assertEqual(len(expected_file_contents), int(resp.headers['Content-Length'])) self.assertEqual(len(expected_file_contents), int(resp.headers['Content-Length']))
self.assertEqual(expected_file_contents, resp.data) self.assertEqual(expected_file_contents, resp.data)
def do_test_rename(self):
    """Create a blob from a test file, rename it, and return both blobs.

    Returns a tuple (blob before the rename, blob returned by the rename);
    the backend-specific tests perform their own assertions on these.
    """
    from pillar.api.file_storage_backends import Bucket

    test_file, _ = self.create_test_file()

    bucket_class: typing.Type[Bucket] = self.storage_backend()
    bucket = bucket_class('a' * 24)

    original = bucket.blob('somefile.bin')
    original.create_from_file(test_file, content_type='application/octet-stream')

    renamed = bucket.rename_blob(original, 'ænother-näme.bin')
    return original, renamed
class LocalStorageBackendTest(AbstractStorageBackendTest): class LocalStorageBackendTest(AbstractStorageBackendTest):
def storage_backend(self): def storage_backend(self):
@ -107,6 +126,21 @@ class LocalStorageBackendTest(AbstractStorageBackendTest):
self.assertTrue(blob.exists()) self.assertTrue(blob.exists())
self.assertFalse(bucket.blob('ütff-8').exists()) self.assertFalse(bucket.blob('ütff-8').exists())
def test_rename(self):
    """Renaming on the local backend moves the file to the new name."""
    from pillar.api.file_storage_backends.local import LocalBlob

    self.enter_app_context()

    before, after = self.do_test_rename()
    assert isinstance(before, LocalBlob)
    assert isinstance(after, LocalBlob)

    # The file must exist under the new name only.
    self.assertTrue(after.abspath().exists())
    self.assertFalse(before.exists())

    # Old and new paths share the same grandparent directory.
    old_root = before.abspath().parent.parent
    new_root = after.abspath().parent.parent
    self.assertEqual(old_root, new_root)
class MockedGoogleCloudStorageTest(AbstractStorageBackendTest): class MockedGoogleCloudStorageTest(AbstractStorageBackendTest):
def storage_backend(self): def storage_backend(self):
@ -114,21 +148,24 @@ class MockedGoogleCloudStorageTest(AbstractStorageBackendTest):
return Bucket.for_backend('gcs') return Bucket.for_backend('gcs')
def test_file_upload(self): def mock_gcs(self):
import pillar.api.file_storage_backends.gcs as gcs import pillar.api.file_storage_backends.gcs as gcs
from gcloud.storage import Client, Bucket, Blob from gcloud.storage import Client, Bucket, Blob
# Set up mock GCS client
mock_gcs_client = gcs.gcs = mock.MagicMock(name='mock_gcs_client', autospec=Client) mock_gcs_client = gcs.gcs = mock.MagicMock(name='mock_gcs_client', autospec=Client)
mock_bucket = mock.MagicMock(name='mock_bucket', autospec=Bucket) mock_bucket = mock.MagicMock(name='mock_bucket', autospec=Bucket)
mock_blob = mock.MagicMock(name='mock_blob', autospec=Blob) mock_blob = mock.MagicMock(name='mock_blob', autospec=Blob)
mock_gcs_client.get_bucket.return_value = mock_bucket mock_gcs_client.get_bucket.return_value = mock_bucket
mock_bucket.blob.return_value = mock_blob mock_bucket.blob.return_value = mock_blob
mock_blob.public_url = '/path/to/somefile.bin' mock_blob.public_url = '/path/to/somefile.bin'
mock_blob.size = 318 mock_blob.size = 318
mock_blob.exists.return_value = True mock_blob.exists.return_value = True
return mock_bucket, mock_blob
def test_file_upload(self):
_, mock_blob = self.mock_gcs()
test_file, file_contents = self.create_test_file() test_file, file_contents = self.create_test_file()
with self.app.test_request_context(): with self.app.test_request_context():
@ -151,3 +188,11 @@ class MockedGoogleCloudStorageTest(AbstractStorageBackendTest):
size=512, size=512,
content_type='application/octet-stream') content_type='application/octet-stream')
self.assertEqual(2, mock_blob.reload.call_count) self.assertEqual(2, mock_blob.reload.call_count)
def test_rename(self):
    """Renaming via the GCS backend calls rename_blob on the mocked bucket."""
    self.enter_app_context()
    gcs_bucket, gcs_blob = self.mock_gcs()

    self.do_test_rename()

    # The storage API should have added the _ path in front.
    gcs_bucket.rename_blob.assert_called_with(gcs_blob, '_/ænother-näme.bin')