extensions-website/files/utils.py
Anna Sirota ad66ec6476 Thumbnails for images and videos (#87)
When an image or a video is uploaded and its initial scan comes back with an OK, thumbnails are generated for it.
In case the file is a video, a single frame is extracted from it with `ffmpeg` and used instead of `File.source`.
Thumbnails have a predefined set of dimensions: 1920 x 1080 and 640 x 360 (for cards).

When the original file is an image, the path of the largest thumbnail is saved in the `File.thumbnail` column; when it's a video, the extracted frame is stored in `File.thumbnails` instead. Additionally, all thumbnail paths are saved in `File.metadata`.
When `File` is deleted all its thumbnails are deleted along with it.

Implements #51

Part of #70

Reviewed-on: #87
Reviewed-by: Oleg-Komarov <oleg-komarov@noreply.localhost>
2024-04-25 17:50:56 +02:00

299 lines
11 KiB
Python

from pathlib import Path
import datetime
import hashlib
import io
import logging
import mimetypes
import os
import os.path
import tempfile
import toml
import typing
import zipfile
from PIL import Image
from django.conf import settings
from django.core.files.storage import default_storage
from ffmpeg import FFmpeg, FFmpegFileNotFound, FFmpegInvalidCommand, FFmpegError
from lxml import etree
import clamd
import magic
from constants.base import THUMBNAIL_FORMAT, THUMBNAIL_SIZES, THUMBNAIL_QUALITY
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Absolute path of the directory that contains this module.
MODULE_DIR = Path(__file__).resolve().parent
# Holder for the compiled theme XML schema: starts empty and is filled
# by `_get_theme_schema()` the first time the schema is requested.
THEME_SCHEMA = []
def _get_theme_schema():
    """Return the theme XML schema, compiling it from `theme.xsd` on first use.

    The compiled schema is kept in the module-level `THEME_SCHEMA` list,
    so the XSD file next to this module is read and parsed only once.
    """
    if THEME_SCHEMA:
        return THEME_SCHEMA[0]

    xsd_path = MODULE_DIR / 'theme.xsd'
    with open(xsd_path, 'rb') as xsd_file:
        xsd_bytes = xsd_file.read()
    THEME_SCHEMA.append(etree.XMLSchema(etree.XML(xsd_bytes)))
    return THEME_SCHEMA[0]
def get_sha256(file_obj):
    """Return the hex-encoded SHA-256 digest of the contents of `file_obj`.

    `file_obj` is either an open binary file object (closing it stays the
    caller's responsibility) or a Django File-like object that exposes
    a callable `chunks()` yielding its contents piece by piece.
    The object is rewound to its start before returning.
    """
    digest = hashlib.sha256()
    chunk_source = (
        file_obj.chunks()
        if callable(getattr(file_obj, 'chunks', None))
        # Plain file objects: read fixed-size pieces until an empty read signals EOF
        else iter(lambda: file_obj.read(io.DEFAULT_BUFFER_SIZE), b'')
    )
    for piece in chunk_source:
        digest.update(piece)

    # Rewind: validation or other utilities might read this file again
    file_obj.seek(0)
    return digest.hexdigest()
def get_sha256_from_value(value: str):
    """Return the hex-encoded SHA-256 digest of `value`.

    The value is converted with `str()` and encoded with the default
    encoding before hashing.
    """
    encoded = str(value).encode()
    return hashlib.sha256(encoded).hexdigest()
def find_path_by_name(paths: typing.List[str], name: str) -> typing.Optional[str]:
    """Return the first path in `paths` whose base name equals `name`, or None.

    Surrounding whitespace is stripped from every path before comparing,
    and the returned path is the stripped one.
    """
    stripped_paths = (raw_path.strip() for raw_path in paths)
    matches = (
        candidate for candidate in stripped_paths if os.path.basename(candidate) == name
    )
    return next(matches, None)
def find_exact_path(paths: typing.List[str], exact_path: str) -> typing.Optional[str]:
    """Return the first entry of `paths` that equals `exact_path`, or None if there is none."""
    for candidate in paths:
        if candidate == exact_path:
            return candidate
    return None
def filter_paths_by_ext(paths: typing.List[str], ext: str) -> typing.Iterable[str]:
    """Yield every path from `paths` whose extension equals `ext`, ignoring case.

    `ext` is compared against what `os.path.splitext` returns,
    so it has to include the leading dot, e.g. `.xml`.
    """
    yield from (
        candidate
        for candidate in paths
        if os.path.splitext(candidate)[1].lower() == ext.lower()
    )
def read_manifest_from_zip(archive_path):
    """Parse `blender_manifest.toml` from an extension archive and check the archive layout.

    The manifest must be located either at the top level of the archive
    or inside a single nested directory.
    Depending on the `type` defined in the manifest, more is checked:

    * for themes, the archive must contain exactly one `.xml` file;
    * for add-ons, an `__init__.py` must sit next to the manifest:
      ```
      some-addon.zip
      └─ an-optional-dir
         ├─ blender_manifest.toml
         ├─ __init__.py
         └─ (...)
      ```

    Return a `(manifest, error_codes)` tuple. `manifest` is the parsed TOML content,
    or `None` when the archive or the manifest could not be read.
    Type-specific problems are reported in `error_codes` alongside the parsed manifest.
    """
    manifest_name = 'blender_manifest.toml'
    error_codes = []

    def _failed(code):
        # Record the error code and produce the "no manifest" result
        error_codes.append(code)
        return None, error_codes

    try:
        with zipfile.ZipFile(archive_path) as archive:
            if archive.testzip() is not None:
                logger.error('Bad file in ZIP')
                return _failed('invalid_zip_archive')

            archived_names = archive.namelist()
            manifest_filepath = find_path_by_name(archived_names, manifest_name)
            if manifest_filepath is None:
                logger.info(f"File '{manifest_name}' not found in the archive.")
                return _failed('missing_manifest_toml')

            # The manifest may be nested at most one directory deep
            manifest_dir = os.path.dirname(manifest_filepath)
            if os.path.dirname(manifest_dir) != '':
                return _failed('invalid_manifest_path')

            with archive.open(manifest_filepath) as manifest_file:
                toml_content = toml.loads(manifest_file.read().decode())

            # The manifest was parsed: continue with type-specific validation
            type_slug = toml_content['type']
            if type_slug == 'theme':
                xml_count = sum(1 for _ in filter_paths_by_ext(archived_names, '.xml'))
                if xml_count != 1:
                    error_codes.append('missing_or_multiple_theme_xml')
            elif type_slug == 'add-on':
                # __init__.py is expected to be next to the manifest
                expected_init_path = os.path.join(manifest_dir, '__init__.py')
                if not find_exact_path(archived_names, expected_init_path):
                    error_codes.append('invalid_missing_init')
            return toml_content, error_codes
    except toml.decoder.TomlDecodeError as e:
        logger.error(f"Manifest Error: {e.msg}")
        return _failed('invalid_manifest_toml')
    except Exception as e:
        # Anything else that goes wrong is reported as a broken archive
        logger.error(f"Error extracting from archive: {e}")
        return _failed('invalid_zip_archive')
def guess_mimetype_from_ext(file_name: str) -> str:
    """Return the MIME-type guessed from the extension of `file_name`.

    The result is whatever `mimetypes.guess_type` reports as the type,
    which is `None` when the extension is missing or unknown.
    """
    return mimetypes.guess_type(file_name)[0]
def guess_mimetype_from_content(file_obj) -> str:
    """Return the MIME-type detected by `magic` from up to 2048 bytes read from `file_obj`.

    Reading starts at the current position of `file_obj`;
    the object is rewound to its start afterwards.
    """
    sample = file_obj.read(2048)
    detected_mimetype = magic.from_buffer(sample, mime=True)
    # Rewind: validation or other utilities might read this file again
    file_obj.seek(0)
    return detected_mimetype
def run_clamdscan(abs_path: str) -> tuple:
    """Stream the file at `abs_path` to clamd over its unix socket and return the scan result.

    The returned value is the `stream` entry of the response to the `instream` call.
    """
    logger.info('Scanning file at path=%s', abs_path)
    scanner = clamd.ClamdUnixSocket()
    with open(abs_path, 'rb') as scanned_file:
        response = scanner.instream(scanned_file)
    verdict = response['stream']
    logger.info('File at path=%s scanned: %s', abs_path, verdict)
    return verdict
def delete_file_in_storage(file_name: str) -> None:
    """Remove `file_name` from the default storage, if it is stored there.

    An empty `file_name` is ignored; a file missing from storage is only logged.
    """
    if not file_name:
        return

    if default_storage.exists(file_name):
        logger.info('Deleting %s from storage', file_name)
        default_storage.delete(file_name)
        return

    logger.warning("%s doesn't exist in storage, nothing to delete", file_name)
def delete_thumbnails(file_metadata: dict) -> None:
    """Delete from storage every thumbnail listed under the `thumbnails` key of `file_metadata`.

    Each entry is expected to be a dict; entries with a missing or empty `path` are skipped.
    """
    for thumb in file_metadata.get('thumbnails', {}).values():
        thumb_path = thumb.get('path', '')
        if thumb_path:
            delete_file_in_storage(thumb_path)
def get_thumbnail_upload_to(file_hash: str, width: int = None, height: int = None) -> str:
    """Build the media path of a thumbnail from the hash of its source file.

    The path has the form `thumbnails/<first 2 chars of hash>/<hash>[_<width>x<height>].<ext>`:
    only the part of `file_hash` after the last `:` is used, the dimensions are appended
    only when both `width` and `height` are given, and the extension is derived from
    `THUMBNAIL_FORMAT` (with `jpeg` shortened to `jpg`).
    """
    digest = file_hash.split(':')[-1]

    extension = THUMBNAIL_FORMAT.lower()
    extension = 'jpg' if extension == 'jpeg' else extension

    file_name = f'{digest}_{width}x{height}' if width and height else digest
    thumbnail_path = Path('thumbnails/', digest[:2], file_name).with_suffix(f'.{extension}')
    return str(thumbnail_path)
def resize_image(
    image: Image.Image, size: tuple, output, output_format: str = 'PNG', **output_params
):
    """Write a copy of `image`, resized to fit within `size`, into `output`.

    The given image is left untouched: a converted copy is resized and saved instead.

    :param image: an opened PIL image to make a resized copy of.
    :param size: `(width, height)` bounds passed to `Image.thumbnail`.
    :param output: a file name or a file object the resized image is saved to.
    :param output_format: image format to save as; the copy is converted to `RGBA`
        for `PNG` and to `RGB` for any other format.
    :param output_params: extra keyword arguments passed on to `Image.save`.
    """
    start_t = datetime.datetime.now()
    source_image = image.convert('RGBA' if output_format == 'PNG' else 'RGB')
    source_image.thumbnail(size, Image.LANCZOS)
    source_image.save(output, output_format, **output_params)
    # `timedelta.microseconds` is only the sub-second component of the duration,
    # so the total duration has to be used to report correct timings above one second.
    elapsed_ms = round((datetime.datetime.now() - start_t).total_seconds() * 1000, 3)
    args = {'source': image, 'size': size, 'time': elapsed_ms}
    logger.info('%(source)s to %(size)s done in %(time)sms', args)
def make_thumbnails(
    source_path: str, file_hash: str, output_format: str = THUMBNAIL_FORMAT
) -> dict:
    """Generate thumbnail files for a given image file and a predefined list of dimensions.

    Resulting thumbnail paths are derived from the given file hash and thumbnail sizes.
    An already existing thumbnail at the same path is overwritten.

    :param source_path: path of the source image, relative to `settings.MEDIA_ROOT`.
    :param file_hash: hash of the source file, used to build thumbnail paths.
    :param output_format: image format reported in the log.
    :return: a dict mapping each key of `THUMBNAIL_SIZES` to
        `{'size': <size>, 'path': <storage path of the generated thumbnail>}`.
    """
    start_t = datetime.datetime.now()
    thumbnails = {}
    abs_path = os.path.join(settings.MEDIA_ROOT, source_path)
    image = Image.open(abs_path)
    try:
        for size_key, size in THUMBNAIL_SIZES.items():
            w, h = size
            output_path = get_thumbnail_upload_to(file_hash, width=w, height=h)
            with tempfile.TemporaryFile() as f:
                logger.info('Resizing %s to %s (%s)', abs_path, size, output_format)
                # NOTE(review): thumbnails are always encoded as THUMBNAIL_FORMAT, the
                # `output_format` argument is only logged. This matches the file extension
                # chosen by `get_thumbnail_upload_to` -- confirm before honouring the argument.
                resize_image(
                    image,
                    size,
                    f,
                    output_format=THUMBNAIL_FORMAT,
                    quality=THUMBNAIL_QUALITY,
                    optimize=True,
                    progressive=True,
                )
                logger.info('Saving a thumbnail to %s', output_path)
                # Overwrite files instead of allowing storage generate a deduplicating suffix
                if default_storage.exists(output_path):
                    logger.warning('%s exists, overwriting', output_path)
                    default_storage.delete(output_path)
                default_storage.save(output_path, f)
            thumbnails[size_key] = {'size': size, 'path': output_path}
    finally:
        # Release the source image even if resizing or saving failed
        image.close()

    # `timedelta.microseconds` is only the sub-second component of the duration,
    # so the total duration has to be used to report correct timings above one second.
    elapsed_ms = round((datetime.datetime.now() - start_t).total_seconds() * 1000, 3)
    args = {'source': source_path, 'time': elapsed_ms}
    logger.info('%(source)s done in %(time)sms', args)
    return thumbnails
def extract_frame(source_path: str, output_path: str, at_time: str = '00:00:00.01'):
    """Extract a single frame of a video at a given path, write it to the given output path.

    :param source_path: path of the video, passed to `ffmpeg` as its input as is.
    :param output_path: where to write the frame, relative to `settings.MEDIA_ROOT`;
        missing directories of this path are created.
    :param at_time: position in the video to take the frame at, passed to `ffmpeg` as `ss`.
    :raises FFmpegError: when `ffmpeg` fails; the failure is logged and re-raised.
    """
    start_t = datetime.datetime.now()
    abs_path = os.path.join(settings.MEDIA_ROOT, output_path)
    # The command is built outside of `try`, so the error handler below can always report it
    ffmpeg = (
        FFmpeg()
        .option('y')
        .input(source_path)
        .output(abs_path, {'ss': at_time, 'frames:v': 1, 'update': 'true'})
    )
    output_dir = os.path.dirname(abs_path)
    if output_dir:
        # Unlike `os.mkdir`, this also creates missing parent directories
        # and doesn't fail if the directory appears in the meantime.
        os.makedirs(output_dir, exist_ok=True)

    try:
        ffmpeg.execute()
    except (FFmpegError, FFmpegFileNotFound, FFmpegInvalidCommand) as e:
        logger.exception(f'Failed to extract a frame: {e.message}, {" ".join(ffmpeg.arguments)}')
        raise

    # `timedelta.microseconds` is only the sub-second component of the duration,
    # so the total duration has to be used to report correct timings above one second.
    elapsed_ms = round((datetime.datetime.now() - start_t).total_seconds() * 1000, 3)
    args = {'source': source_path, 'time': elapsed_ms}
    logger.info('%(source)s done in %(time)sms', args)