Anna Sirota
1ede405212
Calling `clamdscan` from inside the process of the background tasks service unit failed with an obscure AppArmor error: `apparmor="DENIED" operation="getattr" info="Failed name lookup - disconnected path" error=-13 profile="/usr/sbin/clamd" name="var/www/../media/...zip"`. This has something to do with the systemd sandboxing options used in the service unit (without them it does work): `PrivateTmp=true`, `ProtectHome=true`, `ProtectSystem=full`. To avoid having to relax those, this sends the scan command and streams the file content directly to the clamd socket instead of doing any forks/execs inside the sandboxed process.
175 lines
6.3 KiB
Python
175 lines
6.3 KiB
Python
from pathlib import Path
|
|
import hashlib
|
|
import io
|
|
import logging
|
|
import mimetypes
|
|
import os
|
|
import os.path
|
|
import toml
|
|
import typing
|
|
import zipfile
|
|
|
|
from lxml import etree
|
|
import clamd
|
|
import magic
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Directory containing this module; `theme.xsd` is opened relative to it.
MODULE_DIR = Path(__file__).resolve().parent
# One-element cache for the parsed theme schema: empty until
# `_get_theme_schema()` is called for the first time.
THEME_SCHEMA = []
|
|
|
|
|
|
def _get_theme_schema():
    """Return the theme XML schema, parsing `theme.xsd` on first use.

    The parsed schema is stored in the module-level `THEME_SCHEMA` list,
    so the XSD file is only read and compiled once per process.
    """
    # Fast path: the schema has already been parsed and cached
    if THEME_SCHEMA:
        return THEME_SCHEMA[0]

    schema_path = MODULE_DIR / 'theme.xsd'
    with open(schema_path, 'rb') as xsd_file:
        xsd_root = etree.XML(xsd_file.read())
    THEME_SCHEMA.append(etree.XMLSchema(xsd_root))
    return THEME_SCHEMA[0]
|
|
|
|
|
|
def get_sha256(file_obj):
    """Return the hex-encoded sha256 digest of the contents of `file_obj`.

    `file_obj` must either be an open binary file object, in which case the
    caller is responsible for closing it, or a django File-like object that
    exposes a callable `chunks()` method for iterating over its contents.

    A plain file object is read from its current position.
    In both cases `file_obj` is rewound to its start before returning.
    """
    digest = hashlib.sha256()
    chunks_method = getattr(file_obj, 'chunks', None)
    if callable(chunks_method):
        for piece in chunks_method():
            digest.update(piece)
    else:
        # Read in buffer-sized pieces until the file is exhausted
        while True:
            piece = file_obj.read(io.DEFAULT_BUFFER_SIZE)
            if piece == b'':
                break
            digest.update(piece)
    # This file might be read again by validation or other utilities
    file_obj.seek(0)
    return digest.hexdigest()
|
|
|
|
|
|
def get_sha256_from_value(value: str):
    """Return the hex-encoded sha256 digest of `value`.

    The value is converted with `str()` and encoded with the default
    encoding of `str.encode()` before being hashed.
    """
    encoded = str(value).encode()
    return hashlib.sha256(encoded).hexdigest()
|
|
|
|
|
|
def find_path_by_name(paths: typing.List[str], name: str) -> typing.Optional[str]:
    """Return the first path in `paths` whose base name equals `name`.

    Leading and trailing whitespace is stripped from each path before its
    base name is compared, and the stripped path is what gets returned.
    `None` is returned when no path matches.
    """
    stripped_paths = (candidate.strip() for candidate in paths)
    matches = (
        candidate for candidate in stripped_paths
        if os.path.basename(candidate) == name
    )
    return next(matches, None)
|
|
|
|
|
|
def find_exact_path(paths: typing.List[str], exact_path: str) -> typing.Optional[str]:
    """Return the first entry of `paths` equal to `exact_path`, or `None` if there is none."""
    for candidate in paths:
        if candidate == exact_path:
            return candidate
    return None
|
|
|
|
|
|
def filter_paths_by_ext(paths: typing.List[str], ext: str) -> typing.Iterable[str]:
    """Yield every path from `paths` whose extension matches `ext`.

    Extensions are compared case-insensitively; `ext` is expected to include
    the leading dot (e.g. `'.xml'`), as returned by `os.path.splitext`.
    """
    yield from (
        candidate for candidate in paths
        if os.path.splitext(candidate)[1].lower() == ext.lower()
    )
