Thumbnails for images and videos #87
@ -100,3 +100,10 @@ ABUSE_TYPE = Choices(
|
||||
('ABUSE_USER', ABUSE_TYPE_USER, "User"),
|
||||
('ABUSE_RATING', ABUSE_TYPE_RATING, "Rating"),
|
||||
)
|
||||
|
||||
# **N.B.**: thumbnail sizes are not intended to be changed on the fly:
|
||||
# thumbnails of existing images must exist in MEDIA_ROOT before
|
||||
# the code expecting thumbnails of new dimensions can be deployed!
|
||||
THUMBNAIL_SIZES = {'1080p': [1920, 1080], '360p': [640, 360]}
|
||||
THUMBNAIL_FORMAT = 'PNG'
|
||||
THUMBNAIL_QUALITY = 83
|
||||
|
@ -1,13 +1,13 @@
|
||||
{% load common filters %}
|
||||
{% with latest=extension.latest_version %}
|
||||
{% with latest=extension.latest_version thumbnail_360p_url=extension.previews.listed.first.thumbnail_360p_url %}
|
||||
|
||||
<div class="ext-card {% if blur %}is-background-blur{% endif %}">
|
||||
{% if blur %}
|
||||
<div class="ext-card-thumbnail-blur" style="background-image: url({{ extension.previews.listed.first.source.url }});"></div>
|
||||
<div class="ext-card-thumbnail-blur" style="background-image: url({{ thumbnail_360p_url }});"></div>
|
||||
{% endif %}
|
||||
|
||||
<a class="ext-card-thumbnail" href="{{ extension.get_absolute_url }}">
|
||||
<div class="ext-card-thumbnail-img" style="background-image: url({{ extension.previews.listed.first.source.url }});" title="{{ extension.name }}"></div>
|
||||
<div class="ext-card-thumbnail-img" style="background-image: url({{ thumbnail_360p_url }});" title="{{ extension.name }}"></div>
|
||||
</a>
|
||||
|
||||
<div class="ext-card-body">
|
||||
|
@ -3,19 +3,17 @@
|
||||
{% if previews %}
|
||||
<div class="galleria-items{% if previews.count > 5 %} is-many{% endif %}{% if previews.count == 1 %} is-single{% endif %}" id="galleria-items">
|
||||
{% for preview in previews %}
|
||||
{% with thumbnail_1080p_url=preview.thumbnail_1080p_url %}
|
||||
<a
|
||||
class="galleria-item js-galleria-item-preview galleria-item-type-{{ preview.content_type|slugify|slice:5 }}{% if forloop.first %} is-active{% endif %}"
|
||||
href="{{ preview.source.url }}"
|
||||
href="{{ thumbnail_1080p_url }}"
|
||||
{% if 'video' in preview.content_type %}data-galleria-video-url="{{ preview.source.url }}"{% endif %}
|
||||
data-galleria-content-type="{{ preview.content_type }}"
|
||||
data-galleria-index="{{ forloop.counter }}">
|
||||
|
||||
{% if 'video' in preview.content_type and preview.thumbnail %}
|
||||
<img src="{{ preview.thumbnail.url }}" alt="{{ preview.preview.caption }}">
|
||||
{% else %}
|
||||
<img src="{{ preview.source.url }}" alt="{{ preview.preview.caption }}">
|
||||
{% endif %}
|
||||
<img src="{{ thumbnail_1080p_url }}" alt="{{ preview.preview.caption }}">
|
||||
</a>
|
||||
{% endwith %}
|
||||
{% endfor %}
|
||||
</div>
|
||||
{% else %}
|
||||
|
@ -1,10 +1,16 @@
|
||||
import logging
|
||||
|
||||
from django.conf import settings
|
||||
from django.contrib import admin
|
||||
from django.template.loader import render_to_string
|
||||
import background_task.admin
|
||||
import background_task.models
|
||||
|
||||
from .models import File, FileValidation
|
||||
import files.signals
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def scan_selected_files(self, request, queryset):
|
||||
"""Scan selected files."""
|
||||
@ -12,6 +18,12 @@ def scan_selected_files(self, request, queryset):
|
||||
files.signals.schedule_scan(instance)
|
||||
|
||||
|
||||
def make_thumbnails(self, request, queryset):
|
||||
"""Schedule thumbnails generation for selected files."""
|
||||
for instance in queryset.filter(type__in=(File.TYPES.IMAGE, File.TYPES.VIDEO)):
|
||||
files.tasks.make_thumbnails(file_id=instance.pk)
|
||||
|
||||
|
||||
class FileValidationInlineAdmin(admin.StackedInline):
|
||||
model = FileValidation
|
||||
readonly_fields = ('date_created', 'date_modified', 'is_ok', 'results')
|
||||
@ -27,6 +39,28 @@ class FileValidationInlineAdmin(admin.StackedInline):
|
||||
|
||||
@admin.register(File)
|
||||
class FileAdmin(admin.ModelAdmin):
|
||||
class Media:
|
||||
css = {'all': ('files/admin/file.css',)}
|
||||
|
||||
def thumbnails(self, obj):
|
||||
if not obj or not (obj.is_image or obj.is_video):
|
||||
return ''
|
||||
try:
|
||||
context = {'file': obj, 'MEDIA_URL': settings.MEDIA_URL}
|
||||
return render_to_string('files/admin/thumbnails.html', context)
|
||||
except Exception:
|
||||
# Make sure any exception happening here is always logged
|
||||
# (e.g. admin eats exceptions in ModelAdmin properties, making it hard to debug)
|
||||
logger.exception('Failed to render thumbnails')
|
||||
raise
|
||||
|
||||
def get_form(self, request, obj=None, **kwargs):
|
||||
"""Override metadata help text depending on file type."""
|
||||
if obj and (obj.is_image or obj.is_video):
|
||||
help_text = 'Additional information about the file, e.g. existing thumbnails.'
|
||||
kwargs.update({'help_texts': {'metadata': help_text}})
|
||||
return super().get_form(request, obj, **kwargs)
|
||||
|
||||
view_on_site = False
|
||||
save_on_top = True
|
||||
|
||||
@ -48,6 +82,9 @@ class FileAdmin(admin.ModelAdmin):
|
||||
'date_approved',
|
||||
'date_status_changed',
|
||||
'size_bytes',
|
||||
'thumbnails',
|
||||
'thumbnail',
|
||||
'type',
|
||||
'user',
|
||||
'original_hash',
|
||||
'original_name',
|
||||
@ -67,9 +104,8 @@ class FileAdmin(admin.ModelAdmin):
|
||||
{
|
||||
'fields': (
|
||||
'id',
|
||||
('source', 'thumbnail'),
|
||||
('original_name', 'content_type'),
|
||||
'type',
|
||||
('source', 'thumbnails', 'thumbnail'),
|
||||
('type', 'content_type', 'original_name'),
|
||||
'status',
|
||||
)
|
||||
},
|
||||
@ -99,7 +135,7 @@ class FileAdmin(admin.ModelAdmin):
|
||||
)
|
||||
|
||||
inlines = [FileValidationInlineAdmin]
|
||||
actions = [scan_selected_files]
|
||||
actions = [scan_selected_files, make_thumbnails]
|
||||
|
||||
def is_ok(self, obj):
|
||||
return obj.validation.is_ok if hasattr(obj, 'validation') else None
|
||||
|
19
files/migrations/0008_alter_file_thumbnail.py
Normal file
19
files/migrations/0008_alter_file_thumbnail.py
Normal file
@ -0,0 +1,19 @@
|
||||
# Generated by Django 4.2.11 on 2024-04-23 10:31
|
||||
|
||||
from django.db import migrations, models
|
||||
import files.models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('files', '0007_alter_file_status'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.AlterField(
|
||||
model_name='file',
|
||||
name='thumbnail',
|
||||
field=models.ImageField(blank=True, editable=False, help_text='Thumbnail generated from uploaded image or video source file', max_length=256, null=True, upload_to=files.models.thumbnail_upload_to),
|
||||
),
|
||||
]
|
@ -6,11 +6,8 @@ from django.contrib.auth import get_user_model
|
||||
from django.db import models
|
||||
|
||||
from common.model_mixins import CreatedModifiedMixin, TrackChangesMixin
|
||||
from files.utils import get_sha256, guess_mimetype_from_ext
|
||||
from constants.base import (
|
||||
FILE_STATUS_CHOICES,
|
||||
FILE_TYPE_CHOICES,
|
||||
)
|
||||
from files.utils import get_sha256, guess_mimetype_from_ext, get_thumbnail_upload_to
|
||||
from constants.base import FILE_STATUS_CHOICES, FILE_TYPE_CHOICES
|
||||
import utils
|
||||
|
||||
User = get_user_model()
|
||||
@ -41,15 +38,11 @@ def file_upload_to(instance, filename):
|
||||
|
||||
|
||||
def thumbnail_upload_to(instance, filename):
|
||||
prefix = 'thumbnails/'
|
||||
_hash = instance.hash.split(':')[-1]
|
||||
extension = Path(filename).suffix
|
||||
path = Path(prefix, _hash[:2], _hash).with_suffix(extension)
|
||||
return path
|
||||
return get_thumbnail_upload_to(instance.hash)
|
||||
|
||||
|
||||
class File(CreatedModifiedMixin, TrackChangesMixin, models.Model):
|
||||
track_changes_to_fields = {'status', 'size_bytes', 'hash'}
|
||||
track_changes_to_fields = {'status', 'size_bytes', 'hash', 'thumbnail', 'metadata'}
|
||||
|
||||
TYPES = FILE_TYPE_CHOICES
|
||||
STATUSES = FILE_STATUS_CHOICES
|
||||
@ -63,7 +56,8 @@ class File(CreatedModifiedMixin, TrackChangesMixin, models.Model):
|
||||
null=True,
|
||||
blank=True,
|
||||
max_length=256,
|
||||
help_text='Image thumbnail in case file is a video',
|
||||
help_text='Thumbnail generated from uploaded image or video source file',
|
||||
editable=False,
|
||||
)
|
||||
content_type = models.CharField(max_length=256, null=True, blank=True)
|
||||
type = models.PositiveSmallIntegerField(
|
||||
@ -203,6 +197,30 @@ class File(CreatedModifiedMixin, TrackChangesMixin, models.Model):
|
||||
def get_submit_url(self) -> str:
|
||||
return self.extension.get_draft_url()
|
||||
|
||||
def get_thumbnail_of_size(self, size_key: str) -> str:
|
||||
"""Return absolute path portion of the URL of a thumbnail of this file.
|
||||
|
||||
Fall back to the source file, if no thumbnail is stored.
|
||||
Log absence of the thumbnail file instead of exploding somewhere in the templates.
|
||||
"""
|
||||
# We don't (yet?) have thumbnails for anything other than images and videos.
|
||||
assert self.is_image or self.is_video, f'File pk={self.pk} is neither image nor video'
|
||||
|
||||
try:
|
||||
path = self.metadata['thumbnails'][size_key]['path']
|
||||
return self.thumbnail.storage.url(path)
|
||||
except (KeyError, TypeError):
|
||||
log.exception(f'File pk={self.pk} is missing thumbnail "{size_key}": {self.metadata}')
|
||||
return self.source.url
|
||||
|
||||
@property
|
||||
def thumbnail_1080p_url(self) -> str:
|
||||
return self.get_thumbnail_of_size('1080p')
|
||||
|
||||
@property
|
||||
def thumbnail_360p_url(self) -> str:
|
||||
return self.get_thumbnail_of_size('360p')
|
||||
|
||||
|
||||
class FileValidation(CreatedModifiedMixin, TrackChangesMixin, models.Model):
|
||||
track_changes_to_fields = {'is_ok', 'results'}
|
||||
|
@ -5,6 +5,7 @@ from django.dispatch import receiver
|
||||
|
||||
import files.models
|
||||
import files.tasks
|
||||
import files.utils
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -35,6 +36,30 @@ def _scan_new_file(
|
||||
schedule_scan(instance)
|
||||
|
||||
|
||||
def schedule_thumbnails(file: files.models.File) -> None:
|
||||
"""Schedule thumbnail generation for a given file."""
|
||||
args = {'pk': file.pk, 'type': file.get_type_display()}
|
||||
logger.info('Scheduling thumbnail generation for file pk=%(pk)s type=%(type)s', args)
|
||||
verbose_name = f'make thumbnails for "{file.source.name}"'
|
||||
files.tasks.make_thumbnails(file_id=file.pk, creator=file, verbose_name=verbose_name)
|
||||
|
||||
|
||||
@receiver(post_save, sender=files.models.FileValidation)
|
||||
def _schedule_thumbnails(
|
||||
sender: object, instance: files.models.FileValidation, created: bool, **kwargs: object
|
||||
) -> None:
|
||||
if not created:
|
||||
return
|
||||
|
||||
if not instance.is_ok:
|
||||
return
|
||||
|
||||
file = instance.file
|
||||
# Generate thumbnails if initial scan found no issues
|
||||
if file.is_image or file.is_video:
|
||||
schedule_thumbnails(file)
|
||||
|
||||
|
||||
@receiver(pre_delete, sender=files.models.File)
|
||||
@receiver(pre_delete, sender=files.models.FileValidation)
|
||||
def _log_deletion(sender: object, instance: files.models.File, **kwargs: object) -> None:
|
||||
@ -46,3 +71,4 @@ def delete_orphaned_files(sender: object, instance: files.models.File, **kwargs:
|
||||
"""Delete source and thumbnail files from storage when File record is deleted."""
|
||||
files.utils.delete_file_in_storage(instance.source.name)
|
||||
files.utils.delete_file_in_storage(instance.thumbnail.name)
|
||||
files.utils.delete_thumbnails(instance.metadata)
|
||||
|
11
files/static/files/admin/file.css
Normal file
11
files/static/files/admin/file.css
Normal file
@ -0,0 +1,11 @@
|
||||
.file-thumbnail {
|
||||
display: inline-block;
|
||||
border: grey solid 1px;
|
||||
margin-left: 0.5rem;
|
||||
}
|
||||
.file-thumbnail-size {
|
||||
position: absolute;
|
||||
background: rgba(255, 255, 255, 0.5);
|
||||
padding-right: 0.5rem;
|
||||
padding-left: 0.5rem;
|
||||
}
|
@ -27,3 +27,45 @@ def clamdscan(file_id: int):
|
||||
file_validation.results = scan_result
|
||||
file_validation.is_ok = is_ok
|
||||
file_validation.save(update_fields={'results', 'is_ok', 'date_modified'})
|
||||
|
||||
|
||||
@background(schedule={'action': TaskSchedule.RESCHEDULE_EXISTING})
|
||||
def make_thumbnails(file_id: int) -> None:
|
||||
"""Generate thumbnails for a given file, store them in thumbnail and metadata columns."""
|
||||
file = files.models.File.objects.get(pk=file_id)
|
||||
args = {'pk': file_id, 'type': file.get_type_display()}
|
||||
|
||||
if not file.is_image and not file.is_video:
|
||||
logger.error('File pk=%(pk)s of type "%(type)s" is neither an image nor a video', args)
|
||||
return
|
||||
if not file.validation.is_ok:
|
||||
logger.error("File pk={pk} is flagged, won't make thumbnails".format(**args))
|
||||
return
|
||||
|
||||
# For an image, source of the thumbnails is the original image
|
||||
source_path = file.source.path
|
||||
thumbnail_field = file.thumbnail
|
||||
unchanged_thumbnail = thumbnail_field.name
|
||||
|
||||
if file.is_video:
|
||||
frame_path = files.utils.get_thumbnail_upload_to(file.hash)
|
||||
# For a video, source of the thumbnails is a frame extracted with ffpeg
|
||||
files.utils.extract_frame(source_path, frame_path)
|
||||
thumbnail_field.name = frame_path
|
||||
source_path = frame_path
|
||||
|
||||
thumbnails = files.utils.make_thumbnails(source_path, file.hash)
|
||||
|
||||
if not thumbnail_field.name:
|
||||
thumbnail_field.name = thumbnails['1080p']['path']
|
||||
|
||||
update_fields = set()
|
||||
if thumbnail_field.name != unchanged_thumbnail:
|
||||
update_fields.add('thumbnail')
|
||||
if file.metadata.get('thumbnails') != thumbnails:
|
||||
file.metadata.update({'thumbnails': thumbnails})
|
||||
update_fields.add('metadata')
|
||||
if update_fields:
|
||||
args['update_fields'] = update_fields
|
||||
logger.info('Made thumbnails for file pk=%(pk)s, updating %(update_fields)s', args)
|
||||
file.save(update_fields=update_fields)
|
||||
|
8
files/templates/files/admin/thumbnails.html
Normal file
8
files/templates/files/admin/thumbnails.html
Normal file
@ -0,0 +1,8 @@
|
||||
<div class="file-thumbnails">
|
||||
{% for size_key, thumb in file.metadata.thumbnails.items %}
|
||||
<div class="file-thumbnail">
|
||||
<span class="file-thumbnail-size">{{ thumb.size.0 }}x{{ thumb.size.1 }}px</span>
|
||||
<img height="{% widthratio thumb.size.1 10 1 %}" src="{{ MEDIA_URL }}{{ thumb.path }}" title={{ thumb.path }}>
|
||||
</div>
|
||||
{% endfor %}
|
||||
</div>
|
@ -42,9 +42,11 @@ class FileTest(TestCase):
|
||||
'new_state': {'status': 'Approved'},
|
||||
'object': '<File: test.zip (Approved)>',
|
||||
'old_state': {
|
||||
'status': 2,
|
||||
'hash': 'foobar',
|
||||
'metadata': {},
|
||||
'size_bytes': 7149,
|
||||
'status': 2,
|
||||
'thumbnail': '',
|
||||
},
|
||||
}
|
||||
},
|
||||
|
112
files/tests/test_tasks.py
Normal file
112
files/tests/test_tasks.py
Normal file
@ -0,0 +1,112 @@
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
import logging
|
||||
|
||||
from django.test import TestCase, override_settings
|
||||
|
||||
from common.tests.factories.files import FileFactory
|
||||
from files.tasks import make_thumbnails
|
||||
import files.models
|
||||
|
||||
TEST_MEDIA_DIR = Path(__file__).resolve().parent / 'media'
|
||||
|
||||
|
||||
@override_settings(MEDIA_ROOT=TEST_MEDIA_DIR)
|
||||
class TasksTest(TestCase):
|
||||
def test_make_thumbnails_fails_when_no_validation(self):
|
||||
file = FileFactory(original_hash='foobar', source='file/original_image_source.jpg')
|
||||
|
||||
with self.assertRaises(files.models.File.validation.RelatedObjectDoesNotExist):
|
||||
make_thumbnails.task_function(file_id=file.pk)
|
||||
|
||||
@patch('files.utils.make_thumbnails')
|
||||
def test_make_thumbnails_fails_when_validation_not_ok(self, mock_make_thumbnails):
|
||||
file = FileFactory(original_hash='foobar', source='file/original_image_source.jpg')
|
||||
files.models.FileValidation.objects.create(file=file, is_ok=False, results={})
|
||||
|
||||
with self.assertLogs(level=logging.ERROR) as logs:
|
||||
make_thumbnails.task_function(file_id=file.pk)
|
||||
|
||||
self.maxDiff = None
|
||||
self.assertEqual(
|
||||
logs.output[0], f"ERROR:files.tasks:File pk={file.pk} is flagged, won't make thumbnails"
|
||||
)
|
||||
|
||||
mock_make_thumbnails.assert_not_called()
|
||||
|
||||
@patch('files.utils.make_thumbnails')
|
||||
def test_make_thumbnails_fails_when_not_image_or_video(self, mock_make_thumbnails):
|
||||
file = FileFactory(
|
||||
original_hash='foobar', source='file/source.zip', type=files.models.File.TYPES.THEME
|
||||
)
|
||||
|
||||
with self.assertLogs(level=logging.ERROR) as logs:
|
||||
make_thumbnails.task_function(file_id=file.pk)
|
||||
|
||||
self.maxDiff = None
|
||||
self.assertEqual(
|
||||
logs.output[0],
|
||||
f'ERROR:files.tasks:File pk={file.pk} of type "Theme" is neither an image nor a video',
|
||||
)
|
||||
|
||||
mock_make_thumbnails.assert_not_called()
|
||||
|
||||
@patch('files.utils.resize_image')
|
||||
@patch('files.utils.Image')
|
||||
def test_make_thumbnails_for_image(self, mock_image, mock_resize_image):
|
||||
file = FileFactory(original_hash='foobar', source='file/original_image_source.jpg')
|
||||
files.models.FileValidation.objects.create(file=file, is_ok=True, results={})
|
||||
self.assertIsNone(file.thumbnail.name)
|
||||
self.assertEqual(file.metadata, {})
|
||||
|
||||
make_thumbnails.task_function(file_id=file.pk)
|
||||
|
||||
mock_image.open.assert_called_once_with(
|
||||
str(TEST_MEDIA_DIR / 'file' / 'original_image_source.jpg')
|
||||
)
|
||||
mock_image.open.return_value.close.assert_called_once()
|
||||
|
||||
file.refresh_from_db()
|
||||
self.assertEqual(file.thumbnail.name, 'thumbnails/fo/foobar_1920x1080.png')
|
||||
self.assertEqual(
|
||||
file.metadata,
|
||||
{
|
||||
'thumbnails': {
|
||||
'1080p': {'path': 'thumbnails/fo/foobar_1920x1080.png', 'size': [1920, 1080]},
|
||||
'360p': {'path': 'thumbnails/fo/foobar_640x360.png', 'size': [640, 360]},
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
@patch('files.utils.resize_image')
|
||||
@patch('files.utils.Image')
|
||||
@patch('files.utils.FFmpeg')
|
||||
def test_make_thumbnails_for_video(self, mock_ffmpeg, mock_image, mock_resize_image):
|
||||
file = FileFactory(
|
||||
original_hash='deadbeef', source='file/path.mp4', type=files.models.File.TYPES.VIDEO
|
||||
)
|
||||
files.models.FileValidation.objects.create(file=file, is_ok=True, results={})
|
||||
self.assertIsNone(file.thumbnail.name)
|
||||
self.assertEqual(file.metadata, {})
|
||||
|
||||
make_thumbnails.task_function(file_id=file.pk)
|
||||
|
||||
mock_ffmpeg.assert_called_once_with()
|
||||
mock_image.open.assert_called_once_with(
|
||||
str(TEST_MEDIA_DIR / 'thumbnails' / 'de' / 'deadbeef.png')
|
||||
)
|
||||
mock_image.open.return_value.close.assert_called_once()
|
||||
|
||||
file.refresh_from_db()
|
||||
# Check that the extracted frame is stored instead of the large thumbnail
|
||||
self.assertEqual(file.thumbnail.name, 'thumbnails/de/deadbeef.png')
|
||||
# Check that File metadata and thumbnail fields were updated
|
||||
self.assertEqual(
|
||||
file.metadata,
|
||||
{
|
||||
'thumbnails': {
|
||||
'1080p': {'path': 'thumbnails/de/deadbeef_1920x1080.png', 'size': [1920, 1080]},
|
||||
'360p': {'path': 'thumbnails/de/deadbeef_640x360.png', 'size': [640, 360]},
|
||||
},
|
||||
},
|
||||
)
|
@ -1,6 +1,20 @@
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch, ANY
|
||||
import tempfile
|
||||
|
||||
from django.test import TestCase
|
||||
|
||||
from files.utils import find_path_by_name, find_exact_path, filter_paths_by_ext
|
||||
from files.utils import (
|
||||
extract_frame,
|
||||
filter_paths_by_ext,
|
||||
find_exact_path,
|
||||
find_path_by_name,
|
||||
get_thumbnail_upload_to,
|
||||
make_thumbnails,
|
||||
)
|
||||
|
||||
# Reusing test files from the extensions app
|
||||
TEST_FILES_DIR = Path(__file__).resolve().parent.parent.parent / 'extensions' / 'tests' / 'files'
|
||||
|
||||
|
||||
class UtilsTest(TestCase):
|
||||
@ -98,3 +112,49 @@ class UtilsTest(TestCase):
|
||||
]
|
||||
paths = filter_paths_by_ext(name_list, '.md')
|
||||
self.assertEqual(list(paths), [])
|
||||
|
||||
def test_get_thumbnail_upload_to(self):
|
||||
for file_hash, kwargs, expected in (
|
||||
('foobar', {}, 'thumbnails/fo/foobar.png'),
|
||||
('deadbeef', {'width': None, 'height': None}, 'thumbnails/de/deadbeef.png'),
|
||||
('deadbeef', {'width': 640, 'height': 360}, 'thumbnails/de/deadbeef_640x360.png'),
|
||||
):
|
||||
with self.subTest(file_hash=file_hash, kwargs=kwargs):
|
||||
self.assertEqual(get_thumbnail_upload_to(file_hash, **kwargs), expected)
|
||||
|
||||
@patch('files.utils.resize_image')
|
||||
def test_make_thumbnails(self, mock_resize_image):
|
||||
self.assertEqual(
|
||||
{
|
||||
'1080p': {'path': 'thumbnails/fo/foobar_1920x1080.png', 'size': [1920, 1080]},
|
||||
'360p': {'path': 'thumbnails/fo/foobar_640x360.png', 'size': [640, 360]},
|
||||
},
|
||||
make_thumbnails(TEST_FILES_DIR / 'test_preview_image_0001.png', 'foobar'),
|
||||
)
|
||||
|
||||
self.assertEqual(len(mock_resize_image.mock_calls), 2)
|
||||
for expected_size in ([1920, 1080], [640, 360]):
|
||||
with self.subTest(expected_size=expected_size):
|
||||
mock_resize_image.assert_any_call(
|
||||
ANY,
|
||||
expected_size,
|
||||
ANY,
|
||||
output_format='PNG',
|
||||
quality=83,
|
||||
optimize=True,
|
||||
progressive=True,
|
||||
)
|
||||
|
||||
@patch('files.utils.FFmpeg')
|
||||
def test_extract_frame(self, mock_ffmpeg):
|
||||
with tempfile.TemporaryDirectory() as output_dir:
|
||||
extract_frame('path/to/source/video.mp4', output_dir + '/frame.png')
|
||||
mock_ffmpeg.return_value.option.return_value.input.return_value.output.assert_any_call(
|
||||
output_dir + '/frame.png', {'ss': '00:00:00.01', 'frames:v': 1, 'update': 'true'}
|
||||
)
|
||||
|
||||
self.assertEqual(len(mock_ffmpeg.mock_calls), 5)
|
||||
mock_ffmpeg.assert_any_call()
|
||||
mock_ffmpeg.return_value.option.return_value.input.assert_any_call(
|
||||
'path/to/source/video.mp4'
|
||||
)
|
||||
|
111
files/utils.py
111
files/utils.py
@ -1,19 +1,26 @@
|
||||
from pathlib import Path
|
||||
import datetime
|
||||
import hashlib
|
||||
import io
|
||||
import logging
|
||||
import mimetypes
|
||||
import os
|
||||
import os.path
|
||||
import tempfile
|
||||
import toml
|
||||
import typing
|
||||
import zipfile
|
||||
|
||||
from PIL import Image
|
||||
from django.conf import settings
|
||||
from django.core.files.storage import default_storage
|
||||
from ffmpeg import FFmpeg, FFmpegFileNotFound, FFmpegInvalidCommand, FFmpegError
|
||||
from lxml import etree
|
||||
import clamd
|
||||
import magic
|
||||
|
||||
from constants.base import THUMBNAIL_FORMAT, THUMBNAIL_SIZES, THUMBNAIL_QUALITY
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
MODULE_DIR = Path(__file__).resolve().parent
|
||||
THEME_SCHEMA = []
|
||||
@ -185,3 +192,107 @@ def delete_file_in_storage(file_name: str) -> None:
|
||||
else:
|
||||
logger.info('Deleting %s from storage', file_name)
|
||||
default_storage.delete(file_name)
|
||||
|
||||
|
||||
def delete_thumbnails(file_metadata: dict) -> None:
|
||||
"""Read thumbnail paths from given metadata and delete them from storage."""
|
||||
thumbnails = file_metadata.get('thumbnails', {})
|
||||
for _, thumb in thumbnails.items():
|
||||
path = thumb.get('path', '')
|
||||
if not path:
|
||||
continue
|
||||
delete_file_in_storage(path)
|
||||
|
||||
|
||||
def get_thumbnail_upload_to(file_hash: str, width: int = None, height: int = None) -> str:
|
||||
"""Return a full media path of a thumbnail.
|
||||
|
||||
Optionally, append thumbnail dimensions to the file name.
|
||||
"""
|
||||
prefix = 'thumbnails/'
|
||||
_hash = file_hash.split(':')[-1]
|
||||
thumbnail_ext = THUMBNAIL_FORMAT.lower()
|
||||
if thumbnail_ext == 'jpeg':
|
||||
thumbnail_ext = 'jpg'
|
||||
suffix = f'.{thumbnail_ext}'
|
||||
size_suffix = f'_{width}x{height}' if width and height else ''
|
||||
path = Path(prefix, _hash[:2], f'{_hash}{size_suffix}').with_suffix(suffix)
|
||||
return str(path)
|
||||
|
||||
|
||||
def resize_image(image: Image, size: tuple, output, output_format: str = 'PNG', **output_params):
|
||||
"""Resize a models.ImageField to a given size and write it into output file."""
|
||||
start_t = datetime.datetime.now()
|
||||
|
||||
source_image = image.convert('RGBA' if output_format == 'PNG' else 'RGB')
|
||||
source_image.thumbnail(size, Image.LANCZOS)
|
||||
source_image.save(output, output_format, **output_params)
|
||||
|
||||
end_t = datetime.datetime.now()
|
||||
args = {'source': image, 'size': size, 'time': (end_t - start_t).microseconds / 1000}
|
||||
logger.info('%(source)s to %(size)s done in %(time)sms', args)
|
||||
|
||||
|
||||
def make_thumbnails(
|
||||
source_path: str, file_hash: str, output_format: str = THUMBNAIL_FORMAT
|
||||
) -> dict:
|
||||
"""Generate thumbnail files for given file and a predefined list of dimensions.
|
||||
|
||||
Resulting thumbnail paths a derived from the given file hash and thumbnail sizes.
|
||||
Return a dict of size keys to output paths of generated thumbnail images.
|
||||
"""
|
||||
start_t = datetime.datetime.now()
|
||||
thumbnails = {}
|
||||
abs_path = os.path.join(settings.MEDIA_ROOT, source_path)
|
||||
image = Image.open(abs_path)
|
||||
for size_key, size in THUMBNAIL_SIZES.items():
|
||||
w, h = size
|
||||
output_path = get_thumbnail_upload_to(file_hash, width=w, height=h)
|
||||
with tempfile.TemporaryFile() as f:
|
||||
logger.info('Resizing %s to %s (%s)', abs_path, size, output_format)
|
||||
resize_image(
|
||||
image,
|
||||
size,
|
||||
f,
|
||||
output_format=THUMBNAIL_FORMAT,
|
||||
quality=THUMBNAIL_QUALITY,
|
||||
optimize=True,
|
||||
progressive=True,
|
||||
)
|
||||
logger.info('Saving a thumbnail to %s', output_path)
|
||||
# Overwrite files instead of allowing storage generate a deduplicating suffix
|
||||
if default_storage.exists(output_path):
|
||||
logger.warning('%s exists, overwriting', output_path)
|
||||
default_storage.delete(output_path)
|
||||
default_storage.save(output_path, f)
|
||||
thumbnails[size_key] = {'size': size, 'path': output_path}
|
||||
image.close()
|
||||
|
||||
end_t = datetime.datetime.now()
|
||||
args = {'source': source_path, 'time': (end_t - start_t).microseconds / 1000}
|
||||
logger.info('%(source)s done in %(time)sms', args)
|
||||
return thumbnails
|
||||
|
||||
|
||||
def extract_frame(source_path: str, output_path: str, at_time: str = '00:00:00.01'):
|
||||
"""Extract a single frame of a video at a given path, write it to the given output path."""
|
||||
try:
|
||||
start_t = datetime.datetime.now()
|
||||
abs_path = os.path.join(settings.MEDIA_ROOT, output_path)
|
||||
ffmpeg = (
|
||||
FFmpeg()
|
||||
.option('y')
|
||||
.input(source_path)
|
||||
.output(abs_path, {'ss': at_time, 'frames:v': 1, 'update': 'true'})
|
||||
)
|
||||
output_dir = os.path.dirname(abs_path)
|
||||
if not os.path.isdir(output_dir):
|
||||
os.mkdir(output_dir)
|
||||
ffmpeg.execute()
|
||||
|
||||
end_t = datetime.datetime.now()
|
||||
args = {'source': source_path, 'time': (end_t - start_t).microseconds / 1000}
|
||||
logger.info('%(source)s done in %(time)sms', args)
|
||||
except (FFmpegError, FFmpegFileNotFound, FFmpegInvalidCommand) as e:
|
||||
logger.exception(f'Failed to extract a frame: {e.message}, {" ".join(ffmpeg.arguments)}')
|
||||
raise
|
||||
|
@ -40,6 +40,7 @@ mistune==2.0.4
|
||||
multidict==6.0.2
|
||||
oauthlib==3.2.0
|
||||
Pillow==9.2.0
|
||||
python-ffmpeg==2.0.12
|
||||
python-magic==0.4.27
|
||||
requests==2.28.1
|
||||
requests-oauthlib==1.3.1
|
||||
|
@ -76,12 +76,14 @@
|
||||
<h3>Previews Pending Approval</h3>
|
||||
<div class="row">
|
||||
{% for preview in pending_previews %}
|
||||
{% with thumbnail_1080p_url=preview.file.thumbnail_1080p_url %}
|
||||
<div class="col-md-3">
|
||||
<a href="{{ preview.file.source.url }}" class="d-block mb-2" title="{{ preview.caption }}" target="_blank">
|
||||
<img class="img-fluid rounded" src="{{ preview.file.source.url }}" alt="{{ preview.caption }}">
|
||||
<img class="img-fluid rounded" src="{{ thumbnail_1080p_url }}" alt="{{ preview.caption }}">
|
||||
</a>
|
||||
{% include "common/components/status.html" with object=preview.file class="d-block" %}
|
||||
</div>
|
||||
{% endwith %}
|
||||
{% endfor %}
|
||||
</div>
|
||||
</section>
|
||||
|
Loading…
Reference in New Issue
Block a user