Anna Sirota
ad66ec6476
When an image or a video is uploaded and its initial scan comes back with an OK, thumbnails are generated for it. In case the file is a video, a single frame is extracted from it with `ffmpeg` and used instead of `File.source`. Thumbnails have a predefined set of dimensions: 1920 x 1080 and 640 x 360 (for cards). When the original file is an image, the path of the largest thumbnail is saved in the `File.thumbnail` column; when it's a video, the extracted frame is stored in `File.thumbnails` instead. Additionally, all thumbnail paths are saved in `File.metadata`. When a `File` is deleted, all its thumbnails are deleted along with it. Implements #51 Part of #70 Reviewed-on: #87 Reviewed-by: Oleg-Komarov <oleg-komarov@noreply.localhost>
299 lines
11 KiB
Python
299 lines
11 KiB
Python
from pathlib import Path
|
|
import datetime
|
|
import hashlib
|
|
import io
|
|
import logging
|
|
import mimetypes
|
|
import os
|
|
import os.path
|
|
import tempfile
|
|
import toml
|
|
import typing
|
|
import zipfile
|
|
|
|
from PIL import Image
|
|
from django.conf import settings
|
|
from django.core.files.storage import default_storage
|
|
from ffmpeg import FFmpeg, FFmpegFileNotFound, FFmpegInvalidCommand, FFmpegError
|
|
from lxml import etree
|
|
import clamd
|
|
import magic
|
|
|
|
from constants.base import THUMBNAIL_FORMAT, THUMBNAIL_SIZES, THUMBNAIL_QUALITY
|
|
|
|
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Directory containing this module; used to locate bundled data files (theme.xsd).
MODULE_DIR = Path(__file__).resolve().parent

# One-element list used as a lazy cache for the parsed theme XML schema;
# stays empty until _get_theme_schema() is first called.
THEME_SCHEMA = []
|
|
|
|
|
|
def _get_theme_schema():
    """Return the theme XML schema, parsing and caching theme.xsd on first use."""
    if not THEME_SCHEMA:
        # Populate the module-level cache exactly once.
        schema_bytes = (MODULE_DIR / 'theme.xsd').read_bytes()
        THEME_SCHEMA.append(etree.XMLSchema(etree.XML(schema_bytes)))
    return THEME_SCHEMA[0]
|
|
|
|
|
|
def get_sha256(file_obj):
    """Calculate a sha256 hash for `file_obj`.

    `file_obj` must either be an open file descriptor, in which case the
    caller needs to take care of closing it properly, or a django File-like
    object with a chunks() method to iterate over its contents.
    """
    digest = hashlib.sha256()
    chunks = getattr(file_obj, 'chunks', None)
    if callable(chunks):
        # Django File-like object: iterate its chunks.
        for chunk in chunks():
            digest.update(chunk)
    else:
        # Plain binary file object: read in buffer-sized blocks until EOF.
        while True:
            block = file_obj.read(io.DEFAULT_BUFFER_SIZE)
            if block == b'':
                break
            digest.update(block)
    # Rewind so the file can be read again by validation or other utilities.
    file_obj.seek(0)
    return digest.hexdigest()
|
|
|
|
|
|
def get_sha256_from_value(value: str):
    """Calculate a sha256 hash for a given string value."""
    # str() keeps non-string values working the same way as before.
    return hashlib.sha256(str(value).encode()).hexdigest()
|
|
|
|
|
|
def find_path_by_name(paths: typing.List[str], name: str) -> typing.Optional[str]:
    """Return the first occurrence of file name in a given list of paths."""
    # Paths are compared after stripping surrounding whitespace, and the
    # stripped form is what gets returned on a match.
    stripped = (candidate.strip() for candidate in paths)
    return next((p for p in stripped if os.path.basename(p) == name), None)
|
|
|
|
|
|
def find_exact_path(paths: typing.List[str], exact_path: str) -> typing.Optional[str]:
    """Return a first path equal to a given one if it exists in a given list of paths."""
    for path in paths:
        if path == exact_path:
            return path
    return None
|
|
|
|
|
|
def filter_paths_by_ext(paths: typing.List[str], ext: str) -> typing.Iterable[str]:
    """Generate a list of paths having a given extension from a given list of paths."""
    # Compare extensions case-insensitively; hoist the target out of the loop.
    target_ext = ext.lower()
    for candidate in paths:
        if os.path.splitext(candidate)[1].lower() == target_ext:
            yield candidate
