Browse Source

Merge pull request #123 from bcgov/DIV-1001

Hooked up ClamAV and fixed tests
pull/172/head
Michael Olund 5 years ago
committed by GitHub
parent
commit
c13435c7ad
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 54 additions and 38 deletions
  1. +2
    -18
      edivorce/apps/core/serializer.py
  2. +11
    -3
      edivorce/apps/core/tests/test_api.py
  3. +22
    -16
      edivorce/apps/core/tests/test_upload.py
  4. +19
    -1
      edivorce/apps/core/validators.py

+ 2
- 18
edivorce/apps/core/serializer.py View File

@ -3,6 +3,7 @@ from rest_framework import serializers
from rest_framework.exceptions import ValidationError from rest_framework.exceptions import ValidationError
from .models import Document, UserResponse from .models import Document, UserResponse
from .validators import *
class UserResponseSerializer(serializers.ModelSerializer): class UserResponseSerializer(serializers.ModelSerializer):
@ -23,22 +24,10 @@ class UserResponseSerializer(serializers.ModelSerializer):
instance.save() instance.save()
def valid_file_extension(file):
extension = file.name.split('.')[-1]
if extension.lower() not in ['pdf', 'png', 'gif', 'jpg', 'jpe', 'jpeg']:
raise ValidationError(f'File type not supported: {extension}')
def valid_doc_type(value):
valid_codes = ['AAI', 'AFDO', 'AFTL', 'CSA', 'EFSS', 'MC', 'NCV', 'OFI', 'RDP']
if value.upper() not in valid_codes:
raise ValidationError(f'Doc type not supported: {value}. Valid codes: {", ".join(valid_codes)}')
class CreateDocumentSerializer(serializers.ModelSerializer): class CreateDocumentSerializer(serializers.ModelSerializer):
doc_type = serializers.CharField(required=True, validators=[valid_doc_type]) doc_type = serializers.CharField(required=True, validators=[valid_doc_type])
party_code = serializers.IntegerField(min_value=0, max_value=2, required=True) party_code = serializers.IntegerField(min_value=0, max_value=2, required=True)
file = serializers.FileField(required=True, validators=[valid_file_extension])
file = serializers.FileField(required=True, validators=[valid_file_extension,file_scan_validation])
filename = serializers.CharField(read_only=True) filename = serializers.CharField(read_only=True)
size = serializers.IntegerField(read_only=True) size = serializers.IntegerField(read_only=True)
rotation = serializers.IntegerField(read_only=True) rotation = serializers.IntegerField(read_only=True)
@ -62,11 +51,6 @@ class CreateDocumentSerializer(serializers.ModelSerializer):
return response return response
def valid_rotation(value):
if value % 90 != 0:
raise serializers.ValidationError('Rotation must be 0, 90, 180, or 270')
class DocumentMetadataSerializer(serializers.ModelSerializer): class DocumentMetadataSerializer(serializers.ModelSerializer):
doc_type = serializers.CharField(read_only=True) doc_type = serializers.CharField(read_only=True)
party_code = serializers.IntegerField(read_only=True) party_code = serializers.IntegerField(read_only=True)


+ 11
- 3
edivorce/apps/core/tests/test_api.py View File

@ -72,7 +72,10 @@ class APITest(APITestCase):
'party_code': 1 'party_code': 1
} }
self.client.force_authenticate(self.user) self.client.force_authenticate(self.user)
response = self.client.post(url, data)
with self.settings(CLAMAV_ENABLED=False):
response = self.client.post(url, data)
self.assertEqual(response.status_code, status.HTTP_201_CREATED) self.assertEqual(response.status_code, status.HTTP_201_CREATED)
self.assertEqual(Document.objects.count(), 1) self.assertEqual(Document.objects.count(), 1)
@ -97,13 +100,18 @@ class APITest(APITestCase):
'party_code': 1 'party_code': 1
} }
self.client.force_authenticate(self.user) self.client.force_authenticate(self.user)
response = self.client.post(url, data)
with self.settings(CLAMAV_ENABLED=False):
response = self.client.post(url, data)
self.assertEqual(response.status_code, status.HTTP_201_CREATED) self.assertEqual(response.status_code, status.HTTP_201_CREATED)
self.assertEqual(Document.objects.count(), 1) self.assertEqual(Document.objects.count(), 1)
file.seek(0) # file.seek(0) #
response = self.client.post(url, data)
with self.settings(CLAMAV_ENABLED=False):
response = self.client.post(url, data)
self.assertContains(response, self.assertContains(response,
'This file appears to have already been uploaded for this document.', 'This file appears to have already been uploaded for this document.',
status_code=status.HTTP_400_BAD_REQUEST) status_code=status.HTTP_400_BAD_REQUEST)


+ 22
- 16
edivorce/apps/core/tests/test_upload.py View File

