extensions-website/files/forms.py
Anna Sirota b0bb4905b2 Reuse files as previews, icons or featured images (#161)
Now it should be possible to:

* upload the same image as a preview or featured image on different extensions;
* upload the same image as an icon on different extensions;
* select the same video/image multiple times while adding previews on Draft or Edit page: first one will be saved, the rest of the duplicates will be ignored.

If all extensions referencing the file in any way are deleted, the file remains in the database: no thumbnail generating or scanning will happen if/when the file gets re-uploaded as a preview or featured image.

In all cases of re-upload `File.user` will not change: this shouldn't be a problem because currently there's no code relying on image ownership.

Version files will remain the only exception from this changed behaviour: it will only be possible to re-upload a version file once the version itself is deleted (which also deletes its file).

As a consequence of this change `File.extension_id` is dropped, because it is no longer possible to choose which extension should be saved there.

Should take care of #157

Reviewed-on: #161
Reviewed-by: Oleg-Komarov <oleg-komarov@noreply.localhost>
2024-06-04 12:23:25 +02:00

245 lines
9.4 KiB
Python

import logging
import os
import zipfile
import tempfile
from django import forms
from django.utils.safestring import mark_safe
from django.utils.translation import gettext_lazy as _
from .validators import (
ExtensionIDManifestValidator,
ExtensionNameManifestValidator,
FileMIMETypeValidator,
ManifestValidator,
)
from constants.base import EXTENSION_SLUG_TYPES, ALLOWED_EXTENSION_MIMETYPES
import files.models
import files.utils as utils
# Soft upload limit, in megabytes: uploads are rejected only once they
# reach one full megabyte above this value.
MAX_UPLOAD_SIZE_MB = 10
# Number of bytes in one megabyte.
BYTES_TO_MEGABYTE = 1024 * 1024
logger = logging.getLogger(__name__)
class FileForm(forms.ModelForm):
    """Validate an uploaded extension archive and fill in the `File` fields derived from it.

    Expects a `request` keyword argument (its `user` is stored on the file and compared
    with the owner of a previously uploaded copy) and accepts an optional `extension`
    keyword argument, which is handed to the manifest validators.
    """

    msg_only_zip_files = _('Only .zip files are accepted.')
    # Mimicking how django.forms.fields.Field handles validation error messages.
    # TODO: maybe this should be a custom SourceFileField with all these validators and messages
    error_messages = {
        'invalid_manifest_path': _(
            'The manifest file should be at the top level of the archive, or one level deep.'
        ),
        # TODO: surface TOML parsing errors?
        'invalid_manifest_toml': _('Could not parse the manifest file.'),
        'invalid_missing_init': _('An add-on should have an __init__.py file.'),
        'missing_or_multiple_theme_xml': _('A theme should have exactly one XML file.'),
        'invalid_zip_archive': msg_only_zip_files,
        'missing_manifest_toml': _('The manifest file is missing.'),
        'missing_wheel': _('A declared wheel is missing in the zip file, expected path: %(path)s'),
    }

    class Meta:
        model = files.models.File
        fields = ('source', 'type', 'metadata', 'agreed_with_terms', 'user')

    source = forms.FileField(
        allow_empty_file=False,
        required=True,
        validators=[
            FileMIMETypeValidator(
                allowed_mimetypes=ALLOWED_EXTENSION_MIMETYPES,
                message=error_messages['invalid_zip_archive'],
            ),
        ],
        widget=forms.ClearableFileInput(attrs={'accept': ','.join(ALLOWED_EXTENSION_MIMETYPES)}),
        help_text=msg_only_zip_files,
    )
    agreed_with_terms = forms.BooleanField(
        initial=False,
        required=True,
        label=mark_safe(
            'I have read and agreed with Blender Extensions'
            ' and <a href="/terms-of-service/" target="_blank">terms of service</a>'
        ),
    )

    def __init__(self, *args, **kwargs):
        """Store `request` and `extension`, then make every other field optional.

        Only `source` and `agreed_with_terms` keep their declared `required` flag:
        the remaining fields are filled in by `clean()` rather than by the user.
        """
        self.request = kwargs.pop('request')
        self.extension = kwargs.pop('extension', None)
        super().__init__(*args, **kwargs)
        # Adjust this instance's own copies of the fields: `base_fields` is shared by
        # every instance of the class and should not be modified per instance.
        for name, field in self.fields.items():
            if name not in {'source', 'agreed_with_terms'}:
                field.required = False

    def clean(self):
        """Check the uploaded archive and derive the remaining `File` fields from it.

        Returns `self.cleaned_data`, or `None` when `agreed_with_terms` or `source`
        did not make it into `cleaned_data` (there is nothing further to check then).

        Raises `forms.ValidationError` when the terms were not agreed with, when the
        file reaches the size limit, when the same file was already uploaded by
        another user, or when the file is not a zip archive. Problems reported while
        reading the manifest are attached to the `source` field instead of raised.
        """
        super().clean()
        if 'agreed_with_terms' not in self.cleaned_data:
            return

        # FIXME: not sure if this is a real use case
        if self.cleaned_data['agreed_with_terms'] is not True:
            raise forms.ValidationError(
                {'agreed_with_terms': ['This field is required.']}, code='required'
            )

        if 'source' not in self.cleaned_data:
            return

        source = self.cleaned_data['source']
        # We accept files up to 1MB higher than the limit, so the
        # error message can always show complete integers
        if source.size >= (MAX_UPLOAD_SIZE_MB + 1) * BYTES_TO_MEGABYTE:
            raise forms.ValidationError(
                {
                    'source': [
                        f'Please keep filesize under {MAX_UPLOAD_SIZE_MB}MB. '
                        f'Current filesize is {source.size // BYTES_TO_MEGABYTE}MB.'
                    ]
                },
                code='invalid',
            )

        hash_ = files.models.File.generate_hash(source)
        existing_file = files.models.File.objects.filter(original_hash=hash_).first()
        if existing_file:
            logger.warning(
                'Found existing File: %r with a matching %r', existing_file, hash_
            )
            # TODO: handle reupload by someone else
            if self.request.user != existing_file.user:
                raise forms.ValidationError(
                    {'source': ['This file appears to have already been uploaded by someone else']},
                    code='invalid',
                )
            # TODO: figure out conditions when file has to be updated on repeated upload
            # Re-use the stored record instead of creating a duplicate one.
            self.cleaned_data['id'] = existing_file.pk
            self.instance = existing_file

        self.cleaned_data.update(
            {
                'original_name': source.name,
                'user': self.request.user,
                'size_bytes': source.size,
                'original_hash': hash_,
                'hash': hash_,
            }
        )

        # TODO: the file should probably be parsed ONLY AFTER some sanity checks.
        # TODO: the sanity checks might include ClamAV scan,
        # a report from which can later be used to generate a maliciousness score
        with tempfile.TemporaryDirectory() as tmpdirname:
            logger.debug('Created temporary directory %s', tmpdirname)
            # The upload is copied to disk because the zip helpers below take a path.
            # NOTE(review): `source.name` comes from the client; presumably Django has
            # already reduced it to a bare file name - confirm.
            file_path = os.path.join(tmpdirname, source.name)
            with open(file_path, 'wb+') as destination:
                for chunk in source.chunks():
                    destination.write(chunk)

            if not zipfile.is_zipfile(file_path):
                raise forms.ValidationError(self.error_messages['invalid_zip_archive'])

            manifest, error_codes = utils.read_manifest_from_zip(file_path)
            errors = []
            for code in error_codes:
                if isinstance(code, dict):
                    # A dict carries the message key plus parameters for its placeholders.
                    errors.append(
                        forms.ValidationError(
                            self.error_messages[code['code']],
                            params=code['params'],
                        )
                    )
                else:
                    errors.append(forms.ValidationError(self.error_messages[code]))
            if errors:
                self.add_error('source', errors)

            if manifest:
                # The validators are only instantiated here; presumably they raise
                # on an invalid manifest - see `.validators` to confirm.
                ManifestValidator(manifest)
                ExtensionIDManifestValidator(manifest, self.extension)
                ExtensionNameManifestValidator(manifest, self.extension)

                self.cleaned_data['metadata'] = manifest
                self.cleaned_data['type'] = EXTENSION_SLUG_TYPES[manifest['type']]

        return self.cleaned_data
class FileFormSkipAgreed(FileForm):
    """`FileForm` variant that does not require `agreed_with_terms` to be submitted."""

    def __init__(self, *args, **kwargs):
        """Initialise as `FileForm` does, then make `agreed_with_terms` optional."""
        super().__init__(*args, **kwargs)
        self.fields['agreed_with_terms'].required = False

    def clean(self):
        """Mark the terms as agreed with and run the regular `FileForm` validation.

        Returns whatever `FileForm.clean()` returns, so that the result is the same
        whichever of the two forms is used.
        """
        # FileForm.clean() stops early unless this key is present and True.
        self.cleaned_data['agreed_with_terms'] = True
        return super().clean()
class BaseMediaFileForm(forms.ModelForm):
    """Base form for a media `File` uploaded for an extension.

    Requires `request` and `extension` keyword arguments. `__init__` reads
    `self.allowed_mimetypes` and `self.error_messages['invalid_mimetype']`, neither
    of which is defined in this class, so they are presumably supplied by subclasses.
    When the form defines `to_field`, that attribute of the extension is used as the
    initial instance and receives the saved file.
    """

    class Meta:
        model = files.models.File
        fields = ('source',)

    source = forms.ImageField(widget=forms.FileInput, allow_empty_file=False)

    def __init__(self, *args, **kwargs):
        """Store `request` and `extension`, then configure the `source` field."""
        self.request = kwargs.pop('request')
        self.extension = kwargs.pop('extension')
        # Set current File so that the form actually displays it:
        if hasattr(self, 'to_field'):
            kwargs['instance'] = getattr(self.extension, self.to_field)
        super().__init__(*args, **kwargs)

        # Configure this instance's own copy of the field: `base_fields` is shared at
        # class level, so changing it there would leak one form's settings into others.
        source_field = self.fields['source']
        # File might not be required depending on the context (saving draft vs sending to review)
        source_field.required = False
        accept = ','.join(self.allowed_mimetypes)
        source_field.widget.attrs.update({'accept': accept})
        # Replace ImageField's file extension validator with one that also checks file's content
        source_field.validators = [
            FileMIMETypeValidator(
                allowed_mimetypes=self.allowed_mimetypes,
                message=self.error_messages['invalid_mimetype'],
            )
        ]

    def clean_source(self, *args, **kwargs):
        """Calculate original hash of the uploaded file, reuse existing record matching it."""
        source = self.cleaned_data.get('source')
        if 'source' in self.changed_data and source:
            original_hash = files.models.File.generate_hash(source)
            instance = files.models.File.objects.filter(original_hash=original_hash).first()
            if instance:
                # File with this hash exists already, make sure it's reused here
                if instance.pk != self.instance.pk:
                    self.instance = instance
            else:
                previous_hash = self.instance.hash
                if previous_hash and original_hash != previous_hash and self.instance.pk:
                    # Create a new file instead of changing the existing one
                    self.instance.pk = None
                self.instance.original_hash = original_hash
        return source

    def save(self, *args, **kwargs):
        """Save as `to_field` on the parent object (Extension)."""
        source = self.cleaned_data.get('source')
        if 'source' in self.changed_data and source:
            # A new upload: refresh the fields derived from the uploaded file.
            self.instance.hash = self.instance.original_hash
            self.instance.original_name = source.name
            self.instance.size_bytes = source.size
            # Keep the original owner when an existing record is being re-used.
            if not self.instance.user_id:
                self.instance.user = self.request.user

        instance = super().save(*args, **kwargs)

        if hasattr(self, 'to_field'):
            # Only the attribute is set here; the extension itself is not saved.
            setattr(self.extension, self.to_field, instance)
        return instance