257 lines
9.9 KiB
Python
257 lines
9.9 KiB
Python
import logging
|
|
import os
|
|
import zipfile
|
|
import tempfile
|
|
|
|
from django import forms
|
|
from django.utils.safestring import mark_safe
|
|
from django.utils.translation import gettext_lazy as _
|
|
|
|
from .validators import (
|
|
ExtensionIDManifestValidator,
|
|
ExtensionNameManifestValidator,
|
|
ExtensionVersionManifestValidator,
|
|
FileMIMETypeValidator,
|
|
ManifestValidator,
|
|
)
|
|
from constants.base import EXTENSION_SLUG_TYPES, ALLOWED_EXTENSION_MIMETYPES
|
|
import files.models
|
|
import files.utils as utils
|
|
|
|
# Soft limit for uploaded files, in megabytes. Uploads are effectively accepted
# up to 1MB above this value (see the size check in FileForm.clean).
# This value is also limited by CloudFlare plan and nginx configuration.
MAX_UPLOAD_SIZE_MB = 200
# Number of bytes in one megabyte (1024 * 1024); used to convert file sizes.
BYTES_TO_MEGABYTE = 1024 * 1024
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class FileForm(forms.ModelForm):
    """Model form handling the upload of an extension archive.

    On top of the regular field validation, ``clean()`` enforces the upload
    size limit, looks up an already stored ``File`` with the same hash, reads
    the manifest out of the uploaded zip archive and runs the manifest
    validators against it.

    Extra keyword arguments consumed by ``__init__``:
        request: the current request (required); ``request.user`` is stored
            as the file's user.
        extension: optional object handed to the manifest validators
            (defaults to ``None``).
    """

    msg_only_zip_files = _('Only .zip files are accepted.')

    # Mimicking how django.forms.fields.Field handles validation error messages.
    # TODO: maybe this should be a custom SourceFileField with all these validators and messages
    error_messages = {
        'invalid_manifest_path': _(
            'The manifest file should be at the top level of the archive, or one level deep.'
        ),
        'invalid_manifest_toml': _(
            'Manifest file contains invalid code: %(msg)s at line %(lineno)s'
        ),
        'invalid_missing_init': mark_safe(_('Add-on file missing: <strong>__init__.py</strong>.')),
        'missing_or_multiple_theme_xml': mark_safe(
            _('Themes can only contain <strong>one XML file</strong>.')
        ),
        'invalid_zip_archive': msg_only_zip_files,
        'missing_manifest_toml': _('The manifest file is missing.'),
        'missing_wheel': _('Python wheel missing: %(path)s'),  # TODO <strong>%(path)s</strong>
        'forbidden_filepaths': _(
            'Archive contains forbidden files or directories: %(paths)s'
        ),  # TODO <strong>%(paths)s</strong>
    }

    class Meta:
        model = files.models.File
        fields = ('source', 'type', 'metadata', 'agreed_with_terms', 'user')

    source = forms.FileField(
        allow_empty_file=False,
        required=True,
        validators=[
            FileMIMETypeValidator(
                allowed_mimetypes=ALLOWED_EXTENSION_MIMETYPES,
                message=error_messages['invalid_zip_archive'],
            ),
        ],
        widget=forms.ClearableFileInput(attrs={'accept': ','.join(ALLOWED_EXTENSION_MIMETYPES)}),
        help_text=msg_only_zip_files,
    )
    agreed_with_terms = forms.BooleanField(
        initial=False,
        required=True,
        label=mark_safe(
            'I have read and agreed with Blender Extensions'
            ' and <a href="/terms-of-service/" target="_blank">terms of service</a>'
        ),
    )

    def __init__(self, *args, **kwargs):
        """Store ``extension``/``request`` and relax every field except the two user inputs.

        Raises:
            KeyError: if the ``request`` keyword argument is missing.
        """
        self.extension = kwargs.pop('extension', None)
        self.request = kwargs.pop('request')
        # Only `source` and `agreed_with_terms` come from the user: the remaining
        # fields are filled in by `clean()`, so they must not be required.
        # This is done on `base_fields` before the parent constructor copies them.
        for field in self.base_fields:
            if field not in {'source', 'agreed_with_terms'}:
                self.base_fields[field].required = False
        super().__init__(*args, **kwargs)

    def clean(self):
        """Validate the uploaded archive and populate ``cleaned_data`` from it.

        Returns:
            ``self.cleaned_data`` once the archive was processed, or ``None``
            when ``agreed_with_terms`` or ``source`` already failed their own
            field validation (there is nothing more to check in that case).

        Raises:
            forms.ValidationError: if the terms were not agreed with, the file
                is too large, the same file was already uploaded by another
                user, or the upload is not a zip archive.
        """
        super().clean()

        if 'agreed_with_terms' not in self.cleaned_data:
            return
        # FIXME: not sure if this is a real use case
        if self.cleaned_data['agreed_with_terms'] is not True:
            # The errors must be a dict mapping the field name to its messages.
            raise forms.ValidationError(
                {'agreed_with_terms': ['This field is required.']}, code='required'
            )

        if 'source' not in self.cleaned_data:
            return

        source = self.cleaned_data['source']
        # We accept files up to 1MB higher than the limit, so the
        # error message can always show complete integers
        if source.size >= (MAX_UPLOAD_SIZE_MB + 1) * BYTES_TO_MEGABYTE:
            raise forms.ValidationError(
                {
                    'source': [
                        f'Please keep filesize under {MAX_UPLOAD_SIZE_MB}MB. '
                        f'Current filesize is {source.size // BYTES_TO_MEGABYTE}MB.'
                    ]
                },
                code='invalid',
            )

        hash_ = files.models.File.generate_hash(source)
        existing_file = files.models.File.objects.filter(original_hash=hash_).first()
        if existing_file:
            logger.warning(
                'Found existing File: %r with a matching %r', existing_file, hash_
            )
            # TODO: handle reupload by someone else
            if self.request.user != existing_file.user:
                raise forms.ValidationError(
                    {'source': ['This file appears to have already been uploaded by someone else']},
                    code='invalid',
                )
            # TODO: figure out conditions when file has to be updated on repeated upload
            # Reuse the stored record instead of creating a duplicate.
            self.cleaned_data['id'] = existing_file.pk
            self.instance = existing_file
        self.cleaned_data.update(
            {
                'original_name': source.name,
                'user': self.request.user,
                'size_bytes': source.size,
                'original_hash': hash_,
                'hash': hash_,
            }
        )

        # TODO: the file should probably be parsed ONLY AFTER some sanity checks.
        # TODO: the sanity checks might include ClamAV scan,
        # a report from which can later be used to generate a maliciousness score

        # The upload is written to a temporary directory so that it can be
        # inspected by path; the directory is removed when the block exits.
        with tempfile.TemporaryDirectory() as tmpdirname:
            logger.debug('Created temporary directory %s', tmpdirname)
            file_path = os.path.join(tmpdirname, source.name)
            with open(file_path, 'wb+') as destination:
                for chunk in source.chunks():
                    destination.write(chunk)

            if not zipfile.is_zipfile(file_path):
                raise forms.ValidationError(self.error_messages['invalid_zip_archive'])

            manifest, error_codes = utils.read_manifest_from_zip(file_path)
            # Each reported code is either a key of `error_messages`, or a dict
            # carrying that key under 'code' plus the message's 'params'.
            errors = []
            for code in error_codes:
                if isinstance(code, dict):
                    errors.append(
                        forms.ValidationError(
                            self.error_messages[code['code']],
                            params=code['params'],
                        )
                    )
                else:
                    errors.append(forms.ValidationError(self.error_messages[code]))
            if errors:
                self.add_error('source', errors)

            if manifest:
                # NOTE(review): the validators are only instantiated here, so they
                # presumably validate (and raise) in their constructors -- confirm
                # against `.validators`.
                ManifestValidator(manifest)
                ExtensionIDManifestValidator(manifest, self.extension)
                ExtensionNameManifestValidator(manifest, self.extension)
                ExtensionVersionManifestValidator(
                    manifest,
                    self.extension,
                )

                self.cleaned_data['metadata'] = manifest
                self.cleaned_data['type'] = EXTENSION_SLUG_TYPES[manifest['type']]

        return self.cleaned_data