|
|
|
|
|
|
def read_manifest_from_zip(archive_path):
    """Read and validate extension's manifest file and contents of the archive.

    In any extension archive, a valid `blender_manifest.toml` file is expected
    to be found at the top level of the archive, or inside a single nested directory.
    Additionally, depending on the extension type defined in the manifest,
    the archive is expected to have a particular file structure:

    * for themes, a single XML file is expected next to the manifest;

    * for add-ons, the following structure is expected:

    ```
    some-addon.zip
    └─ an-optional-dir
       ├─ blender_manifest.toml
       ├─ __init__.py
       └─ (...)
    ```

    Returns a `(manifest, error_codes)` tuple: `manifest` is the parsed TOML
    dict, or None when validation failed before/at parsing; `error_codes` is a
    list of string codes describing the problems found.
    """
    manifest_name = 'blender_manifest.toml'
    error_codes = []
    try:
        with zipfile.ZipFile(archive_path) as myzip:
            # testzip() returns the name of the first corrupt member, or None.
            bad_file = myzip.testzip()
            if bad_file is not None:
                logger.error('Bad file in ZIP')
                error_codes.append('invalid_zip_archive')
                return None, error_codes

            file_list = myzip.namelist()
            manifest_filepath = find_path_by_name(file_list, manifest_name)

            if manifest_filepath is None:
                logger.info(f"File '{manifest_name}' not found in the archive.")
                error_codes.append('missing_manifest_toml')
                return None, error_codes

            # Manifest file is expected to be no deeper than one directory down
            if os.path.dirname(os.path.dirname(manifest_filepath)) != '':
                error_codes.append('invalid_manifest_path')
                return None, error_codes

            # Extract the file content
            with myzip.open(manifest_filepath) as file_content:
                toml_content = toml.loads(file_content.read().decode())

            # If manifest was parsed successfully, do additional type-specific validation
            # NOTE(review): a manifest without a 'type' key raises KeyError here,
            # which falls through to the generic handler below and gets reported
            # as 'invalid_zip_archive' — confirm this is the intended error code.
            type_slug = toml_content['type']
            if type_slug == 'theme':
                # Exactly one .xml file (the theme itself) must be in the archive.
                theme_xmls = filter_paths_by_ext(file_list, '.xml')
                if len(list(theme_xmls)) != 1:
                    error_codes.append('missing_or_multiple_theme_xml')
            elif type_slug == 'add-on':
                # __init__.py is expected to be next to the manifest
                expected_init_path = os.path.join(os.path.dirname(manifest_filepath), '__init__.py')
                init_filepath = find_exact_path(file_list, expected_init_path)
                if not init_filepath:
                    error_codes.append('invalid_missing_init')

            return toml_content, error_codes

    except toml.decoder.TomlDecodeError as e:
        logger.error(f"Manifest Error: {e.msg}")
        error_codes.append('invalid_manifest_toml')

    except Exception as e:
        # Catch-all for unexpected problems (corrupt zip, decode errors, ...).
        logger.error(f"Error extracting from archive: {e}")
        error_codes.append('invalid_zip_archive')

    return None, error_codes
|
|
|
|
|
|
def guess_mimetype_from_ext(file_name: str) -> str:
    """Guess MIME-type from the extension of the given file name.

    Returns None when the extension is unknown or missing.
    """
    # guess_type() returns a (type, encoding) pair; only the type is needed.
    return mimetypes.guess_type(file_name)[0]
|
|
|
|
|
|
def guess_mimetype_from_content(file_obj) -> str:
    """Guess MIME-type based on a portion of the given file's bytes."""
    # A 2KiB prefix is enough for libmagic's signature detection.
    head = file_obj.read(2048)
    detected = magic.from_buffer(head, mime=True)
    # Rewind: this file might be read again by validation or other utilities.
    file_obj.seek(0)
    return detected
|
|
|
|
|
|
def run_clamdscan(abs_path: str) -> tuple:
    """Scan the file at the given absolute path with clamd over its Unix socket.

    Returns the scan result reported under the 'stream' key.
    """
    logger.info('Scanning file at path=%s', abs_path)
    scanner = clamd.ClamdUnixSocket()
    with open(abs_path, 'rb') as file_obj:
        scan_report = scanner.instream(file_obj)
    result = scan_report['stream']
    logger.info('File at path=%s scanned: %s', abs_path, result)
    return result
|
|
|
|
|
|
def delete_file_in_storage(file_name: str) -> None:
    """Delete file from disk or whatever other default storage."""
    # Nothing to do for an empty/None name.
    if not file_name:
        return

    if default_storage.exists(file_name):
        logger.info('Deleting %s from storage', file_name)
        default_storage.delete(file_name)
    else:
        logger.warning("%s doesn't exist in storage, nothing to delete", file_name)
|
|
|
|
|
|
def delete_thumbnails(file_metadata: dict) -> None:
    """Read thumbnail paths from given metadata and delete them from storage.

    Entries without a 'path' value are skipped silently.
    """
    # `or {}` also guards against an explicit `'thumbnails': None` entry,
    # which `.get('thumbnails', {})` alone would pass through and crash on.
    thumbnails = file_metadata.get('thumbnails') or {}
    # Only the values are needed; the size keys are irrelevant here.
    for thumb in thumbnails.values():
        path = thumb.get('path', '')
        if path:
            delete_file_in_storage(path)