@ -2,15 +2,21 @@ import clamd
from clamd import BufferTooLongError from clamd import BufferTooLongError
from unittest import mock from unittest import mock
from django import forms
from django.test import TestCase from django.test import TestCase
from django.core.files.uploadedfile import SimpleUploadedFile from django.core.files.uploadedfile import SimpleUploadedFile
from rest_framework import serializers
from rest_framework.test import APIRequestFactory
from ..validators import file_scan_validation from ..validators import file_scan_validation
from ..models import Document
class TestUploadForm(forms.Form):
upload = forms.FileField(validators=[file_scan_validation])
class TestUploadSerializer(serializers.ModelSerializer):
upload = serializers.FileField(validators=[file_scan_validation])
class Meta:
model = Document
fields = ('upload', 'filename')
class UploadScanTests(TestCase): class UploadScanTests(TestCase):
@ -18,17 +24,17 @@ class UploadScanTests(TestCase):
def test_validation_disabled(self): def test_validation_disabled(self):
with self.settings(CLAMAV_ENABLED=False): with self.settings(CLAMAV_ENABLED=False):
infected = SimpleUploadedFile('infected.txt', clamd.EICAR) infected = SimpleUploadedFile('infected.txt', clamd.EICAR)
form = TestUploadForm(files={'upload': infected})
serializer = TestUploadSerializer(data={'upload': infected})
self.assertTrue(form.is_valid(), form.errors)
self.assertTrue(serializer.is_valid(), serializer.errors)
def test_validation_invalid_network_connection(self): def test_validation_invalid_network_connection(self):
with self.settings(CLAMAV_TCP_PORT=9999): with self.settings(CLAMAV_TCP_PORT=9999):
infected = SimpleUploadedFile('infected.txt', clamd.EICAR) infected = SimpleUploadedFile('infected.txt', clamd.EICAR)
form = TestUploadForm(files={'upload': infected})
serializer = TestUploadSerializer(data={'upload': infected})
self.assertFalse(form.is_valid())
self.assertEqual(form.errors['upload'][0], 'Unable to scan file.')
self.assertFalse(serializer.is_valid())
self.assertEqual(serializer.errors['upload'][0], 'Unable to scan file.')
@mock.patch('clamd.ClamdNetworkSocket.instream') @mock.patch('clamd.ClamdNetworkSocket.instream')
def test_validation_buffer_overflow(self, mock_clam): def test_validation_buffer_overflow(self, mock_clam):
@ -36,26 +42,26 @@ class UploadScanTests(TestCase):
# by default clamav has a 10mb limit for instream # by default clamav has a 10mb limit for instream
clean = SimpleUploadedFile('clean.txt', b'clean file') clean = SimpleUploadedFile('clean.txt', b'clean file')
form = TestUploadForm(files={'upload': clean})
serializer = TestUploadSerializer(data={'upload': clean})
self.assertFalse(form.is_valid())
self.assertEqual(form.errors['upload'][0], 'Unable to scan file.')
self.assertFalse(serializer.is_valid())
self.assertEqual(serializer.errors['upload'][0], 'Unable to scan file.')
@mock.patch('clamd.ClamdNetworkSocket.instream') @mock.patch('clamd.ClamdNetworkSocket.instream')
def test_validation_virus_found(self, mock_clam): def test_validation_virus_found(self, mock_clam):
mock_clam.return_value = {'stream': ('FOUND', 'Eicar-Test-Signature')} mock_clam.return_value = {'stream': ('FOUND', 'Eicar-Test-Signature')}
infected = SimpleUploadedFile('infected.txt', clamd.EICAR) infected = SimpleUploadedFile('infected.txt', clamd.EICAR)
form = TestUploadForm(files={'upload': infected})
serializer = TestUploadSerializer(data={'upload': infected})
self.assertFalse(form.is_valid())
self.assertEqual(form.errors['upload'][0], 'Infected file found.')
self.assertFalse(serializer.is_valid())
self.assertEqual(serializer.errors['upload'][0], 'Infected file found.')
@mock.patch('clamd.ClamdNetworkSocket.instream') @mock.patch('clamd.ClamdNetworkSocket.instream')
def test_validation_no_virus_found(self, mock_clam): def test_validation_no_virus_found(self, mock_clam):
mock_clam.return_value = {'stream': ('OK', None)} mock_clam.return_value = {'stream': ('OK', None)}
clean = SimpleUploadedFile('clean.txt', b'clean file') clean = SimpleUploadedFile('clean.txt', b'clean file')
form = TestUploadForm(files={'upload': clean})
serializer = TestUploadSerializer(data={'upload': clean})
self.assertTrue(form.is_valid())
self.assertTrue(serializer.is_valid())

+ 19
- 1
edivorce/apps/core/validators.py View File

@ -2,7 +2,8 @@ import logging
import clamd import clamd
import sys import sys
from django.core.exceptions import ValidationError
from rest_framework import serializers
from rest_framework.exceptions import ValidationError
from django.conf import settings from django.conf import settings
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -42,3 +43,20 @@ def file_scan_validation(file):
if result and result['stream'][0] == 'FOUND': if result and result['stream'][0] == 'FOUND':
logger.warning('Virus found: {}'.format(file.name)) logger.warning('Virus found: {}'.format(file.name))
raise ValidationError('Infected file found.', code='infected') raise ValidationError('Infected file found.', code='infected')
def valid_file_extension(file):
extension = file.name.split('.')[-1]
if extension.lower() not in ['pdf', 'png', 'gif', 'jpg', 'jpe', 'jpeg']:
raise ValidationError(f'File type not supported: {extension}')
def valid_doc_type(value):
valid_codes = ['AAI', 'AFDO', 'AFTL', 'CSA', 'EFSS', 'MC', 'NCV', 'OFI', 'RDP']
if value.upper() not in valid_codes:
raise ValidationError(f'Doc type not supported: {value}. Valid codes: {", ".join(valid_codes)}')
def valid_rotation(value):
if value % 90 != 0:
raise serializers.ValidationError('Rotation must be 0, 90, 180, or 270')

Loading…
Cancel
Save