|
|
|
|
|
|
def read_manifest_from_zip(archive_path):
    """Read the extension manifest from a ZIP archive and validate its layout.

    A `blender_manifest.toml` file must be present either at the top level of
    the archive or inside a single nested directory. Once the manifest is
    parsed, the rest of the archive is checked based on the manifest's `type`:

    * for themes, exactly one `.xml` file must be present in the archive;

    * for add-ons, an `__init__.py` must sit next to the manifest:

    ```
    some-addon.zip
    └─ an-optional-dir
       ├─ blender_manifest.toml
       ├─ __init__.py
       └─ (...)
    ```

    Returns a `(manifest, error_codes)` tuple. `manifest` is the parsed TOML
    content, or `None` if the archive or the manifest could not be read or
    the manifest is missing or nested too deep. `error_codes` is a list of
    string codes describing every problem found; it may be non-empty even
    when a manifest is returned (type-specific validation failures).
    """
    manifest_name = 'blender_manifest.toml'
    error_codes = []
    try:
        with zipfile.ZipFile(archive_path) as archive:
            # testzip() returns the name of the first corrupt member, or None
            if archive.testzip() is not None:
                logger.error('Bad file in ZIP')
                error_codes.append('invalid_zip_archive')
                return None, error_codes

            names = archive.namelist()
            manifest_filepath = find_path_by_name(names, manifest_name)
            if manifest_filepath is None:
                logger.info(f"File '{manifest_name}' not found in the archive.")
                error_codes.append('missing_manifest_toml')
                return None, error_codes

            # The manifest may be nested at most one directory deep, i.e.
            # its parent directory must not itself have a parent.
            manifest_dir = os.path.dirname(manifest_filepath)
            if os.path.dirname(manifest_dir):
                error_codes.append('invalid_manifest_path')
                return None, error_codes

            # Pull the raw manifest bytes out of the archive and parse them
            with archive.open(manifest_filepath) as manifest_file:
                raw_manifest = manifest_file.read()
            toml_content = toml.loads(raw_manifest.decode())

            # The manifest parsed fine: apply the type-specific checks
            type_slug = toml_content['type']
            if type_slug == 'theme':
                xml_count = sum(1 for _ in filter_paths_by_ext(names, '.xml'))
                if xml_count != 1:
                    error_codes.append('missing_or_multiple_theme_xml')
            elif type_slug == 'add-on':
                # __init__.py is expected to be next to the manifest
                expected_init_path = os.path.join(manifest_dir, '__init__.py')
                if not find_exact_path(names, expected_init_path):
                    error_codes.append('invalid_missing_init')

            return toml_content, error_codes

    except toml.decoder.TomlDecodeError as e:
        logger.error(f"Manifest Error: {e.msg}")
        error_codes.append('invalid_manifest_toml')

    except Exception as e:
        # Anything else (unreadable ZIP, undecodable manifest, missing
        # `type` key, ...) is reported as an invalid archive.
        logger.error(f"Error extracting from archive: {e}")
        error_codes.append('invalid_zip_archive')

    return None, error_codes
|
|
|
|
|
|
def guess_mimetype_from_ext(file_name: str) -> typing.Optional[str]:
    """Guess MIME-type from the extension of the given file name.

    Returns the MIME-type string, or `None` when the extension is missing
    or not known to the `mimetypes` module.
    """
    # guess_type() returns a (type, encoding) pair; only the type is needed
    mimetype_from_ext, _encoding = mimetypes.guess_type(file_name)
    return mimetype_from_ext
|
|
|
|
|
|
def guess_mimetype_from_content(file_obj) -> str:
    """Guess MIME-type from the first 2048 bytes read from `file_obj`.

    The bytes are read from the file's current position, and the file is
    rewound to its start once the type has been detected.
    """
    header = file_obj.read(2048)
    detected = magic.from_buffer(header, mime=True)
    # This file might be read again by validation or other utilities
    file_obj.seek(0)
    return detected
|
|
|
|
|
|
def run_clamdscan(abs_path: str) -> tuple:
    """Scan the file at `abs_path` by streaming its contents to the clamd socket.

    The file is opened in binary mode and sent through
    `clamd.ClamdUnixSocket().instream()`, so no external scanner process is
    spawned. Returns the `'stream'` entry of clamd's response
    (presumably a `(status, signature)` pair; confirm against the clamd
    library documentation).
    """
    logger.info('Scanning file at path=%s', abs_path)
    scanner = clamd.ClamdUnixSocket()
    with open(abs_path, 'rb') as stream:
        response = scanner.instream(stream)
    verdict = response['stream']
    logger.info('File at path=%s scanned: %s', abs_path, verdict)
    return verdict
|