extensions-website/files/utils.py
Anna Sirota 1ede405212 Files: use clamd socket directly when scanning
Calling `clamdscan` from inside the process of the background tasks service
unit failed with an obscure AppArmor error:

     apparmor="DENIED" operation="getattr" info="Failed name lookup - disconnected path"
     error=-13 profile="/usr/sbin/clamd" name="var/www/../media/...zip"

This has something to do with systemd sandboxing options used
in the service unit (without them it does work):

    PrivateTmp=true
    ProtectHome=true
    ProtectSystem=full

To avoid having to relax those, this change sends the scan command and streams
the file content directly to the clamd socket instead of doing any
forks/execs inside the sandboxed process.
2024-04-15 12:26:46 +02:00

175 lines
6.3 KiB
Python

from pathlib import Path
import hashlib
import io
import logging
import mimetypes
import os
import os.path
import toml
import typing
import zipfile
from lxml import etree
import clamd
import magic
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Directory containing this module; `theme.xsd` is loaded relative to it.
MODULE_DIR = Path(__file__).resolve().parent
# Cache for the compiled theme XML schema: empty until `_get_theme_schema()`
# is first called, after which it holds the schema as its only element.
THEME_SCHEMA = []
def _get_theme_schema():
    """Return the compiled XML schema for themes, loading it on first use.

    The schema is parsed from `theme.xsd` located in this module's directory
    and kept in the module-level `THEME_SCHEMA` list, so the file is only
    read while that list is still empty.
    """
    if THEME_SCHEMA:
        return THEME_SCHEMA[0]

    xsd_path = MODULE_DIR / 'theme.xsd'
    with open(xsd_path, 'rb') as xsd_file:
        xsd_root = etree.XML(xsd_file.read())
        THEME_SCHEMA.append(etree.XMLSchema(xsd_root))
    return THEME_SCHEMA[0]
def get_sha256(file_obj):
    """Return the hexadecimal SHA-256 digest of the contents of `file_obj`.

    `file_obj` must be either an open binary file object (the caller stays
    responsible for closing it) or a Django File-like object exposing a
    callable `chunks()` method that iterates over its contents.

    After hashing, `file_obj` is rewound to the beginning.
    """
    digest = hashlib.sha256()
    chunks_method = getattr(file_obj, 'chunks', None)
    if callable(chunks_method):
        for piece in chunks_method():
            digest.update(piece)
    else:
        # Plain file object: read fixed-size blocks until an empty read
        while True:
            piece = file_obj.read(io.DEFAULT_BUFFER_SIZE)
            if piece == b'':
                break
            digest.update(piece)
    # Rewind: validation or other utilities might read this file again
    file_obj.seek(0)
    return digest.hexdigest()
def get_sha256_from_value(value: str):
    """Return the hexadecimal SHA-256 digest of `value`.

    `value` is converted with `str()` and encoded with the default
    encoding before hashing.
    """
    return hashlib.sha256(str(value).encode()).hexdigest()
def find_path_by_name(paths: typing.List[str], name: str) -> typing.Optional[str]:
    """Find the first path in `paths` whose base name equals `name`.

    Leading and trailing whitespace is stripped from each path before its
    base name is compared, and the stripped path is what gets returned.
    Returns `None` when no path matches.
    """
    stripped_paths = (candidate.strip() for candidate in paths)
    matches = (path for path in stripped_paths if os.path.basename(path) == name)
    return next(matches, None)
def find_exact_path(paths: typing.List[str], exact_path: str) -> typing.Optional[str]:
    """Return the first entry of `paths` equal to `exact_path`, or `None` if there is none."""
    for candidate in paths:
        if candidate == exact_path:
            return candidate
    return None
def filter_paths_by_ext(paths: typing.List[str], ext: str) -> typing.Iterable[str]:
    """Lazily yield the entries of `paths` whose file extension equals `ext`.

    The comparison is case-insensitive; `ext` is expected to include the
    leading dot (e.g. `'.xml'`), as returned by `os.path.splitext`.
    """
    yield from (
        candidate
        for candidate in paths
        if os.path.splitext(candidate)[1].lower() == ext.lower()
    )
