Scan files with clamdscan #77

Merged
Anna Sirota merged 17 commits from scan-file into main 2024-04-12 19:11:30 +02:00
3 changed files with 75 additions and 72 deletions
Showing only changes of commit da1522599e - Show all commits

View File

@ -1 +0,0 @@
X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*

View File

@ -1,19 +1,12 @@
import json import json
import os
import shutil
import tempfile
import unittest
from background_task.models import Task
from django.conf import settings
from django.contrib.auth import get_user_model from django.contrib.auth import get_user_model
from django.test import TestCase, override_settings from django.test import TestCase
from common.admin import get_admin_change_path from common.admin import get_admin_change_path
from common.log_entries import entries_for from common.log_entries import entries_for
from common.tests.factories.files import FileFactory from common.tests.factories.files import FileFactory
from files.models import File from files.models import File
import files.tasks
User = get_user_model() User = get_user_model()
@ -83,66 +76,3 @@ class FileTest(TestCase):
response = self.client.get(path) response = self.client.get(path)
self.assertEqual(response.status_code, 200, path) self.assertEqual(response.status_code, 200, path)
@unittest.skipUnless(shutil.which('clamdscan'), 'requires clamdscan')
@override_settings(MEDIA_ROOT='/tmp/')
class FileScanTest(TestCase):
def setUp(self):
super().setUp()
self.temp_directory = tempfile.mkdtemp(prefix=settings.MEDIA_ROOT)
def tearDown(self):
super().tearDown()
shutil.rmtree(self.temp_directory)
def test_scan_flags_found_invalid(self):
test_file_path = os.path.join(self.temp_directory, 'test_file.zip')
test_content = (
b'X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*' # noqa: W605
)
with open(test_file_path, 'wb+') as test_file:
test_file.write(test_content)
file = FileFactory(source=test_file_path)
# A background task should have been created
task = Task.objects.created_by(creator=file).first()
self.assertIsNotNone(task)
self.assertEqual(task.task_name, 'files.tasks.scan')
self.assertEqual(task.task_params, f'[[], {{"file_id": {file.pk}}}]')
# Actually run the task as if by background runner
task_args, task_kwargs = task.params()
files.tasks.scan.task_function(*task_args, **task_kwargs)
self.assertFalse(file.validation.is_valid)
result = file.validation.results['clamdscan']
self.assertEqual(result['returncode'], 1)
stdout_lines = result['stdout'].split('\n')
self.assertIn(f'{file.source.name}: Win.Test.EICAR_HDB-1 FOUND', stdout_lines[0])
self.assertEqual(result['stderr'], '')
def test_scan_flags_nothing_found_valid(self):
test_file_path = os.path.join(self.temp_directory, 'test_file.zip')
with open(test_file_path, 'wb+') as test_file:
test_file.write(b'some file')
file = FileFactory(source=test_file_path)
# A background task should have been created
task = Task.objects.created_by(creator=file).first()
self.assertIsNotNone(task)
self.assertEqual(task.task_name, 'files.tasks.scan')
self.assertEqual(task.task_params, f'[[], {{"file_id": {file.pk}}}]')
# Actually run the task as if by background runner
task_args, task_kwargs = task.params()
files.tasks.scan.task_function(*task_args, **task_kwargs)
self.assertTrue(file.validation.is_valid)
result = file.validation.results['clamdscan']
self.assertEqual(result['returncode'], 0)
stdout_lines = result['stdout'].split('\n')
self.assertIn(f'{file.source.name}: OK', stdout_lines[0])
self.assertEqual(result['stderr'], '')

View File

@ -0,0 +1,74 @@
import os
import shutil
import tempfile
import unittest
from background_task.models import Task
from django.conf import settings
from django.test import TestCase, override_settings
from common.tests.factories.files import FileFactory
import files.tasks
@unittest.skipUnless(shutil.which('clamdscan'), 'requires clamdscan')
@override_settings(MEDIA_ROOT='/tmp/')
class FileScanTest(TestCase):
def setUp(self):
super().setUp()
self.temp_directory = tempfile.mkdtemp(prefix=settings.MEDIA_ROOT)
def tearDown(self):
super().tearDown()
shutil.rmtree(self.temp_directory)
def test_scan_flags_found_invalid(self):
test_file_path = os.path.join(self.temp_directory, 'test_file.zip')
test_content = (
b'X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*' # noqa: W605
)
with open(test_file_path, 'wb+') as test_file:
test_file.write(test_content)
file = FileFactory(source=test_file_path)
# A background task should have been created
task = Task.objects.created_by(creator=file).first()
self.assertIsNotNone(task)
self.assertEqual(task.task_name, 'files.tasks.scan')
self.assertEqual(task.task_params, f'[[], {{"file_id": {file.pk}}}]')
# Actually run the task as if by background runner
task_args, task_kwargs = task.params()
files.tasks.scan.task_function(*task_args, **task_kwargs)
self.assertFalse(file.validation.is_valid)
result = file.validation.results['clamdscan']
self.assertEqual(result['returncode'], 1)
stdout_lines = result['stdout'].split('\n')
self.assertIn(f'{file.source.name}: Win.Test.EICAR_HDB-1 FOUND', stdout_lines[0])
self.assertEqual(result['stderr'], '')
def test_scan_flags_nothing_found_valid(self):
test_file_path = os.path.join(self.temp_directory, 'test_file.zip')
with open(test_file_path, 'wb+') as test_file:
test_file.write(b'some file')
file = FileFactory(source=test_file_path)
# A background task should have been created
task = Task.objects.created_by(creator=file).first()
self.assertIsNotNone(task)
self.assertEqual(task.task_name, 'files.tasks.scan')
self.assertEqual(task.task_params, f'[[], {{"file_id": {file.pk}}}]')
# Actually run the task as if by background runner
task_args, task_kwargs = task.params()
files.tasks.scan.task_function(*task_args, **task_kwargs)
self.assertTrue(file.validation.is_valid)
result = file.validation.results['clamdscan']
self.assertEqual(result['returncode'], 0)
stdout_lines = result['stdout'].split('\n')
self.assertIn(f'{file.source.name}: OK', stdout_lines[0])
self.assertEqual(result['stderr'], '')