Scan files with clamdscan #77
@ -18,7 +18,7 @@ def _record_changes(sender: object, instance: files.models.File, **kwargs: objec
|
|||||||
|
|
||||||
def _initiate_scan(file: files.models.File) -> None:
|
def _initiate_scan(file: files.models.File) -> None:
|
||||||
logger.info('Initiating a scan for file pk=%s', file.pk)
|
logger.info('Initiating a scan for file pk=%s', file.pk)
|
||||||
files.tasks.scan(file_id=file.pk, creator=file, verbose_name=file.source.name)
|
files.tasks.clamdscan(file_id=file.pk, creator=file, verbose_name=file.source.name)
|
||||||
|
|
||||||
|
|
||||||
@receiver(post_save, sender=files.models.File)
|
@receiver(post_save, sender=files.models.File)
|
||||||
|
@ -12,7 +12,7 @@ logger = logging.getLogger(__name__)
|
|||||||
|
|
||||||
|
|
||||||
@background(schedule={'action': TaskSchedule.RESCHEDULE_EXISTING})
|
@background(schedule={'action': TaskSchedule.RESCHEDULE_EXISTING})
|
||||||
def scan(file_id: int):
|
def clamdscan(file_id: int):
|
||||||
"""Run a scan of a given file and save its output as a FileValidation record."""
|
"""Run a scan of a given file and save its output as a FileValidation record."""
|
||||||
file = files.models.File.objects.get(pk=file_id)
|
file = files.models.File.objects.get(pk=file_id)
|
||||||
abs_path = os.path.join(settings.MEDIA_ROOT, file.source.path)
|
abs_path = os.path.join(settings.MEDIA_ROOT, file.source.path)
|
||||||
@ -25,7 +25,7 @@ def scan(file_id: int):
|
|||||||
}
|
}
|
||||||
logger.info('File pk=%s scanned: exit code %s', file.pk, completed_process.returncode)
|
logger.info('File pk=%s scanned: exit code %s', file.pk, completed_process.returncode)
|
||||||
file_validation, is_new = files.models.FileValidation.objects.get_or_create(
|
file_validation, is_new = files.models.FileValidation.objects.get_or_create(
|
||||||
file=file, defaults={'results': {completed_process.args[0]: scan_result}}
|
file=file, defaults={'results': {'clamdscan': scan_result}}
|
||||||
)
|
)
|
||||||
file_validation.is_ok = completed_process.returncode == 0
|
file_validation.is_ok = completed_process.returncode == 0
|
||||||
file_validation.save()
|
file_validation.save()
|
||||||
|
@ -35,12 +35,12 @@ class FileScanTest(TestCase):
|
|||||||
# A background task should have been created
|
# A background task should have been created
|
||||||
task = Task.objects.created_by(creator=file).first()
|
task = Task.objects.created_by(creator=file).first()
|
||||||
self.assertIsNotNone(task)
|
self.assertIsNotNone(task)
|
||||||
self.assertEqual(task.task_name, 'files.tasks.scan')
|
self.assertEqual(task.task_name, 'files.tasks.clamdscan')
|
||||||
self.assertEqual(task.task_params, f'[[], {{"file_id": {file.pk}}}]')
|
self.assertEqual(task.task_params, f'[[], {{"file_id": {file.pk}}}]')
|
||||||
|
|
||||||
# Actually run the task as if by background runner
|
# Actually run the task as if by background runner
|
||||||
task_args, task_kwargs = task.params()
|
task_args, task_kwargs = task.params()
|
||||||
files.tasks.scan.task_function(*task_args, **task_kwargs)
|
files.tasks.clamdscan.task_function(*task_args, **task_kwargs)
|
||||||
|
|
||||||
self.assertFalse(file.validation.is_ok)
|
self.assertFalse(file.validation.is_ok)
|
||||||
result = file.validation.results['clamdscan']
|
result = file.validation.results['clamdscan']
|
||||||
@ -59,12 +59,12 @@ class FileScanTest(TestCase):
|
|||||||
# A background task should have been created
|
# A background task should have been created
|
||||||
task = Task.objects.created_by(creator=file).first()
|
task = Task.objects.created_by(creator=file).first()
|
||||||
self.assertIsNotNone(task)
|
self.assertIsNotNone(task)
|
||||||
self.assertEqual(task.task_name, 'files.tasks.scan')
|
self.assertEqual(task.task_name, 'files.tasks.clamdscan')
|
||||||
self.assertEqual(task.task_params, f'[[], {{"file_id": {file.pk}}}]')
|
self.assertEqual(task.task_params, f'[[], {{"file_id": {file.pk}}}]')
|
||||||
|
|
||||||
# Actually run the task as if by background runner
|
# Actually run the task as if by background runner
|
||||||
task_args, task_kwargs = task.params()
|
task_args, task_kwargs = task.params()
|
||||||
files.tasks.scan.task_function(*task_args, **task_kwargs)
|
files.tasks.clamdscan.task_function(*task_args, **task_kwargs)
|
||||||
|
|
||||||
self.assertTrue(file.validation.is_ok)
|
self.assertTrue(file.validation.is_ok)
|
||||||
result = file.validation.results['clamdscan']
|
result = file.validation.results['clamdscan']
|
||||||
|
Loading…
Reference in New Issue
Block a user