221 lines
8.4 KiB
Python
221 lines
8.4 KiB
Python
import logging
|
|
import os
|
|
import zipfile
|
|
import tempfile
|
|
|
|
from django import forms
|
|
from django.utils.safestring import mark_safe
|
|
from django.utils.translation import gettext_lazy as _
|
|
import django.core.exceptions
|
|
|
|
from .validators import (
|
|
ExtensionIDManifestValidator,
|
|
FileMIMETypeValidator,
|
|
ManifestValidator,
|
|
)
|
|
from constants.base import EXTENSION_SLUG_TYPES, ALLOWED_EXTENSION_MIMETYPES
|
|
import files.models
|
|
import files.utils as utils
|
|
|
|
# Soft limit for the files, effectively we accept up to 1MB extra
|
|
MAX_UPLOAD_SIZE_MB = 10
|
|
BYTES_TO_MEGABYTE = 1048576 # constant
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class FileForm(forms.ModelForm):
|
|
msg_only_zip_files = _('Only .zip files are accepted.')
|
|
|
|
# Mimicking how django.forms.fields.Field handles validation error messages.
|
|
# TODO: maybe this should be a custom SourceFileField with all these validators and messages
|
|
error_messages = {
|
|
'invalid_manifest_path': _(
|
|
'The manifest file should be at the top level of the archive, or one level deep.'
|
|
),
|
|
# TODO: surface TOML parsing errors?
|
|
'invalid_manifest_toml': _('Could not parse the manifest file.'),
|
|
'invalid_missing_init': _('An add-on should have an __init__.py file.'),
|
|
'missing_or_multiple_theme_xml': _('A theme should have exactly one XML file.'),
|
|
'invalid_zip_archive': msg_only_zip_files,
|
|
'missing_manifest_toml': _('The manifest file is missing.'),
|
|
}
|
|
|
|
class Meta:
|
|
model = files.models.File
|
|
fields = ('source', 'type', 'metadata', 'agreed_with_terms', 'user')
|
|
|
|
source = forms.FileField(
|
|
allow_empty_file=False,
|
|
required=True,
|
|
validators=[
|
|
FileMIMETypeValidator(
|
|
allowed_mimetypes=ALLOWED_EXTENSION_MIMETYPES,
|
|
message=error_messages['invalid_zip_archive'],
|
|
),
|
|
],
|
|
widget=forms.ClearableFileInput(attrs={'accept': ','.join(ALLOWED_EXTENSION_MIMETYPES)}),
|
|
help_text=msg_only_zip_files,
|
|
)
|
|
agreed_with_terms = forms.BooleanField(
|
|
initial=False,
|
|
required=True,
|
|
label=mark_safe(
|
|
'I have read and agreed with Blender Extensions'
|
|
' and <a href="/terms-of-service/" target="_blank">terms of service</a>'
|
|
),
|
|
)
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
self.request = kwargs.pop('request')
|
|
self.extension = kwargs.pop('extension', None)
|
|
for field in self.base_fields:
|
|
if field not in {'source', 'agreed_with_terms'}:
|
|
self.base_fields[field].required = False
|
|
super().__init__(*args, **kwargs)
|
|
|
|
def clean(self):
|
|
super().clean()
|
|
|
|
if 'agreed_with_terms' not in self.cleaned_data:
|
|
return
|
|
# FIXME: not sure if this is a real use case
|
|
if self.cleaned_data['agreed_with_terms'] is not True:
|
|
raise forms.ValidationError(
|
|
{'agreed_with_terms', ['This field is required.']}, code='required'
|
|
)
|
|
|
|
if 'source' not in self.cleaned_data:
|
|
return
|
|
|
|
source = self.cleaned_data['source']
|
|
# We accept files up to 1MB higher than the limit, so the
|
|
# error message can always show complete integers
|
|
if source.size >= (MAX_UPLOAD_SIZE_MB + 1) * BYTES_TO_MEGABYTE:
|
|
raise forms.ValidationError(
|
|
{
|
|
'source': [
|
|
f'Please keep filesize under {MAX_UPLOAD_SIZE_MB}MB. '
|
|
'Current filesize is {source.size // BYTES_TO_MEGABYTE}MB.'
|
|
]
|
|
},
|
|
code='invalid',
|
|
)
|
|
|
|
hash_ = files.models.File.generate_hash(source)
|
|
existing_file = files.models.File.objects.filter(original_hash=hash_).first()
|
|
if existing_file:
|
|
logger.warning(f'Found existing File: {existing_file!r} with a matching {hash_!r}')
|
|
# TODO: handle reupload by someone else
|
|
if self.request.user != existing_file.user:
|
|
raise forms.ValidationError(
|
|
{'source': ['This file appears to have already been uploaded by someone else']},
|
|
code='invalid',
|
|
)
|
|
# TODO: figure out conditions when file has to be updated on repeated upload
|
|
self.cleaned_data['id'] = existing_file.pk
|
|
self.instance = existing_file
|
|
self.cleaned_data.update(
|
|
{
|
|
'original_name': source.name,
|
|
'user': self.request.user,
|
|
'size_bytes': source.size,
|
|
'original_hash': hash_,
|
|
'hash': hash_,
|
|
}
|
|
)
|
|
|
|
# TODO: the file should probably be parsed ONLY AFTER some sanity checks.
|
|
# TODO: the sanity checks might include ClamAV scan,
|
|
# a report from which can later be used to generate a maliciousness score
|
|
|
|
with tempfile.TemporaryDirectory() as tmpdirname:
|
|
logger.debug('Created temporary directory %s' % tmpdirname)
|
|
file_path = os.path.join(tmpdirname, source.name)
|
|
with open(file_path, 'wb+') as destination:
|
|
for chunk in source.chunks():
|
|
destination.write(chunk)
|
|
|
|
errors = []
|
|
if not zipfile.is_zipfile(file_path):
|
|
raise forms.ValidationError(self.error_messages['invalid_zip_archive'])
|
|
|
|
manifest, error_codes = utils.read_manifest_from_zip(file_path)
|
|
for code in error_codes:
|
|
errors.append(forms.ValidationError(self.error_messages[code]))
|
|
if errors:
|
|
self.add_error('source', errors)
|
|
|
|
if manifest:
|
|
ManifestValidator(manifest)
|
|
ExtensionIDManifestValidator(manifest, self.extension)
|
|
|
|
self.cleaned_data['metadata'] = manifest
|
|
self.cleaned_data['type'] = EXTENSION_SLUG_TYPES[manifest['type']]
|
|
|
|
return self.cleaned_data
|
|
|
|
|
|
class BaseMediaFileForm(forms.ModelForm):
|
|
class Meta:
|
|
model = files.models.File
|
|
fields = ('source', 'original_hash')
|
|
widgets = {'original_hash': forms.HiddenInput()}
|
|
|
|
source = forms.ImageField(widget=forms.FileInput, allow_empty_file=False)
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
self.request = kwargs.pop('request')
|
|
self.extension = kwargs.pop('extension')
|
|
# Set current File so that the form actually displays it:
|
|
if hasattr(self, 'to_field'):
|
|
kwargs['instance'] = getattr(self.extension, getattr(self, 'to_field'))
|
|
|
|
source_field = self.base_fields['source']
|
|
|
|
# File might not be required depending on the context (saving draft vs sending to review)
|
|
source_field.required = False
|
|
|
|
accept = ','.join(self.allowed_mimetypes)
|
|
source_field.widget.attrs.update({'accept': accept})
|
|
|
|
# Replace ImageField's file extension validator with one that also check file's content
|
|
source_field.validators = [
|
|
FileMIMETypeValidator(
|
|
allowed_mimetypes=self.allowed_mimetypes,
|
|
message=self.error_messages['invalid_mimetype'],
|
|
)
|
|
]
|
|
|
|
super().__init__(*args, **kwargs)
|
|
|
|
self.instance.user = self.request.user
|
|
|
|
def clean_original_hash(self, *args, **kwargs):
|
|
"""Calculate original hash of the uploaded file."""
|
|
source = self.cleaned_data.get('source')
|
|
if not source:
|
|
return
|
|
return files.models.File.generate_hash(source)
|
|
|
|
def add_error(self, field, error):
|
|
"""Add hidden `original_hash` errors to the visible `source` field instead."""
|
|
if isinstance(error, django.core.exceptions.ValidationError):
|
|
if getattr(error, 'error_dict', None):
|
|
hash_error = error.error_dict.pop('original_hash', None)
|
|
if hash_error:
|
|
error.error_dict['source'] = hash_error
|
|
super(forms.ModelForm, self).add_error(field, error)
|
|
|
|
def save(self, *args, **kwargs):
|
|
"""Save as `to_field` on the parent object (Extension)."""
|
|
source = self.cleaned_data['source']
|
|
self.instance.hash = self.instance.original_hash
|
|
self.instance.original_name = source.name
|
|
self.instance.size_bytes = source.size
|
|
instance = super().save(*args, **kwargs)
|
|
|
|
if hasattr(self, 'to_field'):
|
|
to_field = self.to_field
|
|
setattr(self.extension, to_field, instance)
|
|
return instance
|