Oleg Komarov
c975e8cb95
This PR adds a new check to background file scans: wheel digests are verified using the PyPI JSON API (https://warehouse.pypa.io/api-reference/json.html). This check should flag uploads that try to ship code not published on PyPI. Although the fact that something is published on PyPI is no guarantee that the code is safe to load, this additional step should introduce at least some barriers to uploading malicious code. We can potentially improve on this further, e.g. by integrating with VirusTotal (https://docs.virustotal.com/docs/api-overview). Reviewed-on: #199 Reviewed-by: Anna Sirota <annasirota@noreply.localhost>
92 lines
3.0 KiB
Python
92 lines
3.0 KiB
Python
import logging
|
|
|
|
from django.conf import settings
|
|
from django.db.models.signals import pre_save, post_save, pre_delete, post_delete
|
|
from django.dispatch import receiver
|
|
|
|
import files.models
|
|
import files.tasks
|
|
import files.utils
|
|
|
|
# Module-level logger, named after this module; used by the scheduling helpers below.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@receiver(pre_save, sender=files.models.File)
def _record_changes(
    sender: object, instance: files.models.File, update_fields: object, **kwargs: object
) -> None:
    """Record changes of a File right before it is saved.

    Asks the instance for its pre-save record (a "was changed" flag and the
    old state) and passes both on to the instance's status-change recorder,
    together with the remaining signal keyword arguments.
    """
    changed, previous_state = instance.pre_save_record(update_fields=update_fields)

    instance.record_status_change(changed, previous_state, **kwargs)
|
|
|
|
|
|
def schedule_scan(file: files.models.File) -> None:
    """Schedule the scan task for the given file.

    Logs the request, then calls ``files.tasks.scan_file`` with the file's
    primary key, the file itself as creator, and a human-readable task name.
    """
    logger.info('Scheduling a scan for file pk=%s', file.pk)
    # The task name is derived from the name of the file's source.
    task_label = f'scan of "{file.source.name}"'
    files.tasks.scan_file(file_id=file.pk, creator=file, verbose_name=task_label)
|
|
|
|
|
|
@receiver(post_save, sender=files.models.File)
def _scan_new_file(
    sender: object, instance: files.models.File, created: bool, **kwargs: object
) -> None:
    """Schedule a scan for a File, but only on the save that created it."""
    if created:
        schedule_scan(instance)
|
|
|
|
|
|
def schedule_thumbnails(file: files.models.File) -> None:
    """Schedule the thumbnail task for the given file.

    Does nothing unless the file is an image or a video. Otherwise logs the
    request and calls ``files.tasks.make_thumbnails`` with the file's primary
    key, the file itself as creator, and a human-readable task name.
    """
    if not (file.is_image or file.is_video):
        # Only images and videos get thumbnails.
        return

    log_context = {'pk': file.pk, 'type': file.get_type_display()}
    logger.info('Scheduling thumbnail generation for file pk=%(pk)s type=%(type)s', log_context)

    task_label = f'make thumbnails for "{file.source.name}"'
    files.tasks.make_thumbnails(file_id=file.pk, creator=file, verbose_name=task_label)
|
|
|
|
|
|
def _schedule_thumbnails_when_created(
    sender: object, instance: files.models.File, created: bool, **kwargs: object
) -> None:
    """Schedule thumbnails for a File, but only on the save that created it."""
    if created:
        schedule_thumbnails(instance)
|
|
|
|
|
|
def _schedule_thumbnails_when_validated(
    sender: object, instance: files.models.FileValidation, created: bool, **kwargs: object
) -> None:
    """Schedule thumbnails for the file of a newly created, OK validation.

    Nothing happens when the validation record is merely updated, or when
    the validation is not OK.
    """
    # Generate thumbnails only if the initial scan found no issues;
    # `is_ok` is not consulted for updates of existing records.
    if created and instance.is_ok:
        schedule_thumbnails(instance.file)
|
|
|
|
|
|
# Connect exactly one thumbnail handler to post_save, depending on settings.
if not settings.REQUIRE_FILE_VALIDATION:
    # Validation is not required: schedule thumbnails when a new file is created
    post_save.connect(_schedule_thumbnails_when_created, sender=files.models.File)
else:
    # Validation is required: only schedule thumbnails when the file is validated
    post_save.connect(_schedule_thumbnails_when_validated, sender=files.models.FileValidation)
|
|
|
|
|
|
@receiver(pre_delete, sender=files.models.File)
@receiver(pre_delete, sender=files.models.FileValidation)
def _log_deletion(sender: object, instance: files.models.File, **kwargs: object) -> None:
    """Record the deletion of an instance right before it is deleted.

    Registered for ``pre_delete`` of both File and FileValidation, so
    ``instance`` may be of either model despite its annotation.
    """
    instance.record_deletion()
|
|
|
|
|
|
@receiver(post_delete, sender=files.models.File)
def delete_orphaned_files(sender: object, instance: files.models.File, **kwargs: object) -> None:
    """Clean up storage after a File record has been deleted.

    Removes the source file and the thumbnail file by name, then passes the
    record's metadata to ``files.utils.delete_thumbnails``.
    """
    remove_from_storage = files.utils.delete_file_in_storage

    # Source first, then the thumbnail, then whatever the metadata refers to.
    remove_from_storage(instance.source.name)
    remove_from_storage(instance.thumbnail.name)
    files.utils.delete_thumbnails(instance.metadata)
|