Oleg Komarov
c975e8cb95
This PR adds a new check to background file scans: wheel digests are verified using the PyPI JSON API (https://warehouse.pypa.io/api-reference/json.html). This check should flag uploads that try to ship code not published on PyPI. Although the fact that something is published on PyPI is not a guarantee that the code is safe to load, this additional step should introduce at least some barriers to uploading malicious code. We can potentially improve on this further, e.g. by integrating with VirusTotal (https://docs.virustotal.com/docs/api-overview). Reviewed-on: #199 Reviewed-by: Anna Sirota <annasirota@noreply.localhost>
73 lines
3.0 KiB
Python
73 lines
3.0 KiB
Python
import logging
|
|
import os.path
|
|
|
|
from background_task import background
|
|
from background_task.tasks import TaskSchedule
|
|
from django.conf import settings
|
|
|
|
import files.models
|
|
import files.utils
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@background(schedule={'action': TaskSchedule.RESCHEDULE_EXISTING})
def scan_file(file_id: int):
    """Scan the file with the given pk and persist the outcome as a FileValidation.

    The file is first passed to ``files.utils.run_clamdscan``. Only when that
    scan reports ``'OK'`` are the wheels returned by
    ``files.utils.get_wheels_from_manifest`` checked with
    ``files.utils.validate_wheels``; any wheels reported back as invalid mark
    the file as not OK and are stored alongside the clamd result.
    """
    scanned_file = files.models.File.objects.get(pk=file_id)
    full_path = os.path.join(settings.MEDIA_ROOT, scanned_file.source.path)

    status, found = files.utils.run_clamdscan(full_path)
    logger.info('File pk=%s scanned by clamd: %s', scanned_file.pk, (status, found))

    results = {'clamdscan': [status, found]}
    passed = status == 'OK'

    # Wheels are only inspected for files that passed the clamd scan.
    if passed:
        wheels = files.utils.get_wheels_from_manifest(scanned_file.metadata)
        if wheels:
            rejected = files.utils.validate_wheels(full_path, wheels)
            if rejected:
                logger.info('File pk=%s has invalid wheels: %s', scanned_file.pk, rejected)
                results['invalid_wheels'] = rejected
                passed = False

    # One validation record per file: update it if it exists, create it otherwise.
    files.models.FileValidation.objects.update_or_create(
        file=scanned_file,
        defaults={'results': results, 'is_ok': passed},
    )
|
|
|
|
|
|
@background(schedule={'action': TaskSchedule.RESCHEDULE_EXISTING})
def make_thumbnails(file_id: int) -> None:
    """Generate thumbnails for a given file, store them in thumbnail and metadata columns.

    Logs an error and returns without doing anything when the file is neither
    an image nor a video, or when ``settings.REQUIRE_FILE_VALIDATION`` is set
    and the file's validation is not OK.

    The file is saved only if the thumbnail name or the ``thumbnails`` entry
    of its metadata actually changed, and only those fields are written.

    :param file_id: primary key of the ``files.models.File`` to process.
    """
    file = files.models.File.objects.get(pk=file_id)
    args = {'pk': file_id, 'type': file.get_type_display()}

    if not file.is_image and not file.is_video:
        logger.error('File pk=%(pk)s of type "%(type)s" is neither an image nor a video', args)
        return
    if settings.REQUIRE_FILE_VALIDATION and not file.validation.is_ok:
        # Let the logger do the formatting, consistent with the other log calls here.
        logger.error("File pk=%(pk)s is flagged, won't make thumbnails", args)
        return

    # For an image, source of the thumbnails is the original image
    source_path = file.source.path
    thumbnail_field = file.thumbnail
    # Remember the current name to detect whether the thumbnail changed.
    unchanged_thumbnail = thumbnail_field.name

    if file.is_video:
        frame_path = files.utils.get_thumbnail_upload_to(file.hash)
        # For a video, source of the thumbnails is a frame extracted with ffmpeg
        files.utils.extract_frame(source_path, frame_path)
        thumbnail_field.name = frame_path
        source_path = frame_path

    thumbnails = files.utils.make_thumbnails(source_path, file.hash)

    # The 1080p variant becomes the file's thumbnail.
    thumbnail_field.name = thumbnails['1080p']['path']

    # Collect only the fields that changed, to avoid a needless save.
    update_fields = set()
    if thumbnail_field.name != unchanged_thumbnail:
        update_fields.add('thumbnail')
    if file.metadata.get('thumbnails') != thumbnails:
        file.metadata.update({'thumbnails': thumbnails})
        update_fields.add('metadata')
    if update_fields:
        args['update_fields'] = update_fields
        logger.info('Made thumbnails for file pk=%(pk)s, updating %(update_fields)s', args)
        file.save(update_fields=update_fields)
|