Added Celery task for refreshing file links
This includes a CLI command to kick off a single run for the Celery task. This does *NOT* include a check to see whether the task is already running!
This commit is contained in:
@@ -449,6 +449,7 @@ class PillarServer(Eve):
|
|||||||
celery_task_modules = [
|
celery_task_modules = [
|
||||||
'pillar.celery.tasks',
|
'pillar.celery.tasks',
|
||||||
'pillar.celery.algolia_tasks',
|
'pillar.celery.algolia_tasks',
|
||||||
|
'pillar.celery.file_link_tasks',
|
||||||
]
|
]
|
||||||
|
|
||||||
# Allow Pillar extensions from defining their own Celery tasks.
|
# Allow Pillar extensions from defining their own Celery tasks.
|
||||||
|
19
pillar/celery/file_link_tasks.py
Normal file
19
pillar/celery/file_link_tasks.py
Normal file
@@ -0,0 +1,19 @@
|
|||||||
|
from pillar import current_app
|
||||||
|
|
||||||
|
|
||||||
|
@current_app.celery.task(ignore_result=True)
|
||||||
|
def regenerate_all_expired_links(backend_name: str, chunk_size: int):
|
||||||
|
"""Regenerate all expired links for all non-deleted file documents.
|
||||||
|
|
||||||
|
Probably only works on Google Cloud Storage ('gcs') backends at
|
||||||
|
the moment, since those are the only links that actually expire.
|
||||||
|
|
||||||
|
:param backend_name: name of the backend to refresh for.
|
||||||
|
:param chunk_size: the maximum number of files to refresh in this run.
|
||||||
|
"""
|
||||||
|
from pillar.api import file_storage
|
||||||
|
|
||||||
|
# Refresh all files that already have expired or will expire in the next
|
||||||
|
# two hours. Since this task is intended to run every hour, this should
|
||||||
|
# result in all regular file requests having a valid link.
|
||||||
|
file_storage.refresh_links_for_backend(backend_name, chunk_size, expiry_seconds=7200)
|
@@ -283,6 +283,20 @@ def refresh_backend_links(backend_name, chunk_size=50, quiet=False, window=12):
|
|||||||
file_storage.refresh_links_for_backend(backend_name, chunk_size, window * 3600)
|
file_storage.refresh_links_for_backend(backend_name, chunk_size, window * 3600)
|
||||||
|
|
||||||
|
|
||||||
|
@manager_maintenance.command
|
||||||
|
@manager_maintenance.option('-c', '--chunk', dest='chunk_size', default=50,
|
||||||
|
help='Number of links to update, use 0 to update all.')
|
||||||
|
def refresh_backend_links_celery(backend_name, chunk_size=50):
|
||||||
|
"""Starts a Celery task that refreshes all file links that are using a certain storage backend.
|
||||||
|
"""
|
||||||
|
from pillar.celery import file_link_tasks
|
||||||
|
|
||||||
|
chunk_size = int(chunk_size) # CLI parameters are passed as strings
|
||||||
|
file_link_tasks.regenerate_all_expired_links.delay(backend_name, chunk_size)
|
||||||
|
|
||||||
|
log.info('File link regeneration task has been queued for execution.')
|
||||||
|
|
||||||
|
|
||||||
@manager_maintenance.command
|
@manager_maintenance.command
|
||||||
def expire_all_project_links(project_uuid):
|
def expire_all_project_links(project_uuid):
|
||||||
"""Expires all file links for a certain project without refreshing.
|
"""Expires all file links for a certain project without refreshing.
|
||||||
|
@@ -164,7 +164,10 @@ class AbstractPillarTest(TestMinimal):
|
|||||||
for modname in remove:
|
for modname in remove:
|
||||||
del sys.modules[modname]
|
del sys.modules[modname]
|
||||||
|
|
||||||
def ensure_file_exists(self, file_overrides=None):
|
def ensure_file_exists(self, file_overrides=None, *, example_file=None) -> (ObjectId, dict):
|
||||||
|
if example_file is None:
|
||||||
|
example_file = ctd.EXAMPLE_FILE
|
||||||
|
|
||||||
if file_overrides and file_overrides.get('project'):
|
if file_overrides and file_overrides.get('project'):
|
||||||
self.ensure_project_exists({'_id': file_overrides['project']})
|
self.ensure_project_exists({'_id': file_overrides['project']})
|
||||||
else:
|
else:
|
||||||
@@ -174,7 +177,7 @@ class AbstractPillarTest(TestMinimal):
|
|||||||
files_collection = self.app.data.driver.db['files']
|
files_collection = self.app.data.driver.db['files']
|
||||||
assert isinstance(files_collection, pymongo.collection.Collection)
|
assert isinstance(files_collection, pymongo.collection.Collection)
|
||||||
|
|
||||||
file = copy.deepcopy(ctd.EXAMPLE_FILE)
|
file = copy.deepcopy(example_file)
|
||||||
if file_overrides is not None:
|
if file_overrides is not None:
|
||||||
file.update(file_overrides)
|
file.update(file_overrides)
|
||||||
if '_id' in file and file['_id'] is None:
|
if '_id' in file and file['_id'] is None:
|
||||||
|
0
tests/test_celery/__init__.py
Normal file
0
tests/test_celery/__init__.py
Normal file
102
tests/test_celery/test_file_link_tasks.py
Normal file
102
tests/test_celery/test_file_link_tasks.py
Normal file
@@ -0,0 +1,102 @@
|
|||||||
|
import datetime
|
||||||
|
|
||||||
|
from bson import ObjectId, tz_util
|
||||||
|
from dateutil.parser import parse
|
||||||
|
|
||||||
|
from pillar.tests import AbstractPillarTest
|
||||||
|
from pillar.tests import common_test_data as ctd
|
||||||
|
|
||||||
|
EXAMPLE_FILE = {
|
||||||
|
'_created': datetime.datetime(2015, 12, 17, 16, 28, 49, tzinfo=tz_util.utc),
|
||||||
|
'_updated': datetime.datetime(2016, 3, 25, 10, 28, 24, tzinfo=tz_util.utc),
|
||||||
|
'_etag': '044ce3aede2e123e261c0d8bd77212f264d4f7b0',
|
||||||
|
'height': 2048,
|
||||||
|
'name': 'c2a5c897769ce1ef0eb10f8fa1c472bcb8e2d5a4.png', 'format': 'png',
|
||||||
|
'variations': [
|
||||||
|
{'format': 'jpg', 'height': 160, 'width': 160, 'length': 8558,
|
||||||
|
'link': 'http://localhost:8002/file-variant-h', 'content_type': 'image/jpeg',
|
||||||
|
'md5': '--', 'file_path': 'c2a5c897769ce1ef0eb10f8fa1c472bcb8e2d5a4-b.jpg',
|
||||||
|
'size': 'b'},
|
||||||
|
{'format': 'jpg', 'height': 2048, 'width': 2048, 'length': 819569,
|
||||||
|
'link': 'http://localhost:8002/file-variant-h', 'content_type': 'image/jpeg',
|
||||||
|
'md5': '--', 'file_path': 'c2a5c897769ce1ef0eb10f8fa1c472bcb8e2d5a4-h.jpg',
|
||||||
|
'size': 'h'},
|
||||||
|
{'format': 'jpg', 'height': 64, 'width': 64, 'length': 8195,
|
||||||
|
'link': 'http://localhost:8002/file-variant-t', 'content_type': 'image/jpeg',
|
||||||
|
'md5': '--', 'file_path': 'c2a5c897769ce1ef0eb10f8fa1c472bcb8e2d5a4-t.jpg',
|
||||||
|
'size': 't'},
|
||||||
|
],
|
||||||
|
'filename': 'brick_dutch_soft_bump.png',
|
||||||
|
'project': ctd.EXAMPLE_PROJECT_ID,
|
||||||
|
'width': 2048,
|
||||||
|
'length': 6227670,
|
||||||
|
'user': ObjectId('56264fc4fa3a250344bd10c5'),
|
||||||
|
'content_type': 'image/png',
|
||||||
|
'file_path': 'c2a5c897769ce1ef0eb10f8fa1c472bcb8e2d5a4.png',
|
||||||
|
'backend': 'pillar',
|
||||||
|
'link': 'http://localhost:8002/file',
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class FileLinkCeleryTasksTest(AbstractPillarTest):
|
||||||
|
def ensure_file_exists(self, file_overrides=None, **kwargs) -> (ObjectId, dict):
|
||||||
|
"""Same as in superclass, but different EXAMPLE_FILE."""
|
||||||
|
return super().ensure_file_exists(file_overrides, example_file=EXAMPLE_FILE)
|
||||||
|
|
||||||
|
def test_refresh_all_files(self):
|
||||||
|
self.enter_app_context()
|
||||||
|
|
||||||
|
now = datetime.datetime.now(tz=tz_util.utc)
|
||||||
|
|
||||||
|
# No expiry known → refresh
|
||||||
|
fid1, _ = self.ensure_file_exists({'backend': 'gcs'})
|
||||||
|
# Expired → refresh
|
||||||
|
fid2, _ = self.ensure_file_exists({'backend': 'gcs', 'link_expires': parse('2016-01-01')})
|
||||||
|
# Going to expire within 2 hours → refresh
|
||||||
|
fid3, _ = self.ensure_file_exists({
|
||||||
|
'backend': 'gcs',
|
||||||
|
'link_expires': now + datetime.timedelta(hours=1, minutes=57)})
|
||||||
|
# Not same backend → ignore
|
||||||
|
fid4, file_4 = self.ensure_file_exists({
|
||||||
|
'backend': 'pillar',
|
||||||
|
'link_expires': now + datetime.timedelta(hours=1, minutes=58)})
|
||||||
|
# Same as fid3 → refresh
|
||||||
|
fid5, _ = self.ensure_file_exists({
|
||||||
|
'backend': 'gcs',
|
||||||
|
'link_expires': now + datetime.timedelta(hours=1, minutes=58)})
|
||||||
|
# Valid for long enough → ignore
|
||||||
|
fid6, file_6 = self.ensure_file_exists({
|
||||||
|
'backend': 'gcs',
|
||||||
|
'link_expires': now + datetime.timedelta(hours=5)})
|
||||||
|
# Expired but deleted → ignore
|
||||||
|
fid7, file_7 = self.ensure_file_exists({
|
||||||
|
'_deleted': True,
|
||||||
|
'backend': 'gcs',
|
||||||
|
'link_expires': now + datetime.timedelta(hours=-5)})
|
||||||
|
# Expired but would be the 5th in a 4-file chunk → ignore
|
||||||
|
fid8, file_8 = self.ensure_file_exists({
|
||||||
|
'backend': 'gcs',
|
||||||
|
'link_expires': now + datetime.timedelta(hours=1, minutes=59)})
|
||||||
|
|
||||||
|
from pillar.celery import file_link_tasks as flt
|
||||||
|
|
||||||
|
flt.regenerate_all_expired_links('gcs', 4)
|
||||||
|
|
||||||
|
files_coll = self.app.db('files')
|
||||||
|
|
||||||
|
# Test files that are supposed to be refreshed.
|
||||||
|
expected_refresh = {'fid1': fid1, 'fid2': fid2, 'fid3': fid3, 'fid5': fid5}
|
||||||
|
for name, fid in expected_refresh.items():
|
||||||
|
from_db = files_coll.find_one(fid)
|
||||||
|
|
||||||
|
self.assertIn('link_expires', from_db, f'checking {name}')
|
||||||
|
self.assertGreater(from_db['link_expires'], now, f'checking {name}')
|
||||||
|
|
||||||
|
# Test files that shouldn't have been touched.
|
||||||
|
expected_untouched = {'fid4': (fid4, file_4),
|
||||||
|
'fid6': (fid6, file_6),
|
||||||
|
'fid7': (fid7, file_7),
|
||||||
|
'fid8': (fid8, file_8)}
|
||||||
|
for name, (fid, before) in expected_untouched.items():
|
||||||
|
from_db = files_coll.find_one(fid)
|
||||||
|
self.assertEqual(from_db['link_expires'], before['link_expires'], f'checking {name}')
|
Reference in New Issue
Block a user