Scan files with clamdscan #77
@ -18,7 +18,7 @@ def clamdscan(file_id: int):
|
|||||||
abs_path = os.path.join(settings.MEDIA_ROOT, file.source.path)
|
abs_path = os.path.join(settings.MEDIA_ROOT, file.source.path)
|
||||||
completed_process = files.utils.run_clamdscan(abs_path)
|
completed_process = files.utils.run_clamdscan(abs_path)
|
||||||
logger.info('File pk=%s scanned: exit code %s', file.pk, completed_process.returncode)
|
logger.info('File pk=%s scanned: exit code %s', file.pk, completed_process.returncode)
|
||||||
results = {
|
scan_result = {
|
||||||
'clamdscan': {
|
'clamdscan': {
|
||||||
'args': completed_process.args,
|
'args': completed_process.args,
|
||||||
'stdout': completed_process.stdout.decode(),
|
'stdout': completed_process.stdout.decode(),
|
||||||
@ -28,9 +28,9 @@ def clamdscan(file_id: int):
|
|||||||
}
|
}
|
||||||
is_ok = completed_process.returncode == 0
|
is_ok = completed_process.returncode == 0
|
||||||
file_validation, is_new = files.models.FileValidation.objects.get_or_create(
|
file_validation, is_new = files.models.FileValidation.objects.get_or_create(
|
||||||
file=file, defaults={'results': results, 'is_ok': is_ok}
|
file=file, defaults={'results': scan_result, 'is_ok': is_ok}
|
||||||
)
|
)
|
||||||
if not is_new:
|
if not is_new:
|
||||||
file_validation.results = results
|
file_validation.results = scan_result
|
||||||
file_validation.is_ok = is_ok
|
file_validation.is_ok = is_ok
|
||||||
file_validation.save(update_fields={'results', 'is_ok'})
|
file_validation.save(update_fields={'results', 'is_ok'})
|
||||||
|
@ -8,6 +8,7 @@ from django.conf import settings
|
|||||||
from django.test import TestCase, override_settings
|
from django.test import TestCase, override_settings
|
||||||
|
|
||||||
from common.tests.factories.files import FileFactory
|
from common.tests.factories.files import FileFactory
|
||||||
|
import files.models
|
||||||
import files.tasks
|
import files.tasks
|
||||||
|
|
||||||
|
|
||||||
@ -31,6 +32,39 @@ class FileScanTest(TestCase):
|
|||||||
test_file.write(test_content)
|
test_file.write(test_content)
|
||||||
|
|
||||||
file = FileFactory(source=test_file_path)
|
file = FileFactory(source=test_file_path)
|
||||||
|
self.assertFalse(hasattr(file, 'validation'))
|
||||||
|
|
||||||
|
# A background task should have been created
|
||||||
|
task = Task.objects.created_by(creator=file).first()
|
||||||
|
self.assertIsNotNone(task)
|
||||||
|
self.assertEqual(task.task_name, 'files.tasks.clamdscan')
|
||||||
|
self.assertEqual(task.task_params, f'[[], {{"file_id": {file.pk}}}]')
|
||||||
|
|
||||||
|
# Actually run the task as if by background runner
|
||||||
|
task_args, task_kwargs = task.params()
|
||||||
|
files.tasks.clamdscan.task_function(*task_args, **task_kwargs)
|
||||||
|
|
||||||
|
file.refresh_from_db()
|
||||||
|
self.assertFalse(file.validation.is_ok)
|
||||||
|
result = file.validation.results['clamdscan']
|
||||||
|
self.assertEqual(result['returncode'], 1)
|
||||||
|
stdout_lines = result['stdout'].split('\n')
|
||||||
|
self.assertIn(f'{file.source.name}: Win.Test.EICAR_HDB-1 FOUND', stdout_lines[0])
|
||||||
|
self.assertEqual(result['stderr'], '')
|
||||||
|
|
||||||
|
def test_scan_flags_found_invalid_updates_existing_validation(self):
|
||||||
|
test_file_path = os.path.join(self.temp_directory, 'test_file.zip')
|
||||||
|
test_content = (
|
||||||
|
b'X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*' # noqa: W605
|
||||||
|
)
|
||||||
|
with open(test_file_path, 'wb+') as test_file:
|
||||||
|
test_file.write(test_content)
|
||||||
|
|
||||||
|
file = FileFactory(source=test_file_path)
|
||||||
|
# Make sure validation record exists before scanner runs
|
||||||
|
existing_validation = files.models.FileValidation(file=file, results={})
|
||||||
|
existing_validation.save()
|
||||||
|
self.assertTrue(hasattr(file, 'validation'))
|
||||||
|
|
||||||
# A background task should have been created
|
# A background task should have been created
|
||||||
task = Task.objects.created_by(creator=file).first()
|
task = Task.objects.created_by(creator=file).first()
|
||||||
@ -43,11 +77,13 @@ class FileScanTest(TestCase):
|
|||||||
files.tasks.clamdscan.task_function(*task_args, **task_kwargs)
|
files.tasks.clamdscan.task_function(*task_args, **task_kwargs)
|
||||||
|
|
||||||
self.assertFalse(file.validation.is_ok)
|
self.assertFalse(file.validation.is_ok)
|
||||||
|
file.validation.refresh_from_db()
|
||||||
result = file.validation.results['clamdscan']
|
result = file.validation.results['clamdscan']
|
||||||
self.assertEqual(result['returncode'], 1)
|
self.assertEqual(result['returncode'], 1)
|
||||||
stdout_lines = result['stdout'].split('\n')
|
stdout_lines = result['stdout'].split('\n')
|
||||||
self.assertIn(f'{file.source.name}: Win.Test.EICAR_HDB-1 FOUND', stdout_lines[0])
|
self.assertIn(f'{file.source.name}: Win.Test.EICAR_HDB-1 FOUND', stdout_lines[0])
|
||||||
self.assertEqual(result['stderr'], '')
|
self.assertEqual(result['stderr'], '')
|
||||||
|
self.assertEqual(existing_validation.pk, file.validation.pk)
|
||||||
|
|
||||||
def test_scan_flags_nothing_found_valid(self):
|
def test_scan_flags_nothing_found_valid(self):
|
||||||
test_file_path = os.path.join(self.temp_directory, 'test_file.zip')
|
test_file_path = os.path.join(self.temp_directory, 'test_file.zip')
|
||||||
@ -55,6 +91,7 @@ class FileScanTest(TestCase):
|
|||||||
test_file.write(b'some file')
|
test_file.write(b'some file')
|
||||||
|
|
||||||
file = FileFactory(source=test_file_path)
|
file = FileFactory(source=test_file_path)
|
||||||
|
self.assertFalse(hasattr(file, 'validation'))
|
||||||
|
|
||||||
# A background task should have been created
|
# A background task should have been created
|
||||||
task = Task.objects.created_by(creator=file).first()
|
task = Task.objects.created_by(creator=file).first()
|
||||||
@ -66,6 +103,7 @@ class FileScanTest(TestCase):
|
|||||||
task_args, task_kwargs = task.params()
|
task_args, task_kwargs = task.params()
|
||||||
files.tasks.clamdscan.task_function(*task_args, **task_kwargs)
|
files.tasks.clamdscan.task_function(*task_args, **task_kwargs)
|
||||||
|
|
||||||
|
file.refresh_from_db()
|
||||||
self.assertTrue(file.validation.is_ok)
|
self.assertTrue(file.validation.is_ok)
|
||||||
result = file.validation.results['clamdscan']
|
result = file.validation.results['clamdscan']
|
||||||
self.assertEqual(result['returncode'], 0)
|
self.assertEqual(result['returncode'], 0)
|
||||||
|
Loading…
Reference in New Issue
Block a user