Scan files with clamdscan #77

Merged
Anna Sirota merged 17 commits from scan-file into main 2024-04-12 19:11:30 +02:00
5 changed files with 75 additions and 3 deletions
Showing only changes of commit b1441511b6 - Show all commits

View File

@ -0,0 +1,18 @@
# Generated by Django 4.2.11 on 2024-04-11 17:13
from django.db import migrations, models
class Migration(migrations.Migration):
    """Alter ``FileValidation.validation`` to be a ``JSONField``."""

    # Must run after the previous migration of the `files` app.
    dependencies = [('files', '0004_alter_file_status')]

    operations = [
        migrations.AlterField(
            model_name='filevalidation',
            name='validation',
            field=models.JSONField(),
        ),
    ]

View File

@ -1,12 +1,14 @@
from pathlib import Path from pathlib import Path
from typing import Dict, Any from typing import Dict, Any
import logging import logging
import os.path
from django.conf import settings
from django.contrib.auth import get_user_model from django.contrib.auth import get_user_model
from django.db import models from django.db import models
from common.model_mixins import CreatedModifiedMixin, TrackChangesMixin, SoftDeleteMixin from common.model_mixins import CreatedModifiedMixin, TrackChangesMixin, SoftDeleteMixin
from files.utils import get_sha256, guess_mimetype_from_ext from files.utils import get_sha256, guess_mimetype_from_ext, scan
from constants.base import ( from constants.base import (
FILE_STATUS_CHOICES, FILE_STATUS_CHOICES,
FILE_TYPE_CHOICES, FILE_TYPE_CHOICES,
@ -204,6 +206,25 @@ class File(CreatedModifiedMixin, TrackChangesMixin, SoftDeleteMixin, models.Mode
def get_submit_url(self) -> str: def get_submit_url(self) -> str:
return self.extension.get_draft_url() return self.extension.get_draft_url()
def scan(self) -> 'FileValidation':
    """Run a scanner on the source file and save its output as a FileValidation record.

    The scanner's arguments, decoded stdout/stderr and return code are stored
    as a dict in ``FileValidation.validation``. A zero return code marks the
    file as valid; any other return code marks it as invalid and sets
    ``errors`` to 1.

    Scanning a file that already has a FileValidation record refreshes the
    stored scanner output rather than keeping the output of the first scan.

    Returns the saved FileValidation record for this file.
    """
    # os.path.join() discards earlier components when a later one is absolute,
    # so this works whether `source.path` is absolute or relative to MEDIA_ROOT.
    abs_path = os.path.join(settings.MEDIA_ROOT, self.source.path)
    completed_process = scan(abs_path)
    validation = {
        'args': completed_process.args,
        # Scanner output can echo bytes that are not valid UTF-8 (e.g. file names):
        # replace them instead of letting UnicodeDecodeError abort the scan.
        'stdout': completed_process.stdout.decode(errors='replace'),
        'stderr': completed_process.stderr.decode(errors='replace'),
        'returncode': completed_process.returncode,
    }
    file_validation, _ = FileValidation.objects.get_or_create(
        file=self, defaults={'validation': validation}
    )
    # `defaults` are only applied when the record is created: assign the output
    # explicitly so that a re-scan doesn't leave stale results next to a fresh `is_valid`.
    file_validation.validation = validation
    file_validation.is_valid = completed_process.returncode == 0
    # FIXME: do we need `errors`/`warnings`/`notices` counters at all?
    file_validation.errors = 1 if not file_validation.is_valid else 0
    file_validation.save()
    return file_validation
class FileValidation(CreatedModifiedMixin, TrackChangesMixin, models.Model): class FileValidation(CreatedModifiedMixin, TrackChangesMixin, models.Model):
track_changes_to_fields = {'is_valid', 'errors', 'warnings', 'notices', 'validation'} track_changes_to_fields = {'is_valid', 'errors', 'warnings', 'notices', 'validation'}
@ -213,4 +234,4 @@ class FileValidation(CreatedModifiedMixin, TrackChangesMixin, models.Model):
errors = models.IntegerField(default=0) errors = models.IntegerField(default=0)
warnings = models.IntegerField(default=0) warnings = models.IntegerField(default=0)
notices = models.IntegerField(default=0) notices = models.IntegerField(default=0)
validation = models.TextField() validation = models.JSONField()

View File

@ -0,0 +1 @@
X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*

View File

@ -1,7 +1,9 @@
import json import json
import shutil
import tempfile
from django.contrib.auth import get_user_model from django.contrib.auth import get_user_model
from django.test import TestCase from django.test import TestCase, override_settings
from common.admin import get_admin_change_path from common.admin import get_admin_change_path
from common.log_entries import entries_for from common.log_entries import entries_for
@ -76,3 +78,25 @@ class FileTest(TestCase):
response = self.client.get(path) response = self.client.get(path)
self.assertEqual(response.status_code, 200, path) self.assertEqual(response.status_code, 200, path)
@override_settings(MEDIA_ROOT='./files/tests/files')
class FileScanTest(TestCase):
    """Check that scanning a known test archive records the scanner's report."""

    def setUp(self):
        super().setUp()
        self.temp_directory = tempfile.mkdtemp()

    def tearDown(self):
        super().tearDown()
        shutil.rmtree(self.temp_directory)

    def test_scan(self):
        # TODO: write the test files on the fly
        file = FileFactory(source='Win.Test.EICAR_HDB-1.zip')

        results = file.scan().validation

        # The scanner is expected to exit with 1 and report the signature on stdout.
        self.assertEqual(results['returncode'], 1)
        first_line, _, _ = results['stdout'].partition('\n')
        self.assertIn(f'{file.source.name}: Win.Test.EICAR_HDB-1 FOUND', first_line)
        self.assertEqual(results['stderr'], '')

View File

@ -4,6 +4,8 @@ import io
import logging import logging
import mimetypes import mimetypes
import os import os
import os.path
import subprocess
import toml import toml
import typing import typing
import zipfile import zipfile
@ -161,3 +163,9 @@ def guess_mimetype_from_content(file_obj) -> str:
# This file might be read again by validation or other utilities # This file might be read again by validation or other utilities
file_obj.seek(0) file_obj.seek(0)
return mimetype_from_bytes return mimetype_from_bytes
def scan(abs_path: str) -> 'subprocess.CompletedProcess':
    """Run ``clamdscan --fdpass`` on the file at ``abs_path`` and return the finished process.

    Stdout and stderr are captured on the returned object. The exit status is
    not checked here: interpreting it is left to the caller.
    """
    command = ['clamdscan', '--fdpass', abs_path]
    logger.info('Running %s', command)
    completed = subprocess.run(command, capture_output=True)
    return completed