extensions-website/files/forms.py

221 lines
8.4 KiB
Python

import logging
import os
import zipfile
import tempfile
from django import forms
from django.utils.safestring import mark_safe
from django.utils.translation import gettext_lazy as _
import django.core.exceptions
from .validators import (
ExtensionIDManifestValidator,
FileMIMETypeValidator,
ManifestValidator,
)
from constants.base import EXTENSION_SLUG_TYPES, ALLOWED_EXTENSION_MIMETYPES
import files.models
import files.utils as utils
# Soft limit for the files, effectively we accept up to 1MB extra
MAX_UPLOAD_SIZE_MB = 10
BYTES_TO_MEGABYTE = 1048576 # constant
logger = logging.getLogger(__name__)
class FileForm(forms.ModelForm):
msg_only_zip_files = _('Only .zip files are accepted.')
# Mimicking how django.forms.fields.Field handles validation error messages.
# TODO: maybe this should be a custom SourceFileField with all these validators and messages
error_messages = {
'invalid_manifest_path': _(
'The manifest file should be at the top level of the archive, or one level deep.'
),
# TODO: surface TOML parsing errors?
'invalid_manifest_toml': _('Could not parse the manifest file.'),
'invalid_missing_init': _('An add-on should have an __init__.py file.'),
'missing_or_multiple_theme_xml': _('A theme should have exactly one XML file.'),
'invalid_zip_archive': msg_only_zip_files,
'missing_manifest_toml': _('The manifest file is missing.'),
}
class Meta:
model = files.models.File
fields = ('source', 'type', 'metadata', 'agreed_with_terms', 'user')
source = forms.FileField(
allow_empty_file=False,
required=True,
validators=[
FileMIMETypeValidator(
allowed_mimetypes=ALLOWED_EXTENSION_MIMETYPES,
message=error_messages['invalid_zip_archive'],
),
],
widget=forms.ClearableFileInput(attrs={'accept': ','.join(ALLOWED_EXTENSION_MIMETYPES)}),
help_text=msg_only_zip_files,
)
agreed_with_terms = forms.BooleanField(
initial=False,
required=True,
label=mark_safe(
'I have read and agreed with Blender Extensions'
' and <a href="/terms-of-service/" target="_blank">terms of service</a>'
),
)
def __init__(self, *args, **kwargs):
self.request = kwargs.pop('request')
self.extension = kwargs.pop('extension', None)
for field in self.base_fields:
if field not in {'source', 'agreed_with_terms'}:
self.base_fields[field].required = False
super().__init__(*args, **kwargs)
def clean(self):
super().clean()
if 'agreed_with_terms' not in self.cleaned_data:
return
# FIXME: not sure if this is a real use case
if self.cleaned_data['agreed_with_terms'] is not True:
raise forms.ValidationError(
{'agreed_with_terms', ['This field is required.']}, code='required'
)
if 'source' not in self.cleaned_data:
return
source = self.cleaned_data['source']
# We accept files up to 1MB higher than the limit, so the
# error message can always show complete integers
if source.size >= (MAX_UPLOAD_SIZE_MB + 1) * BYTES_TO_MEGABYTE:
raise forms.ValidationError(
{
'source': [
f'Please keep filesize under {MAX_UPLOAD_SIZE_MB}MB. '
'Current filesize is {source.size // BYTES_TO_MEGABYTE}MB.'
]
},
code='invalid',
)
hash_ = files.models.File.generate_hash(source)
existing_file = files.models.File.objects.filter(original_hash=hash_).first()
if existing_file:
logger.warning(f'Found existing File: {existing_file!r} with a matching {hash_!r}')
# TODO: handle reupload by someone else
if self.request.user != existing_file.user:
raise forms.ValidationError(
{'source': ['This file appears to have already been uploaded by someone else']},
code='invalid',
)
# TODO: figure out conditions when file has to be updated on repeated upload
self.cleaned_data['id'] = existing_file.pk
self.instance = existing_file
self.cleaned_data.update(
{
'original_name': source.name,
'user': self.request.user,
'size_bytes': source.size,
'original_hash': hash_,
'hash': hash_,
}
)
# TODO: the file should probably be parsed ONLY AFTER some sanity checks.
# TODO: the sanity checks might include ClamAV scan,
# a report from which can later be used to generate a maliciousness score
with tempfile.TemporaryDirectory() as tmpdirname:
logger.debug('Created temporary directory %s' % tmpdirname)
file_path = os.path.join(tmpdirname, source.name)
with open(file_path, 'wb+') as destination:
for chunk in source.chunks():
destination.write(chunk)
errors = []
if not zipfile.is_zipfile(file_path):
raise forms.ValidationError(self.error_messages['invalid_zip_archive'])
manifest, error_codes = utils.read_manifest_from_zip(file_path)
for code in error_codes:
errors.append(forms.ValidationError(self.error_messages[code]))
if errors:
self.add_error('source', errors)
if manifest:
ManifestValidator(manifest)
ExtensionIDManifestValidator(manifest, self.extension)
self.cleaned_data['metadata'] = manifest
self.cleaned_data['type'] = EXTENSION_SLUG_TYPES[manifest['type']]
return self.cleaned_data
class BaseMediaFileForm(forms.ModelForm):
class Meta:
model = files.models.File
fields = ('source', 'original_hash')
widgets = {'original_hash': forms.HiddenInput()}
source = forms.ImageField(widget=forms.FileInput, allow_empty_file=False)
def __init__(self, *args, **kwargs):
self.request = kwargs.pop('request')
self.extension = kwargs.pop('extension')
# Set current File so that the form actually displays it:
if hasattr(self, 'to_field'):
kwargs['instance'] = getattr(self.extension, getattr(self, 'to_field'))
source_field = self.base_fields['source']
# File might not be required depending on the context (saving draft vs sending to review)
source_field.required = False
accept = ','.join(self.allowed_mimetypes)
source_field.widget.attrs.update({'accept': accept})
# Replace ImageField's file extension validator with one that also check file's content
source_field.validators = [
FileMIMETypeValidator(
allowed_mimetypes=self.allowed_mimetypes,
message=self.error_messages['invalid_mimetype'],
)
]
super().__init__(*args, **kwargs)
self.instance.user = self.request.user
def clean_original_hash(self, *args, **kwargs):
"""Calculate original hash of the uploaded file."""
source = self.cleaned_data.get('source')
if not source:
return
return files.models.File.generate_hash(source)
def add_error(self, field, error):
"""Add hidden `original_hash` errors to the visible `source` field instead."""
if isinstance(error, django.core.exceptions.ValidationError):
if getattr(error, 'error_dict', None):
hash_error = error.error_dict.pop('original_hash', None)
if hash_error:
error.error_dict['source'] = hash_error
super(forms.ModelForm, self).add_error(field, error)
def save(self, *args, **kwargs):
"""Save as `to_field` on the parent object (Extension)."""
source = self.cleaned_data['source']
self.instance.hash = self.instance.original_hash
self.instance.original_name = source.name
self.instance.size_bytes = source.size
instance = super().save(*args, **kwargs)
if hasattr(self, 'to_field'):
to_field = self.to_field
setattr(self.extension, to_field, instance)
return instance