extensions-website/files/signals.py
Oleg Komarov c975e8cb95 File scanning: validate wheel digests against pypi.org (#199)
This PR adds a new check to background file scans:
wheel digests are verified using the PyPI JSON API
https://warehouse.pypa.io/api-reference/json.html

This check should flag uploads that try to ship code not published on PyPI.
Although the fact that something is published on PyPI is not a guarantee
that the code is safe to load, this additional step should introduce at least
some barriers to uploading malicious code.

We can potentially improve on this further by e.g. integrating with
https://docs.virustotal.com/docs/api-overview

Reviewed-on: #199
Reviewed-by: Anna Sirota <annasirota@noreply.localhost>
2024-07-11 10:45:22 +02:00

92 lines
3.0 KiB
Python

import logging
from django.conf import settings
from django.db.models.signals import pre_save, post_save, pre_delete, post_delete
from django.dispatch import receiver
import files.models
import files.tasks
import files.utils
logger = logging.getLogger(__name__)
@receiver(pre_save, sender=files.models.File)
def _record_changes(
    sender: object,
    instance: files.models.File,
    update_fields: object,
    **kwargs: object,
) -> None:
    """Record pending changes of a File right before it is saved.

    Passes ``update_fields`` to ``instance.pre_save_record`` and forwards the
    resulting pair, together with the remaining signal keyword arguments,
    to ``instance.record_status_change``.
    """
    change_info = instance.pre_save_record(update_fields=update_fields)
    changed, previous_state = change_info
    instance.record_status_change(changed, previous_state, **kwargs)
def schedule_scan(file: files.models.File) -> None:
    """Schedule a scan task for the given file.

    Logs the file's primary key, then calls ``files.tasks.scan_file`` with
    the file itself passed as the task's ``creator``.
    """
    file_pk = file.pk
    logger.info('Scheduling a scan for file pk=%s', file_pk)
    files.tasks.scan_file(
        file_id=file_pk,
        creator=file,
        verbose_name=f'scan of "{file.source.name}"',
    )
@receiver(post_save, sender=files.models.File)
def _scan_new_file(
    sender: object,
    instance: files.models.File,
    created: bool,
    **kwargs: object,
) -> None:
    """Schedule a scan when a File was just created; saves of existing records are ignored."""
    if created:
        schedule_scan(instance)
def schedule_thumbnails(file: files.models.File) -> None:
    """Schedule thumbnail generation for an image or video file.

    Does nothing when the file is neither an image nor a video.
    """
    if not (file.is_image or file.is_video):
        return

    log_context = {'pk': file.pk, 'type': file.get_type_display()}
    logger.info('Scheduling thumbnail generation for file pk=%(pk)s type=%(type)s', log_context)
    files.tasks.make_thumbnails(
        file_id=file.pk,
        creator=file,
        verbose_name=f'make thumbnails for "{file.source.name}"',
    )
def _schedule_thumbnails_when_created(
    sender: object,
    instance: files.models.File,
    created: bool,
    **kwargs: object,
) -> None:
    """Schedule thumbnails for a File that was just created; other saves are ignored."""
    if created:
        schedule_thumbnails(instance)
def _schedule_thumbnails_when_validated(
    sender: object,
    instance: files.models.FileValidation,
    created: bool,
    **kwargs: object,
) -> None:
    """Schedule thumbnails for the file of a newly created validation record that is OK."""
    # Thumbnails are only generated when the initial scan found no issues.
    if created and instance.is_ok:
        schedule_thumbnails(instance.file)
# Choose which post_save signal triggers thumbnail generation.
if not settings.REQUIRE_FILE_VALIDATION:
    # Validation is not required: schedule thumbnails as soon as a new File is created.
    post_save.connect(_schedule_thumbnails_when_created, sender=files.models.File)
else:
    # Validation is required: schedule thumbnails only once the file has been validated.
    post_save.connect(_schedule_thumbnails_when_validated, sender=files.models.FileValidation)
@receiver(pre_delete, sender=files.models.File)
@receiver(pre_delete, sender=files.models.FileValidation)
def _log_deletion(
    sender: object,
    instance: files.models.File,
    **kwargs: object,
) -> None:
    """Call ``record_deletion`` on an instance that is about to be deleted.

    Registered for the ``pre_delete`` signal of both File and FileValidation.
    """
    instance.record_deletion()
@receiver(post_delete, sender=files.models.File)
def delete_orphaned_files(
    sender: object,
    instance: files.models.File,
    **kwargs: object,
) -> None:
    """Remove the stored source, thumbnail and generated thumbnails of a deleted File record."""
    # Resolve each field lazily so the source is removed before the thumbnail is looked up.
    for field_name in ('source', 'thumbnail'):
        files.utils.delete_file_in_storage(getattr(instance, field_name).name)
    files.utils.delete_thumbnails(instance.metadata)