Scan files with clamdscan #77

Merged
Anna Sirota merged 17 commits from scan-file into main 2024-04-12 19:11:30 +02:00
4 changed files with 72 additions and 5 deletions
Showing only changes of commit bd1d6acc52 - Show all commits

View File

@ -1,6 +1,20 @@
from django.contrib import admin
import background_task.admin
import background_task.models
from .models import File
from .models import File, FileValidation
import files.signals
def scan_selected_files(self, request, queryset):
"""Scan selected files."""
for instance in queryset:
files.signals._initiate_scan(instance)
class FileValidationInlineAdmin(admin.TabularInline):
model = FileValidation
readonly_fields = ('date_created', 'date_modified', 'is_valid', 'results')
@admin.register(File)
@ -77,3 +91,51 @@ class FileAdmin(admin.ModelAdmin):
},
),
)
inlines = [FileValidationInlineAdmin]
actions = [scan_selected_files]
try:
admin.site.unregister(background_task.models.Task)
admin.site.unregister(background_task.models.CompletedTask)
except admin.site.NotRegistered:
pass
class TaskMixin:
"""Modify a few properties of background tasks displayed in admin."""
def no_errors(self, obj):
"""Replace background_task's "has_error".
Make Django's red/green boolean icons less confusing
in the context of "there's an error during task run".
"""
return not bool(obj.last_error)
no_errors.boolean = True
@admin.register(background_task.models.Task)
@admin.register(background_task.models.CompletedTask)
class TaskAdmin(background_task.admin.TaskAdmin, TaskMixin):
date_hierarchy = 'run_at'
list_display = [
'run_at',
'task_name',
'task_params',
'attempts',
'no_errors',
'locked_by',
'locked_by_pid_running',
]
list_filter = (
'task_name',
'run_at',
'failed_at',
'locked_at',
'attempts',
'creator_content_type',
)
search_fields = ['task_name', 'task_params', 'last_error', 'verbose_name']

View File

@ -206,7 +206,7 @@ class File(CreatedModifiedMixin, TrackChangesMixin, SoftDeleteMixin, models.Mode
class FileValidation(CreatedModifiedMixin, TrackChangesMixin, models.Model):
track_changes_to_fields = {'is_valid', 'errors', 'warnings', 'notices', 'validation'}
track_changes_to_fields = {'is_valid', 'results'}
file = models.OneToOneField(File, related_name='validation', on_delete=models.CASCADE)
is_valid = models.BooleanField(default=False)

View File

@ -16,6 +16,11 @@ def _record_changes(sender: object, instance: files.models.File, **kwargs: objec
instance.record_status_change(was_changed, old_state, **kwargs)
def _initiate_scan(file: files.models.File) -> None:
logger.info('Initiating a scan for file pk=%s', file.pk)
files.tasks.scan(file_id=file.pk, creator=file, verbose_name=file.source.name)
@receiver(post_save, sender=files.models.File)
def _scan_new_file(
sender: object, instance: files.models.File, created: bool, **kwargs: object
@ -23,5 +28,4 @@ def _scan_new_file(
if not created:
return
logger.info('Initiating a scan for file pk=%s', instance.pk)
files.tasks.scan(file_id=instance.pk, creator=instance)
_initiate_scan(instance)

View File

@ -2,6 +2,7 @@ import logging
import os.path
from background_task import background
from background_task.tasks import TaskSchedule
from django.conf import settings
import files.models
@ -10,7 +11,7 @@ import files.utils
logger = logging.getLogger(__name__)
@background()
@background(schedule={'action': TaskSchedule.RESCHEDULE_EXISTING})
def scan(file_id: int):
"""Run a scan on a given file and save its output as a FileValidation record."""
file = files.models.File.objects.get(pk=file_id)