|
|
|
|
|
|
class FileFormSkipAgreed(FileForm):
    """Variant of ``FileForm`` that does not ask for the terms agreement.

    The ``agreed_with_terms`` field is made optional and is forced to ``True``
    before the parent's ``clean()`` runs.
    """

    def __init__(self, *args, **kwargs):
        """Build the form as ``FileForm`` does, then make the agreement optional."""
        super().__init__(*args, **kwargs)
        self.fields['agreed_with_terms'].required = False

    def clean(self):
        """Mark the terms as agreed, then run and return the parent's ``clean()``."""
        self.cleaned_data['agreed_with_terms'] = True
        return super().clean()
|
|
|
|
|
|
class BaseMediaFileForm(forms.ModelForm):
    """Base form for a single media ``File`` attached to an extension.

    The form reads ``self.allowed_mimetypes`` and
    ``self.error_messages['invalid_mimetype']``, neither of which is defined
    in this class (presumably subclasses provide them -- verify).
    If a ``to_field`` attribute is present, it names the attribute of the
    extension that holds the file: it is used as the form's instance and is
    updated on ``save()``.

    Required keyword arguments consumed by ``__init__``: ``request`` and
    ``extension``.
    """

    class Meta:
        model = files.models.File
        fields = ('source',)

    source = forms.ImageField(widget=forms.FileInput, allow_empty_file=False)

    def __init__(self, *args, **kwargs):
        """Store ``request``/``extension`` and customise the ``source`` field.

        Raises:
            KeyError: if ``request`` or ``extension`` is not passed.
        """
        self.request = kwargs.pop('request')
        self.extension = kwargs.pop('extension')
        # Set current File so that the form actually displays it:
        if hasattr(self, 'to_field'):
            kwargs['instance'] = getattr(self.extension, self.to_field)

        super().__init__(*args, **kwargs)

        # Customise this form's own copy of the field: `base_fields` holds field
        # objects shared at class level, so they must not be modified per instance.
        source_field = self.fields['source']

        # File might not be required depending on the context (saving draft vs sending to review)
        source_field.required = False

        accept = ','.join(self.allowed_mimetypes)
        source_field.widget.attrs.update({'accept': accept})

        # Replace ImageField's file extension validator with one that also checks file's content
        source_field.validators = [
            FileMIMETypeValidator(
                allowed_mimetypes=self.allowed_mimetypes,
                message=self.error_messages['invalid_mimetype'],
            )
        ]

    def clean_source(self, *args, **kwargs):
        """Calculate original hash of the uploaded file, reuse existing record matching it.

        Returns:
            The cleaned ``source`` value, unchanged.
        """
        source = self.cleaned_data.get('source')
        if 'source' in self.changed_data and source:
            original_hash = files.models.File.generate_hash(source)
            instance = files.models.File.objects.filter(original_hash=original_hash).first()
            if instance:
                # File with this hash exists already, make sure it's reused here
                if instance.pk != self.instance.pk:
                    self.instance = instance
            else:
                previous_hash = self.instance.hash
                if previous_hash and original_hash != previous_hash and self.instance.pk:
                    # Create a new file instead of changing the existing one
                    self.instance.pk = None
            self.instance.original_hash = original_hash
        return source

    def save(self, *args, **kwargs):
        """Save as `to_field` on the parent object (Extension).

        Returns:
            The saved instance, as returned by the parent's ``save()``.
        """
        source = self.cleaned_data.get('source')
        if 'source' in self.changed_data and source:
            # `original_hash` was set by `clean_source()` for the new upload.
            self.instance.hash = self.instance.original_hash
            self.instance.original_name = source.name
            self.instance.size_bytes = source.size

        if not self.instance.user_id:
            self.instance.user = self.request.user

        instance = super().save(*args, **kwargs)

        if hasattr(self, 'to_field'):
            # Only the attribute is assigned here; the extension itself is not saved.
            setattr(self.extension, self.to_field, instance)
        return instance
|