# extensions-website/files/utils.py
from pathlib import Path
import datetime
import hashlib
import io
import logging
import mimetypes
import os
import os.path
import tempfile
import toml
import typing
import zipfile
from packaging.utils import InvalidWheelFilename, parse_wheel_filename
from PIL import Image
from django.conf import settings
from django.core.files.storage import default_storage
from ffmpeg import FFmpeg, FFmpegFileNotFound, FFmpegInvalidCommand, FFmpegError
from lxml import etree
import clamd
import magic
import requests
from constants.base import THUMBNAIL_FORMAT, THUMBNAIL_SIZES, THUMBNAIL_QUALITY

logger = logging.getLogger(__name__)

FORBIDDEN_FILEPATHS = [
    '.git/',
    '.svn/',
    '__MACOSX/',
    'Thumbs.db',
    'ehthumbs.db',
]
MANIFEST_NAME = 'blender_manifest.toml'
MODULE_DIR = Path(__file__).resolve().parent

# Module-level cache for the parsed theme XML schema, populated lazily on first use.
THEME_SCHEMA = []


def _get_theme_schema():
    if not THEME_SCHEMA:
        with open(MODULE_DIR / 'theme.xsd', 'rb') as f:
            THEME_SCHEMA.append(etree.XMLSchema(etree.XML(f.read())))
    return THEME_SCHEMA[0]


def get_sha256(file_obj):
    """Calculate a sha256 hash for `file_obj`.

    `file_obj` must either be an open file object, in which case the
    caller needs to take care of closing it properly, or a Django File-like
    object with a chunks() method to iterate over its contents.
    """
    hash_ = hashlib.sha256()
    if hasattr(file_obj, 'chunks') and callable(file_obj.chunks):
        iterator = file_obj.chunks()
    else:
        iterator = iter(lambda: file_obj.read(io.DEFAULT_BUFFER_SIZE), b'')
    for chunk in iterator:
        hash_.update(chunk)
    # This file might be read again by validation or other utilities
    file_obj.seek(0)
    return hash_.hexdigest()
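
# A minimal usage sketch (illustrative only; the file name is hypothetical):
# both plain binary file objects and Django File objects are accepted, e.g.
#
#   with open('archive.zip', 'rb') as f:
#       digest = get_sha256(f)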


def get_sha256_from_value(value: str):
    """Calculate a sha256 hash for a given string value."""
    hash_ = hashlib.sha256()
    hash_.update(str(value).encode())
    return hash_.hexdigest()


def find_path_by_name(paths: typing.List[str], name: str) -> typing.Optional[str]:
    """Return the first occurrence of file name in a given list of paths."""
    for file_path in paths:
        # Remove leading/trailing whitespace from file path
        file_path_stripped = file_path.strip()
        # Check if the basename of the stripped path is equal to the target file name
        if os.path.basename(file_path_stripped) == name:
            return file_path_stripped
    return None


def find_exact_path(paths: typing.List[str], exact_path: str) -> typing.Optional[str]:
    """Return the first path equal to a given one if it exists in a given list of paths."""
    matching_paths = (path for path in paths if path == exact_path)
    return next(matching_paths, None)


def filter_paths_by_ext(paths: typing.List[str], ext: str) -> typing.Iterable[str]:
    """Generate paths having a given extension from a given list of paths."""
    for file_path in paths:
        # Get file path's extension
        _, file_path_ext = os.path.splitext(file_path)
        # Check if this file's extension matches the extension we are looking for
        if file_path_ext.lower() == ext.lower():
            yield file_path
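
# For example, given a ZIP namelist (illustrative values):
#
#   paths = ['addon/blender_manifest.toml', 'addon/__init__.py', 'addon/theme.xml']
#   find_path_by_name(paths, 'blender_manifest.toml')  # -> 'addon/blender_manifest.toml'
#   list(filter_paths_by_ext(paths, '.xml'))           # -> ['addon/theme.xml']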


def read_manifest_from_zip(archive_path):
    """Read and validate extension's manifest file and contents of the archive.

    In any extension archive, a valid `blender_manifest.toml` file is expected
    to be found at the top level of the archive, or inside a single nested directory.

    Additionally, depending on the extension type defined in the manifest,
    the archive is expected to have a particular file structure:

    * for themes, a single XML file is expected next to the manifest;
    * for add-ons, the following structure is expected:

    ```
    some-addon.zip
    └─ an-optional-dir
       ├─ blender_manifest.toml
       ├─ __init__.py
       └─ (...)
    ```
    """
    error_codes = []
    file_list = []
    manifest_content = None

    try:
        with zipfile.ZipFile(archive_path) as myzip:
            bad_file = myzip.testzip()
            if bad_file is not None:
                logger.error('Bad file in ZIP')
                error_codes.append('invalid_zip_archive')
                return None, error_codes
            file_list = myzip.namelist()
            manifest_filepath = find_path_by_name(file_list, MANIFEST_NAME)
            if manifest_filepath is None:
                logger.info(f"File '{MANIFEST_NAME}' not found in the archive.")
                error_codes.append('missing_manifest_toml')
                return None, error_codes
            # Manifest file is expected to be no deeper than one directory down
            if os.path.dirname(os.path.dirname(manifest_filepath)) != '':
                error_codes.append('invalid_manifest_path')
                return None, error_codes
            with myzip.open(manifest_filepath) as file_content:
                manifest_content = file_content.read().decode()
    except Exception as e:
        logger.error(f"Error extracting from archive: {e}")
        error_codes.append('invalid_zip_archive')
        return None, error_codes

    try:
        toml_content = toml.loads(manifest_content)
    except toml.decoder.TomlDecodeError as e:
        error_codes.append(
            {
                'code': 'invalid_manifest_toml',
                'params': {'msg': e.msg, 'lineno': e.lineno},
            }
        )
        return None, error_codes

    file_list_error_codes = validate_file_list(toml_content, manifest_filepath, file_list)
    error_codes.extend(file_list_error_codes)
    return toml_content, error_codes
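
# A minimal usage sketch (illustrative only; the archive path is hypothetical):
#
#   toml_content, error_codes = read_manifest_from_zip('/tmp/some-addon.zip')
#   if error_codes:
#       ...  # report validation errors to the uploader
#   else:
#       print(toml_content['type'])
#
# `toml_content` is None whenever a fatal archive- or TOML-level error occurred;
# `error_codes` may also contain non-fatal file-list errors alongside a parsed manifest.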


def find_forbidden_filepaths(file_list):
    result = set()
    for forbidden_path in FORBIDDEN_FILEPATHS:
        for filepath in file_list:
            if filepath.startswith(forbidden_path) or ('/' + forbidden_path) in filepath:
                result.add(forbidden_path)
                break
    return result
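
# For example (following the matching rules above), a forbidden entry is flagged
# whether it appears at the top level or nested inside a directory:
#
#   find_forbidden_filepaths(['addon/.git/config', 'Thumbs.db'])
#   # -> {'.git/', 'Thumbs.db'}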


def get_wheels_from_manifest(manifest):
    wheels = None
    if (
        'build' in manifest
        and 'generated' in manifest['build']
        and 'wheels' in manifest['build']['generated']
    ):
        wheels = manifest['build']['generated']['wheels']
    else:
        wheels = manifest.get('wheels')
    return wheels
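
# The wheel list can come from either of two manifest shapes, e.g. (illustrative TOML):
#
#   [build.generated]
#   wheels = ["./wheels/some_package-1.0-py3-none-any.whl"]
#
# or, at the top level of the manifest:
#
#   wheels = ["./wheels/some_package-1.0-py3-none-any.whl"]
#
# The [build.generated] location takes precedence when both are present.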


def validate_file_list(toml_content, manifest_filepath, file_list):
    """Check the files in the archive against the manifest."""
    error_codes = []

    found_forbidden_filepaths = find_forbidden_filepaths(file_list)
    if found_forbidden_filepaths:
        error_codes.append(
            {
                'code': 'forbidden_filepaths',
                'params': {'paths': ', '.join(found_forbidden_filepaths)},
            }
        )

    type_slug = toml_content['type']
    if type_slug == 'theme':
        theme_xmls = filter_paths_by_ext(file_list, '.xml')
        # Special treatment for Mac, so the same problem (__MACOSX folders)
        # doesn't lead to two errors showing.
        if len(list(theme_xmls)) != 1 and '__MACOSX/' not in found_forbidden_filepaths:
            error_codes.append('missing_or_multiple_theme_xml')
    elif type_slug == 'add-on':
        # __init__.py is expected to be next to the manifest
        expected_init_path = _canonical_path('__init__.py', manifest_filepath)
        init_filepath = find_exact_path(file_list, expected_init_path)
        if not init_filepath:
            error_codes.append('invalid_missing_init')

        if wheels := get_wheels_from_manifest(toml_content):
            for wheel in wheels:
                expected_wheel_path = _canonical_path(wheel, manifest_filepath)
                wheel_filepath = find_exact_path(file_list, expected_wheel_path)
                if not wheel_filepath:
                    error_codes.append(
                        {'code': 'missing_wheel', 'params': {'path': expected_wheel_path}}
                    )
    return error_codes


def _canonical_path(path, manifest_filepath):
    """Transform path before checking against the zip file list.

    We expect to support other manifest fields (e.g. in the [build] section) that will potentially
    point to directories, including the "current" directory, which has to be denoted as "./".
    To avoid inconsistencies in the file path notations supported for different fields, we process
    all path values in the manifest in a uniform way, allowing the leading "./" in all file paths.

    All paths mentioned in the manifest are treated as relative to the directory that contains
    manifest_filepath.
    """
    if path.startswith('./'):
        path = path[2:]
    return os.path.join(os.path.dirname(manifest_filepath), path)
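
# For example, with the manifest located inside a single nested directory:
#
#   _canonical_path('./wheels/foo.whl', 'an-optional-dir/blender_manifest.toml')
#   # -> 'an-optional-dir/wheels/foo.whl'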


def guess_mimetype_from_ext(file_name: str) -> typing.Optional[str]:
    """Guess MIME-type from the extension of the given file name."""
    mimetype_from_ext, _ = mimetypes.guess_type(file_name)
    return mimetype_from_ext


def guess_mimetype_from_content(file_obj) -> str:
    """Guess MIME-type based on a portion of the given file's bytes."""
    mimetype_from_bytes = magic.from_buffer(file_obj.read(2048), mime=True)
    # This file might be read again by validation or other utilities
    file_obj.seek(0)
    return mimetype_from_bytes


def run_clamdscan(abs_path: str) -> tuple:
    logger.info('Scanning file at path=%s', abs_path)
    clamd_socket = clamd.ClamdUnixSocket()
    with open(abs_path, 'rb') as f:
        result = clamd_socket.instream(f)['stream']
    logger.info('File at path=%s scanned: %s', abs_path, result)
    return result
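
# clamd's instream() replies with a dict like {'stream': (status, signature)},
# so `result` is a tuple such as ('OK', None) for a clean file or, illustratively,
# ('FOUND', 'Eicar-Test-Signature') when a scan matches.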


def delete_file_in_storage(file_name: str) -> None:
    """Delete file from disk or whatever other default storage."""
    if not file_name:
        return
    if not default_storage.exists(file_name):
        logger.warning("%s doesn't exist in storage, nothing to delete", file_name)
    else:
        logger.info('Deleting %s from storage', file_name)
        default_storage.delete(file_name)


def delete_thumbnails(file_metadata: dict) -> None:
    """Read thumbnail paths from given metadata and delete them from storage."""
    thumbnails = file_metadata.get('thumbnails', {})
    for thumb in thumbnails.values():
        path = thumb.get('path', '')
        if not path:
            continue
        delete_file_in_storage(path)


def get_thumbnail_upload_to(
    file_hash: str, width: typing.Optional[int] = None, height: typing.Optional[int] = None
) -> str:
    """Return a full media path of a thumbnail.

    Optionally, append thumbnail dimensions to the file name.
    """
    prefix = 'thumbnails/'
    _hash = file_hash.split(':')[-1]
    thumbnail_ext = THUMBNAIL_FORMAT.lower()
    if thumbnail_ext == 'jpeg':
        thumbnail_ext = 'jpg'
    suffix = f'.{thumbnail_ext}'
    size_suffix = f'_{width}x{height}' if width and height else ''
    path = Path(prefix, _hash[:2], f'{_hash}{size_suffix}').with_suffix(suffix)
    return str(path)
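
# For example, assuming THUMBNAIL_FORMAT is 'JPEG' (the actual value lives in
# constants.base), a prefixed hash turns into a sharded media path:
#
#   get_thumbnail_upload_to('sha256:0a1b2c...', width=320, height=180)
#   # -> 'thumbnails/0a/0a1b2c..._320x180.jpg'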


def resize_image(
    image: Image.Image, size: tuple, output, output_format: str = 'PNG', **output_params
):
    """Resize a models.ImageField to a given size and write it into output file."""
    start_t = datetime.datetime.now()
    source_image = image.convert('RGBA' if output_format == 'PNG' else 'RGB')
    source_image.thumbnail(size, Image.LANCZOS)
    source_image.save(output, output_format, **output_params)
    end_t = datetime.datetime.now()
    # total_seconds() avoids truncating durations longer than one second
    args = {'source': image, 'size': size, 'time': (end_t - start_t).total_seconds() * 1000}
    logger.info('%(source)s to %(size)s done in %(time)sms', args)


def make_thumbnails(
    source_path: str, file_hash: str, output_format: str = THUMBNAIL_FORMAT
) -> dict:
    """Generate thumbnail files for given file and a predefined list of dimensions.

    Resulting thumbnail paths are derived from the given file hash and thumbnail sizes.
    Return a dict of size keys to output paths of generated thumbnail images.
    """
    start_t = datetime.datetime.now()
    thumbnails = {}
    abs_path = os.path.join(settings.MEDIA_ROOT, source_path)
    image = Image.open(abs_path)
    for size_key, size in THUMBNAIL_SIZES.items():
        w, h = size
        output_path = get_thumbnail_upload_to(file_hash, width=w, height=h)
        with tempfile.TemporaryFile() as f:
            logger.info('Resizing %s to %s (%s)', abs_path, size, output_format)
            resize_image(
                image,
                size,
                f,
                output_format=output_format,
                quality=THUMBNAIL_QUALITY,
                optimize=True,
                progressive=True,
            )
            output_path = default_storage.save(output_path, f)
            logger.info('Saved a thumbnail to %s', output_path)
        thumbnails[size_key] = {'size': size, 'path': output_path}
    image.close()
    end_t = datetime.datetime.now()
    args = {'source': source_path, 'time': (end_t - start_t).total_seconds() * 1000}
    logger.info('%(source)s done in %(time)sms', args)
    return thumbnails
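
# The returned mapping (also the shape `delete_thumbnails()` expects under a
# 'thumbnails' key) looks roughly like the following, with hypothetical values;
# the actual size keys and dimensions come from THUMBNAIL_SIZES:
#
#   {'640x360': {'size': (640, 360), 'path': 'thumbnails/0a/0a1b2c..._640x360.jpg'}}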


def extract_frame(source_path: str, output_path: str, at_time: str = '00:00:00.01'):
    """Extract a single frame of a video at a given path, write it to the given output path."""
    try:
        start_t = datetime.datetime.now()
        abs_path = os.path.join(settings.MEDIA_ROOT, output_path)
        ffmpeg = (
            FFmpeg()
            .option('y')
            .input(source_path)
            .output(abs_path, {'ss': at_time, 'frames:v': 1, 'update': 'true'})
        )
        output_dir = os.path.dirname(abs_path)
        if not os.path.isdir(output_dir):
            os.makedirs(output_dir)
        ffmpeg.execute()
        end_t = datetime.datetime.now()
        args = {'source': source_path, 'time': (end_t - start_t).total_seconds() * 1000}
        logger.info('%(source)s done in %(time)sms', args)
    except (FFmpegError, FFmpegFileNotFound, FFmpegInvalidCommand) as e:
        logger.exception(f'Failed to extract a frame: {e.message}, {" ".join(ffmpeg.arguments)}')
        raise
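
# The constructed command is roughly equivalent to the following CLI invocation
# (argument order may differ slightly depending on the python-ffmpeg version):
#
#   ffmpeg -y -i <source_path> -ss 00:00:00.01 -frames:v 1 -update true <abs_path>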


def get_wheel_sha256_from_pypi(wheel_name, session):
    try:
        name, version, *_ = parse_wheel_filename(wheel_name)
    except InvalidWheelFilename:
        return (None, 'invalid wheel filename')
    url = f'https://pypi.org/pypi/{name}/{version}/json'
    r = session.get(
        url,
        headers={'User-Agent': 'extensions.blender.org <extensions@blender.org>'},
        timeout=10,
    )
    if r.status_code == 404:
        return (None, f'wheel not found: {url}')
    if r.status_code >= 500:
        raise Exception(f'{url} returned {r.status_code} error')
    data = r.json()
    for item in data.get('urls', []):
        if item['filename'] == wheel_name and item['packagetype'] == 'bdist_wheel':
            return (item['digests']['sha256'], None)
    return (None, 'no matching $.urls item in json response')
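
# The relevant part of PyPI's JSON API response (https://pypi.org/pypi/<name>/<version>/json)
# looks like this, abridged:
#
#   {"urls": [{"filename": "...", "packagetype": "bdist_wheel",
#              "digests": {"sha256": "..."}}]}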


def validate_wheels(archive_path, wheels):
    results = {}
    with zipfile.ZipFile(archive_path) as myzip:
        manifest_filepath = find_path_by_name(myzip.namelist(), MANIFEST_NAME)
        session = requests.Session()
        for wheel in wheels:
            wheel_path_in_archive = _canonical_path(wheel, manifest_filepath)
            wheel_digest = None
            with myzip.open(wheel_path_in_archive) as wheel_file:
                wheel_digest = get_sha256(wheel_file)
            wheel_name = os.path.basename(wheel)
            pypi_digest, err = get_wheel_sha256_from_pypi(wheel_name, session)
            if err:
                results[wheel] = err
                continue
            if pypi_digest != wheel_digest:
                results[wheel] = f'sha256 in archive={wheel_digest}, sha256 on pypi={pypi_digest}'
    return results