extensions-website/files/tasks.py
Oleg Komarov c975e8cb95 File scanning: validate wheel digests against pypi.org (#199)
This PR adds a new check to background file scans:
wheel digests are verified using the PyPI JSON API
https://warehouse.pypa.io/api-reference/json.html

This check should flag uploads that try to ship code not published on PyPI.
Although the fact that something is published on PyPI is not a guarantee
that the code is safe to load, this additional step should introduce at least
some barriers to uploading malicious code.

We can potentially improve on this further by e.g. integrating with
https://docs.virustotal.com/docs/api-overview

Reviewed-on: #199
Reviewed-by: Anna Sirota <annasirota@noreply.localhost>
2024-07-11 10:45:22 +02:00

73 lines
3.0 KiB
Python

import logging
import os.path
from background_task import background
from background_task.tasks import TaskSchedule
from django.conf import settings
import files.models
import files.utils
logger = logging.getLogger(__name__)
@background(schedule={'action': TaskSchedule.RESCHEDULE_EXISTING})
def scan_file(file_id: int):
    """Scan the file with the given pk and persist the outcome as a FileValidation record.

    The file is first passed to clamdscan. Only when clamd reports 'OK' are the
    wheels listed in the file's metadata looked up and, if there are any,
    validated; any invalid wheels mark the file as not OK and are included in
    the stored results.
    """
    scanned = files.models.File.objects.get(pk=file_id)
    full_path = os.path.join(settings.MEDIA_ROOT, scanned.source.path)

    status, found = files.utils.run_clamdscan(full_path)
    logger.info('File pk=%s scanned by clamd: %s', scanned.pk, (status, found))

    results = {'clamdscan': [status, found]}
    passed = status == 'OK'

    if passed:
        # Wheels are only inspected for files that clamd considers clean.
        wheels = files.utils.get_wheels_from_manifest(scanned.metadata)
        bad_wheels = files.utils.validate_wheels(full_path, wheels) if wheels else None
        if bad_wheels:
            logger.info('File pk=%s has invalid wheels: %s', scanned.pk, bad_wheels)
            passed = False
            results['invalid_wheels'] = bad_wheels

    # One validation record per file: create it or overwrite the previous scan.
    files.models.FileValidation.objects.update_or_create(
        file=scanned, defaults={'results': results, 'is_ok': passed}
    )
@background(schedule={'action': TaskSchedule.RESCHEDULE_EXISTING})
def make_thumbnails(file_id: int) -> None:
    """Generate thumbnails for a given file, store them in thumbnail and metadata columns.

    Nothing is generated (an error is logged instead) when the file is neither
    an image nor a video, or when ``settings.REQUIRE_FILE_VALIDATION`` is set
    and the file's validation is not OK.

    For an image the thumbnails are made from the original file; for a video
    they are made from a frame extracted from it. Only the fields that actually
    changed ('thumbnail' and/or 'metadata') are saved.
    """
    file = files.models.File.objects.get(pk=file_id)
    # Shared mapping for the %(name)s-style log messages below.
    args = {'pk': file_id, 'type': file.get_type_display()}

    if not file.is_image and not file.is_video:
        logger.error('File pk=%(pk)s of type "%(type)s" is neither an image nor a video', args)
        return
    if settings.REQUIRE_FILE_VALIDATION and not file.validation.is_ok:
        # Lazy %-style arguments, consistent with the other log calls in this module.
        logger.error("File pk=%(pk)s is flagged, won't make thumbnails", args)
        return

    # For an image, source of the thumbnails is the original image
    source_path = file.source.path
    thumbnail_field = file.thumbnail
    # Remember the current value to detect whether the thumbnail changed.
    unchanged_thumbnail = thumbnail_field.name

    if file.is_video:
        frame_path = files.utils.get_thumbnail_upload_to(file.hash)
        # For a video, source of the thumbnails is a frame extracted with ffmpeg
        files.utils.extract_frame(source_path, frame_path)
        # Replaced by the 1080p thumbnail path once thumbnails are generated below.
        thumbnail_field.name = frame_path
        source_path = frame_path

    thumbnails = files.utils.make_thumbnails(source_path, file.hash)
    thumbnail_field.name = thumbnails['1080p']['path']

    # Collect only the fields that differ from what is currently stored.
    update_fields = set()
    if thumbnail_field.name != unchanged_thumbnail:
        update_fields.add('thumbnail')
    if file.metadata.get('thumbnails') != thumbnails:
        file.metadata.update({'thumbnails': thumbnails})
        update_fields.add('metadata')

    if update_fields:
        args['update_fields'] = update_fields
        logger.info('Made thumbnails for file pk=%(pk)s, updating %(update_fields)s', args)
        file.save(update_fields=update_fields)