139 lines
5.0 KiB
Python
139 lines
5.0 KiB
Python
import logging
|
|
import os
|
|
import zipfile
|
|
import tempfile
|
|
|
|
from django import forms
|
|
from django.utils.safestring import mark_safe
|
|
|
|
from .validators import (
|
|
CustomFileExtensionValidator,
|
|
ExtensionIDManifestValidator,
|
|
ManifestValidator,
|
|
)
|
|
from constants.base import (
|
|
EXTENSION_TYPE_SLUGS_SINGULAR,
|
|
VALID_SOURCE_EXTENSIONS,
|
|
)
|
|
import files.models
|
|
import files.utils as utils
|
|
|
|
# Soft limit for the files, effectively we accept up to 1MB extra
|
|
MAX_UPLOAD_SIZE_MB = 10
|
|
BYTES_TO_MEGABYTE = 1048576 # constant
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class FileForm(forms.ModelForm):
|
|
class Meta:
|
|
model = files.models.File
|
|
fields = ('source', 'type', 'metadata', 'agreed_with_terms', 'user')
|
|
|
|
source = forms.FileField(
|
|
allow_empty_file=False,
|
|
required=True,
|
|
validators=[CustomFileExtensionValidator(allowed_extensions=VALID_SOURCE_EXTENSIONS)],
|
|
help_text=('Only .zip files are accepted.'),
|
|
)
|
|
agreed_with_terms = forms.BooleanField(
|
|
initial=False,
|
|
required=True,
|
|
label=mark_safe(
|
|
'I have read and agreed with Blender Extensions'
|
|
' <a href="/conditions-of-use/" target="_blank">conditions of use</a>'
|
|
' and <a href="/policies/" target="_blank">policies</a>'
|
|
),
|
|
)
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
self.request = kwargs.pop('request')
|
|
self.extension = kwargs.pop('extension', None)
|
|
for field in self.base_fields:
|
|
if field not in {'source', 'agreed_with_terms'}:
|
|
self.base_fields[field].required = False
|
|
super().__init__(*args, **kwargs)
|
|
|
|
def clean(self):
|
|
super().clean()
|
|
|
|
if 'agreed_with_terms' not in self.cleaned_data:
|
|
return
|
|
# FIXME: not sure if this is a real use case
|
|
if self.cleaned_data['agreed_with_terms'] is not True:
|
|
raise forms.ValidationError(
|
|
{'agreed_with_terms', ['This field is required.']}, code='required'
|
|
)
|
|
|
|
if 'source' not in self.cleaned_data:
|
|
return
|
|
|
|
source = self.cleaned_data['source']
|
|
# We accept files up to 1MB higher than the limit, so the
|
|
# error message can always show complete integers
|
|
if source.size >= (MAX_UPLOAD_SIZE_MB + 1) * BYTES_TO_MEGABYTE:
|
|
raise forms.ValidationError(
|
|
{
|
|
'source': [
|
|
f'Please keep filesize under {MAX_UPLOAD_SIZE_MB}MB. '
|
|
'Current filesize is {source.size // BYTES_TO_MEGABYTE}MB.'
|
|
]
|
|
},
|
|
code='invalid',
|
|
)
|
|
|
|
hash_ = files.models.File.generate_hash(source)
|
|
existing_file = files.models.File.objects.filter(original_hash=hash_).first()
|
|
if existing_file:
|
|
logger.warning(f'Found existing File: {existing_file!r} with a matching {hash_!r}')
|
|
# TODO: handle reupload by someone else
|
|
if self.request.user != existing_file.user:
|
|
raise forms.ValidationError(
|
|
{'source': ['This file appears to have already been uploaded by someone else']},
|
|
code='invalid',
|
|
)
|
|
# TODO: figure out conditions when file has to be updated on repeated upload
|
|
self.cleaned_data['id'] = existing_file.pk
|
|
self.instance = existing_file
|
|
self.cleaned_data.update(
|
|
{
|
|
'original_name': source.name,
|
|
'user': self.request.user,
|
|
'size_bytes': source.size,
|
|
'original_hash': hash_,
|
|
'hash': hash_,
|
|
}
|
|
)
|
|
|
|
# TODO: the file should probably be parsed ONLY AFTER some sanity checks.
|
|
# TODO: the sanity checks might include ClamAV scan,
|
|
# a report from which can later be used to generate a maliciousness score
|
|
|
|
with tempfile.TemporaryDirectory() as tmpdirname:
|
|
logger.debug('Created temporary directory %s' % tmpdirname)
|
|
file_path = os.path.join(tmpdirname, source.name)
|
|
with open(file_path, 'wb+') as destination:
|
|
for chunk in source.chunks():
|
|
destination.write(chunk)
|
|
|
|
errors = []
|
|
if not zipfile.is_zipfile(file_path):
|
|
errors.append('File is not .zip')
|
|
|
|
manifest = utils.read_manifest_from_zip(file_path)
|
|
|
|
if manifest is None:
|
|
errors.append('A valid manifest file could not be found')
|
|
else:
|
|
ManifestValidator(manifest)
|
|
ExtensionIDManifestValidator(manifest, self.extension)
|
|
|
|
extension_types = {v: k for k, v in EXTENSION_TYPE_SLUGS_SINGULAR.items()}
|
|
if errors:
|
|
raise forms.ValidationError({'source': errors}, code='invalid')
|
|
|
|
self.cleaned_data['metadata'] = manifest
|
|
# TODO: Error handling
|
|
self.cleaned_data['type'] = extension_types[manifest['type']]
|
|
|
|
return self.cleaned_data
|