from pathlib import Path
import datetime
import hashlib
import io
import logging
import mimetypes
import os
import os.path
import tempfile
import typing
import zipfile

from packaging.utils import InvalidWheelFilename, parse_wheel_filename
from PIL import Image
from django.conf import settings
from django.core.files.storage import default_storage
from ffmpeg import FFmpeg, FFmpegFileNotFound, FFmpegInvalidCommand, FFmpegError
from lxml import etree
import clamd
import magic
import requests
import toml

from constants.base import THUMBNAIL_FORMAT, THUMBNAIL_SIZES, THUMBNAIL_QUALITY

logger = logging.getLogger(__name__)

FORBIDDEN_FILEPATHS = [
    '.git/',
    '.svn/',
    '__MACOSX/',
    'Thumbs.db',
    'ehthumbs.db',
]
MANIFEST_NAME = 'blender_manifest.toml'
MODULE_DIR = Path(__file__).resolve().parent
# Lazily filled with a single compiled schema by _get_theme_schema().
THEME_SCHEMA = []

def _get_theme_schema():
    """Return the XML schema for theme files, compiling and caching it on first use."""
    if not THEME_SCHEMA:
        with open(MODULE_DIR / 'theme.xsd', 'rb') as f:
            THEME_SCHEMA.append(etree.XMLSchema(etree.XML(f.read())))
    return THEME_SCHEMA[0]

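# Example (illustrative sketch, not called by this module): validating a theme
# XML document against the cached schema; 'theme.xml' is a hypothetical path.
#
#     tree = etree.parse('theme.xml')
#     if not _get_theme_schema().validate(tree):
#         errors = _get_theme_schema().error_log  # lxml collects validation errors here
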
def get_sha256(file_obj):
    """Calculate a sha256 hash for `file_obj`.

    `file_obj` must either be an open file descriptor, in which case the
    caller needs to take care of closing it properly, or a Django File-like
    object with a chunks() method to iterate over its contents.
    """
    hash_ = hashlib.sha256()
    if hasattr(file_obj, 'chunks') and callable(file_obj.chunks):
        iterator = file_obj.chunks()
    else:
        iterator = iter(lambda: file_obj.read(io.DEFAULT_BUFFER_SIZE), b'')
    for chunk in iterator:
        hash_.update(chunk)
    # This file might be read again by validation or other utilities
    file_obj.seek(0)
    return hash_.hexdigest()

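# Example (illustrative): both supported call styles. Wrapping a raw file in
# django.core.files.File gives it the chunks() API this helper prefers.
#
#     from django.core.files import File
#     with open('some-addon.zip', 'rb') as f:    # hypothetical path
#         digest_from_chunks = get_sha256(File(f))
#         digest_from_reads = get_sha256(f)      # plain read() loop, same digest
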
def get_sha256_from_value(value: str):
    """Calculate a sha256 hash for a given string value."""
    hash_ = hashlib.sha256()
    hash_.update(str(value).encode())
    return hash_.hexdigest()

def find_path_by_name(paths: typing.List[str], name: str) -> typing.Optional[str]:
    """Return the first path in the given list whose base name equals the given name."""
    for file_path in paths:
        # Remove leading/trailing whitespace from the file path
        file_path_stripped = file_path.strip()
        # Check if the basename of the stripped path is equal to the target file name
        if os.path.basename(file_path_stripped) == name:
            return file_path_stripped
    return None

def find_exact_path(paths: typing.List[str], exact_path: str) -> typing.Optional[str]:
    """Return the first path equal to the given one, if it exists in the given list of paths."""
    matching_paths = (path for path in paths if path == exact_path)
    return next(matching_paths, None)

def filter_paths_by_ext(paths: typing.List[str], ext: str) -> typing.Iterable[str]:
    """Generate paths having a given extension from a given list of paths."""
    for file_path in paths:
        # Get the file path's extension
        _, file_path_ext = os.path.splitext(file_path)
        # Check if this file's extension matches the extension we are looking for
        if file_path_ext.lower() == ext.lower():
            yield file_path

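# Example (illustrative): locating files in a zip listing with these helpers.
#
#     file_list = ['addon/blender_manifest.toml', 'addon/__init__.py', 'addon/ui.py']
#     find_path_by_name(file_list, MANIFEST_NAME)      # -> 'addon/blender_manifest.toml'
#     find_exact_path(file_list, 'addon/__init__.py')  # -> 'addon/__init__.py'
#     list(filter_paths_by_ext(file_list, '.py'))      # -> ['addon/__init__.py', 'addon/ui.py']
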
def read_manifest_from_zip(archive_path):
    """Read and validate extension's manifest file and contents of the archive.

    In any extension archive, a valid `blender_manifest.toml` file is expected
    to be found at the top level of the archive, or inside a single nested directory.
    Additionally, depending on the extension type defined in the manifest,
    the archive is expected to have a particular file structure:

    * for themes, a single XML file is expected next to the manifest;

    * for add-ons, the following structure is expected:

    ```
    some-addon.zip
    └─ an-optional-dir
       ├─ blender_manifest.toml
       ├─ __init__.py
       └─ (...)
    ```
    """
    error_codes = []
    file_list = []
    manifest_content = None

    try:
        with zipfile.ZipFile(archive_path) as myzip:
            bad_file = myzip.testzip()
            if bad_file is not None:
                logger.error('Bad file in ZIP: %s', bad_file)
                error_codes.append('invalid_zip_archive')
                return None, error_codes

            file_list = myzip.namelist()
            manifest_filepath = find_path_by_name(file_list, MANIFEST_NAME)

            if manifest_filepath is None:
                logger.info(f"File '{MANIFEST_NAME}' not found in the archive.")
                error_codes.append('missing_manifest_toml')
                return None, error_codes

            # The manifest file is expected to be no deeper than one directory down
            if os.path.dirname(os.path.dirname(manifest_filepath)) != '':
                error_codes.append('invalid_manifest_path')
                return None, error_codes

            with myzip.open(manifest_filepath) as file_content:
                manifest_content = file_content.read().decode()
    except Exception as e:
        logger.error(f"Error extracting from archive: {e}")
        error_codes.append('invalid_zip_archive')
        return None, error_codes

    try:
        toml_content = toml.loads(manifest_content)
    except toml.decoder.TomlDecodeError as e:
        error_codes.append(
            {
                'code': 'invalid_manifest_toml',
                'params': {'msg': e.msg, 'lineno': e.lineno},
            }
        )
        return None, error_codes

    file_list_error_codes = validate_file_list(toml_content, manifest_filepath, file_list)
    error_codes.extend(file_list_error_codes)
    return toml_content, error_codes

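# Example (illustrative sketch): the expected calling pattern. `manifest` is the
# parsed TOML dict (or None on failure), `errors` a list of error codes.
#
#     manifest, errors = read_manifest_from_zip('/tmp/some-addon.zip')  # hypothetical path
#     if errors:
#         ...  # reject the upload and report the error codes
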
def find_forbidden_filepaths(file_list):
    """Return the set of entries from FORBIDDEN_FILEPATHS found in the given file list."""
    result = set()
    for forbidden_path in FORBIDDEN_FILEPATHS:
        for filepath in file_list:
            if filepath.startswith(forbidden_path) or ('/' + forbidden_path) in filepath:
                result.add(forbidden_path)
                break
    return result

def get_wheels_from_manifest(manifest):
    """Return the list of wheels from the manifest, preferring build.generated.wheels."""
    wheels = None
    if (
        'build' in manifest
        and 'generated' in manifest['build']
        and 'wheels' in manifest['build']['generated']
    ):
        wheels = manifest['build']['generated']['wheels']
    else:
        wheels = manifest.get('wheels')
    return wheels

def validate_file_list(toml_content, manifest_filepath, file_list):
    """Check the files in the archive against the manifest."""
    error_codes = []

    found_forbidden_filepaths = find_forbidden_filepaths(file_list)
    if found_forbidden_filepaths:
        error_codes.append(
            {
                'code': 'forbidden_filepaths',
                'params': {'paths': ', '.join(found_forbidden_filepaths)},
            }
        )
    type_slug = toml_content['type']
    if type_slug == 'theme':
        theme_xmls = filter_paths_by_ext(file_list, '.xml')
        # Special treatment for Mac, so that the same problem (__MACOSX folders)
        # doesn't lead to two errors showing.
        if len(list(theme_xmls)) != 1 and '__MACOSX/' not in found_forbidden_filepaths:
            error_codes.append('missing_or_multiple_theme_xml')
    elif type_slug == 'add-on':
        # __init__.py is expected to be next to the manifest
        expected_init_path = _canonical_path('__init__.py', manifest_filepath)
        init_filepath = find_exact_path(file_list, expected_init_path)
        if not init_filepath:
            error_codes.append('invalid_missing_init')
        if wheels := get_wheels_from_manifest(toml_content):
            for wheel in wheels:
                expected_wheel_path = _canonical_path(wheel, manifest_filepath)
                wheel_filepath = find_exact_path(file_list, expected_wheel_path)
                if not wheel_filepath:
                    error_codes.append(
                        {'code': 'missing_wheel', 'params': {'path': expected_wheel_path}}
                    )
    return error_codes

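# Example (illustrative): the returned error codes are either plain strings or
# dicts carrying template parameters, e.g.
#
#     ['invalid_missing_init',
#      {'code': 'missing_wheel', 'params': {'path': 'addon/wheels/foo.whl'}}]
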
def _canonical_path(path, manifest_filepath):
    """Transform a path before checking it against the zip file list.

    We expect to support other manifest fields (e.g. in the [build] section) that will potentially
    point to directories, including the "current" directory, which has to be denoted as "./".
    To avoid inconsistencies in the file path notations supported for different fields, we process
    all path values in the manifest in a uniform way, allowing a leading "./" in all file paths.

    All paths mentioned in the manifest are treated as relative to the directory that contains
    manifest_filepath.
    """
    if path.startswith('./'):
        path = path[2:]
    return os.path.join(os.path.dirname(manifest_filepath), path)

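# Example (illustrative): both notations resolve relative to the manifest's directory.
#
#     _canonical_path('./wheels/foo.whl', 'addon/blender_manifest.toml')
#     # -> 'addon/wheels/foo.whl'
#     _canonical_path('__init__.py', 'addon/blender_manifest.toml')
#     # -> 'addon/__init__.py'
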
def guess_mimetype_from_ext(file_name: str) -> typing.Optional[str]:
    """Guess MIME-type from the extension of the given file name."""
    mimetype_from_ext, _ = mimetypes.guess_type(file_name)
    return mimetype_from_ext

def guess_mimetype_from_content(file_obj) -> str:
    """Guess MIME-type based on a portion of the given file's bytes."""
    mimetype_from_bytes = magic.from_buffer(file_obj.read(2048), mime=True)
    # This file might be read again by validation or other utilities
    file_obj.seek(0)
    return mimetype_from_bytes

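# Example (illustrative sketch): comparing the declared and sniffed types of an
# upload; a mismatch is a common sign of a mislabeled or crafted file.
#
#     with open('preview.png', 'rb') as f:  # hypothetical path
#         if guess_mimetype_from_ext(f.name) != guess_mimetype_from_content(f):
#             ...  # flag the file for closer inspection
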
def run_clamdscan(abs_path: str) -> tuple:
    """Scan the file at the given path with ClamAV, returning clamd's result tuple."""
    logger.info('Scanning file at path=%s', abs_path)
    clamd_socket = clamd.ClamdUnixSocket()
    with open(abs_path, 'rb') as f:
        result = clamd_socket.instream(f)['stream']
    logger.info('File at path=%s scanned: %s', abs_path, result)
    return result

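# Example (illustrative): clamd's INSTREAM result is a (status, details) tuple,
# where status is 'OK', 'FOUND' or 'ERROR', and details names the matched
# signature for 'FOUND' results.
#
#     status, signature = run_clamdscan('/tmp/upload.zip')  # hypothetical path
#     if status == 'FOUND':
#         ...  # quarantine the file; `signature` identifies the malware rule
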
def delete_file_in_storage(file_name: str) -> None:
    """Delete a file from disk, or whatever the default storage backend is."""
    if not file_name:
        return

    if not default_storage.exists(file_name):
        logger.warning("%s doesn't exist in storage, nothing to delete", file_name)
    else:
        logger.info('Deleting %s from storage', file_name)
        default_storage.delete(file_name)

def delete_thumbnails(file_metadata: dict) -> None:
    """Read thumbnail paths from given metadata and delete them from storage."""
    thumbnails = file_metadata.get('thumbnails', {})
    for thumb in thumbnails.values():
        path = thumb.get('path', '')
        if not path:
            continue
        delete_file_in_storage(path)

def get_thumbnail_upload_to(file_hash: str, width: int = None, height: int = None) -> str:
    """Return a full media path of a thumbnail.

    Optionally, append thumbnail dimensions to the file name.
    """
    prefix = 'thumbnails/'
    _hash = file_hash.split(':')[-1]
    thumbnail_ext = THUMBNAIL_FORMAT.lower()
    if thumbnail_ext == 'jpeg':
        thumbnail_ext = 'jpg'
    suffix = f'.{thumbnail_ext}'
    size_suffix = f'_{width}x{height}' if width and height else ''
    path = Path(prefix, _hash[:2], f'{_hash}{size_suffix}').with_suffix(suffix)
    return str(path)

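# Example (illustrative): thumbnails are sharded by the first two characters of
# the hash; the extension depends on THUMBNAIL_FORMAT (here assumed to be JPEG).
#
#     get_thumbnail_upload_to('sha256:ab12cd34', width=320, height=180)
#     # -> 'thumbnails/ab/ab12cd34_320x180.jpg'
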
def resize_image(image: Image, size: tuple, output, output_format: str = 'PNG', **output_params):
    """Resize a given image to the given size and write it into the output file."""
    start_t = datetime.datetime.now()

    source_image = image.convert('RGBA' if output_format == 'PNG' else 'RGB')
    source_image.thumbnail(size, Image.LANCZOS)
    source_image.save(output, output_format, **output_params)

    end_t = datetime.datetime.now()
    args = {'source': image, 'size': size, 'time': (end_t - start_t).microseconds / 1000}
    logger.info('%(source)s to %(size)s done in %(time)sms', args)

def make_thumbnails(
    source_path: str, file_hash: str, output_format: str = THUMBNAIL_FORMAT
) -> dict:
    """Generate thumbnail files for a given file and a predefined list of dimensions.

    Resulting thumbnail paths are derived from the given file hash and thumbnail sizes.
    Return a dict mapping size keys to the size and output path of each generated thumbnail.
    """
    start_t = datetime.datetime.now()
    thumbnails = {}
    abs_path = os.path.join(settings.MEDIA_ROOT, source_path)
    image = Image.open(abs_path)
    for size_key, size in THUMBNAIL_SIZES.items():
        w, h = size
        output_path = get_thumbnail_upload_to(file_hash, width=w, height=h)
        with tempfile.TemporaryFile() as f:
            logger.info('Resizing %s to %s (%s)', abs_path, size, output_format)
            resize_image(
                image,
                size,
                f,
                output_format=output_format,
                quality=THUMBNAIL_QUALITY,
                optimize=True,
                progressive=True,
            )
            output_path = default_storage.save(output_path, f)
            logger.info('Saved a thumbnail to %s', output_path)
            thumbnails[size_key] = {'size': size, 'path': output_path}
    image.close()

    end_t = datetime.datetime.now()
    args = {'source': source_path, 'time': (end_t - start_t).microseconds / 1000}
    logger.info('%(source)s done in %(time)sms', args)
    return thumbnails

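# Example (illustrative; the actual keys and dimensions come from THUMBNAIL_SIZES
# in constants.base, so the shape below is an assumption):
#
#     make_thumbnails('images/ab/ab12cd34.png', 'sha256:ab12cd34')
#     # -> {'1080p': {'size': (1920, 1080),
#     #               'path': 'thumbnails/ab/ab12cd34_1920x1080.jpg'}, ...}
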
def extract_frame(source_path: str, output_path: str, at_time: str = '00:00:00.01'):
    """Extract a single frame of a video at a given path, write it to the given output path."""
    try:
        start_t = datetime.datetime.now()
        abs_path = os.path.join(settings.MEDIA_ROOT, output_path)
        ffmpeg = (
            FFmpeg()
            .option('y')
            .input(source_path)
            .output(abs_path, {'ss': at_time, 'frames:v': 1, 'update': 'true'})
        )
        output_dir = os.path.dirname(abs_path)
        os.makedirs(output_dir, exist_ok=True)
        ffmpeg.execute()

        end_t = datetime.datetime.now()
        args = {'source': source_path, 'time': (end_t - start_t).microseconds / 1000}
        logger.info('%(source)s done in %(time)sms', args)
    except (FFmpegError, FFmpegFileNotFound, FFmpegInvalidCommand) as e:
        logger.exception(f'Failed to extract a frame: {e.message}, {" ".join(ffmpeg.arguments)}')
        raise

def get_wheel_sha256_from_pypi(wheel_name, session):
    """Look up the given wheel on PyPI, returning a (sha256, error message) tuple."""
    try:
        name, version, *_ = parse_wheel_filename(wheel_name)
    except InvalidWheelFilename:
        return (None, 'invalid wheel filename')
    url = f'https://pypi.org/pypi/{name}/{version}/json'
    r = session.get(
        url,
        headers={'User-Agent': 'extensions.blender.org <extensions@blender.org>'},
        timeout=10,
    )
    if r.status_code == 404:
        return (None, f'wheel not found: {url}')
    if r.status_code >= 500:
        raise Exception(f'{url} returned {r.status_code} error')
    data = r.json()
    for item in data.get('urls', []):
        if item['filename'] == wheel_name and item['packagetype'] == 'bdist_wheel':
            return (item['digests']['sha256'], None)
    return (None, 'no matching $.urls item in json response')

def validate_wheels(archive_path, wheels):
    """Compare each bundled wheel's sha256 with its PyPI digest, returning problems by wheel."""
    results = {}
    with zipfile.ZipFile(archive_path) as myzip:
        manifest_filepath = find_path_by_name(myzip.namelist(), MANIFEST_NAME)
        session = requests.Session()
        for wheel in wheels:
            wheel_path_in_archive = _canonical_path(wheel, manifest_filepath)
            wheel_digest = None
            with myzip.open(wheel_path_in_archive) as wheel_file:
                wheel_digest = get_sha256(wheel_file)
            wheel_name = os.path.basename(wheel)
            pypi_digest, err = get_wheel_sha256_from_pypi(wheel_name, session)
            if err:
                results[wheel] = err
                continue
            if pypi_digest != wheel_digest:
                results[wheel] = f'sha256 in archive={wheel_digest}, sha256 on pypi={pypi_digest}'
    return results

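# Example (illustrative sketch): end-to-end wheel validation for an upload.
#
#     manifest, errors = read_manifest_from_zip('/tmp/some-addon.zip')  # hypothetical path
#     if manifest and not errors and (wheels := get_wheels_from_manifest(manifest)):
#         for wheel, problem in validate_wheels('/tmp/some-addon.zip', wheels).items():
#             logger.warning('wheel %s failed validation: %s', wheel, problem)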