def read_manifest_from_zip(archive_path):
    """Parse an extension archive's manifest and check the archive layout.

    A `blender_manifest.toml` file must be present either at the top level
    of the archive or inside a single nested directory. Once the manifest
    is parsed, its `type` selects one additional structural check:

    * `theme`: the archive must contain exactly one `.xml` file;
    * `add-on`: an `__init__.py` must be located next to the manifest:
    ```
    some-addon.zip
    └─ an-optional-dir
       ├─ blender_manifest.toml
       ├─ __init__.py
       └─ (...)
    ```

    Returns a `(manifest, error_codes)` tuple. `manifest` is the parsed
    TOML content, or `None` when the archive or the manifest could not be
    read; `error_codes` is a list of string codes describing the problems
    found (it can be non-empty even when a manifest is returned).
    """
    manifest_name = 'blender_manifest.toml'
    error_codes = []
    try:
        with zipfile.ZipFile(archive_path) as archive:
            # testzip() gives the name of the first corrupt member, if any
            if archive.testzip() is not None:
                logger.error('Bad file in ZIP')
                error_codes.append('invalid_zip_archive')
                return None, error_codes

            member_names = archive.namelist()
            manifest_filepath = find_path_by_name(member_names, manifest_name)
            if manifest_filepath is None:
                logger.info(f"File '{manifest_name}' not found in the archive.")
                error_codes.append('missing_manifest_toml')
                return None, error_codes

            # The manifest may be nested at most one directory deep
            manifest_dir = os.path.dirname(manifest_filepath)
            if os.path.dirname(manifest_dir) != '':
                error_codes.append('invalid_manifest_path')
                return None, error_codes

            # Read and parse the manifest itself
            manifest_text = archive.read(manifest_filepath).decode()
            toml_content = toml.loads(manifest_text)

            # Manifest parsed successfully: run the type-specific layout check
            type_slug = toml_content['type']
            if type_slug == 'theme':
                xml_count = sum(1 for _ in filter_paths_by_ext(member_names, '.xml'))
                if xml_count != 1:
                    error_codes.append('missing_or_multiple_theme_xml')
            elif type_slug == 'add-on':
                # __init__.py is expected to be next to the manifest
                expected_init_path = os.path.join(manifest_dir, '__init__.py')
                if not find_exact_path(member_names, expected_init_path):
                    error_codes.append('invalid_missing_init')

            return toml_content, error_codes
    except toml.decoder.TomlDecodeError as e:
        logger.error(f"Manifest Error: {e.msg}")
        error_codes.append('invalid_manifest_toml')
    except Exception as e:
        # Any other failure is reported as an invalid archive
        logger.error(f"Error extracting from archive: {e}")
        error_codes.append('invalid_zip_archive')
    return None, error_codes
def guess_mimetype_from_ext(file_name: str) -> typing.Optional[str]:
    """Guess the MIME type from the extension of the given file name.

    Returns the MIME type string reported by `mimetypes.guess_type`, or
    `None` when the file name has no extension or the extension is not
    recognised.
    """
    # guess_type() returns a (type, encoding) pair; only the type is needed
    mimetype_from_ext, _ = mimetypes.guess_type(file_name)
    return mimetype_from_ext
def guess_mimetype_from_content(file_obj) -> str:
    """Guess the MIME type from the first 2048 bytes read from `file_obj`.

    After detection, `file_obj` is rewound to the beginning.
    """
    header = file_obj.read(2048)
    detected_mimetype = magic.from_buffer(header, mime=True)
    # Rewind: validation or other utilities might read this file again
    file_obj.seek(0)
    return detected_mimetype
def run_clamdscan(abs_path: str) -> tuple:
    """Scan the file at `abs_path` by streaming its content to the clamd Unix socket.

    The file is opened in binary mode and passed to clamd's `instream`
    command; the value stored under the `'stream'` key of the response is
    logged and returned.
    """
    logger.info('Scanning file at path=%s', abs_path)
    scanner = clamd.ClamdUnixSocket()
    with open(abs_path, 'rb') as stream:
        response = scanner.instream(stream)
    verdict = response['stream']
    logger.info('File at path=%s scanned: %s', abs_path, verdict)
    return verdict