|
|
|
|
|
|
def get_thumbnail_upload_to(file_hash: str, width: int = None, height: int = None) -> str:
    """Return a full media path of a thumbnail.

    Optionally, append thumbnail dimensions to the file name.
    """
    # Hashes may carry an algorithm prefix like 'sha256:<hex>'; keep the hex part.
    digest = file_hash.split(':')[-1]
    extension = THUMBNAIL_FORMAT.lower()
    if extension == 'jpeg':
        extension = 'jpg'
    dimensions = f'_{width}x{height}' if width and height else ''
    # Shard thumbnails into subdirectories by the first two hash characters.
    full_path = Path('thumbnails/', digest[:2], f'{digest}{dimensions}')
    return str(full_path.with_suffix(f'.{extension}'))
|
|
|
|
|
|
def resize_image(image: Image, size: tuple, output, output_format: str = 'PNG', **output_params):
    """Resize a models.ImageField to a given size and write it into output file.

    The image is resized in place-preserving aspect ratio (PIL thumbnail
    semantics: it fits within `size`). Extra `output_params` are forwarded
    to PIL's save() (e.g. quality, optimize).
    """
    start_t = datetime.datetime.now()

    # JPEG has no alpha channel, so only keep alpha when writing PNG.
    source_image = image.convert('RGBA' if output_format == 'PNG' else 'RGB')
    source_image.thumbnail(size, Image.LANCZOS)
    source_image.save(output, output_format, **output_params)

    end_t = datetime.datetime.now()
    # Use total_seconds(): `.microseconds` is only the sub-second component
    # of the timedelta and silently drops whole seconds for slow resizes.
    args = {'source': image, 'size': size, 'time': (end_t - start_t).total_seconds() * 1000}
    logger.info('%(source)s to %(size)s done in %(time)sms', args)
|
|
|
|
|
|
def make_thumbnails(
    source_path: str, file_hash: str, output_format: str = THUMBNAIL_FORMAT
) -> dict:
    """Generate thumbnail files for given file and a predefined list of dimensions.

    Resulting thumbnail paths are derived from the given file hash and thumbnail sizes.
    Return a dict of size keys to output paths of generated thumbnail images.
    """
    start_t = datetime.datetime.now()
    thumbnails = {}
    abs_path = os.path.join(settings.MEDIA_ROOT, source_path)
    image = Image.open(abs_path)
    try:
        for size_key, size in THUMBNAIL_SIZES.items():
            w, h = size
            output_path = get_thumbnail_upload_to(file_hash, width=w, height=h)
            with tempfile.TemporaryFile() as f:
                logger.info('Resizing %s to %s (%s)', abs_path, size, output_format)
                resize_image(
                    image,
                    size,
                    f,
                    # Pass the caller-selected format through; previously this
                    # was hard-coded to THUMBNAIL_FORMAT, silently ignoring
                    # the `output_format` parameter.
                    output_format=output_format,
                    quality=THUMBNAIL_QUALITY,
                    optimize=True,
                    progressive=True,
                )
                logger.info('Saving a thumbnail to %s', output_path)
                # Overwrite files instead of allowing storage generate a deduplicating suffix
                if default_storage.exists(output_path):
                    logger.warning('%s exists, overwriting', output_path)
                    default_storage.delete(output_path)
                default_storage.save(output_path, f)
            thumbnails[size_key] = {'size': size, 'path': output_path}
    finally:
        # Close the source image even if a resize or storage call fails.
        image.close()

    end_t = datetime.datetime.now()
    # total_seconds() keeps the whole duration; `.microseconds` alone would
    # drop full seconds from the reported timing.
    args = {'source': source_path, 'time': (end_t - start_t).total_seconds() * 1000}
    logger.info('%(source)s done in %(time)sms', args)
    return thumbnails
|
|
|
|
|
|
def extract_frame(source_path: str, output_path: str, at_time: str = '00:00:00.01'):
    """Extract a single frame of a video at a given path, write it to the given output path.

    `output_path` is relative to MEDIA_ROOT. Re-raises ffmpeg errors after
    logging them.
    """
    try:
        start_t = datetime.datetime.now()
        abs_path = os.path.join(settings.MEDIA_ROOT, output_path)
        ffmpeg = (
            FFmpeg()
            .option('y')
            .input(source_path)
            .output(abs_path, {'ss': at_time, 'frames:v': 1, 'update': 'true'})
        )
        # makedirs creates any number of missing path components; plain
        # mkdir would fail unless the immediate parent already existed.
        os.makedirs(os.path.dirname(abs_path), exist_ok=True)
        ffmpeg.execute()

        end_t = datetime.datetime.now()
        # total_seconds() keeps the whole duration; `.microseconds` alone
        # would drop full seconds from the reported timing.
        args = {'source': source_path, 'time': (end_t - start_t).total_seconds() * 1000}
        logger.info('%(source)s done in %(time)sms', args)
    except (FFmpegError, FFmpegFileNotFound, FFmpegInvalidCommand) as e:
        logger.exception(f'Failed to extract a frame: {e.message}, {" ".join(ffmpeg.arguments)}')
        raise
|