448 lines
16 KiB
Python
448 lines
16 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Comprehensive Backend Test Suite for PunimTag
|
|
Tests all backend functionality including face clustering, enhanced recognition, and complex queries
|
|
"""
|
|
|
|
import os
|
|
import tempfile
|
|
import shutil
|
|
import unittest
|
|
import uuid
|
|
import pickle
|
|
from datetime import datetime, timedelta
|
|
import numpy as np
|
|
from punimtag import PunimTag
|
|
from config import PunimTagConfig, create_default_config
|
|
from typing import List
|
|
|
|
|
|
class TestBackendFunctionality(unittest.TestCase):
    """Test all backend features thoroughly.

    Each test runs against its own temporary directory containing the
    database file, a photos directory and a config file (see ``setUp``).
    Rows that the tests need are inserted directly through
    ``self.tagger.conn`` by the ``_create_mock_*`` helpers.
    """

    def setUp(self):
        """Set up test environment with temporary database and config."""
        self.test_dir = tempfile.mkdtemp()
        # Registered right away: cleanups run even when a later setUp step
        # raises (tearDown does not), so the directory is never leaked.
        # Cleanups execute after tearDown, i.e. after the tagger is closed.
        self.addCleanup(shutil.rmtree, self.test_dir)

        self.db_path = os.path.join(self.test_dir, 'test.db')
        self.photos_dir = os.path.join(self.test_dir, 'photos')
        self.config_path = os.path.join(self.test_dir, 'test_config.json')

        os.makedirs(self.photos_dir, exist_ok=True)

        # Create test configuration
        self.config = PunimTagConfig(self.config_path)
        self.config.face_recognition.confidence_threshold = 0.5
        self.config.auto_tagging.enabled = True
        self.config.processing.batch_size = 50
        self.config.save()

        # Initialize PunimTag with test database
        self.tagger = PunimTag(db_path=self.db_path, photos_dir=self.photos_dir)

    def tearDown(self):
        """Close the tagger; the temp directory is removed by the cleanup
        registered in ``setUp``."""
        self.tagger.close()

    def test_configuration_system(self):
        """Test configuration loading and saving."""
        # Values assigned in setUp must be readable back
        self.assertEqual(self.config.face_recognition.confidence_threshold, 0.5)
        self.assertTrue(self.config.auto_tagging.enabled)

        # Test updating settings
        success = self.config.update_setting(
            'face_recognition', 'confidence_threshold', 0.7
        )
        self.assertTrue(success)
        self.assertEqual(self.config.face_recognition.confidence_threshold, 0.7)

        # Test getting settings
        value = self.config.get_setting('processing', 'batch_size')
        self.assertEqual(value, 50)

        # Test tag suggestions
        event_tags = self.config.get_tag_suggestions('event')
        self.assertIn('wedding', event_tags)
        self.assertIn('bar_mitzvah', event_tags)

    def test_jewish_org_tags(self):
        """Test Jewish organization specific tag functionality."""
        # Test adding Jewish event tags
        for tag_name in ['shabbat', 'chanukah', 'passover']:
            tag_id = self.tagger.add_tag(tag_name, 'event')
            self.assertIsNotNone(tag_id)

        # Test location tags
        for tag_name in ['synagogue', 'sanctuary', 'sukkah']:
            tag_id = self.tagger.add_tag(tag_name, 'location')
            self.assertIsNotNone(tag_id)

        # Verify tags exist in database
        c = self.tagger.conn.cursor()
        c.execute("SELECT COUNT(*) FROM tags WHERE category = 'event'")
        event_count = c.fetchone()[0]
        self.assertGreaterEqual(event_count, 3)

    def test_face_clustering(self):
        """Test face clustering functionality."""
        # Create mock face data (only the rows matter, not the returned ids)
        self._create_mock_faces(10)

        # Test clustering
        clusters = self.tagger.cluster_unknown_faces()
        self.assertIsInstance(clusters, dict)

        # Test getting cluster data
        cluster_data = self.tagger.get_face_clusters()
        self.assertIsInstance(cluster_data, list)

        # Each cluster should have required fields
        for cluster in cluster_data:
            self.assertIn('cluster_id', cluster)
            self.assertIn('face_count', cluster)
            self.assertIn('face_ids', cluster)
            self.assertIn('representative_face', cluster)

    def test_cluster_assignment(self):
        """Test assigning clusters to people."""
        # Create mock faces and cluster them
        self._create_mock_faces(5)
        clusters = self.tagger.cluster_unknown_faces()

        # The mock encodings are random, so clustering may legitimately find
        # nothing. Report that as a skip instead of a silent pass.
        if not clusters:
            self.skipTest("clustering produced no clusters for the mock faces")

        cluster_id = next(iter(clusters))
        success = self.tagger.assign_cluster_to_person(cluster_id, "Test Person")
        self.assertTrue(success)

        # Verify assignment
        c = self.tagger.conn.cursor()
        c.execute("SELECT COUNT(*) FROM faces WHERE person_id IS NOT NULL")
        assigned_count = c.fetchone()[0]
        self.assertGreater(assigned_count, 0)

    def test_most_common_faces(self):
        """Test getting most frequently photographed people."""
        # Add some people and faces
        person1_id = self.tagger.add_person("John Doe")
        person2_id = self.tagger.add_person("Jane Smith")

        # Create mock faces to assign to people
        face_ids = self._create_mock_faces(10)

        # Assign five faces to the first person and two to the second
        for face_id in face_ids[:5]:
            self.tagger.assign_face_to_person(face_id, person1_id, True)

        for face_id in face_ids[5:7]:
            self.tagger.assign_face_to_person(face_id, person2_id, True)

        # Test getting most common faces
        common_faces = self.tagger.get_most_common_faces(limit=10)
        self.assertIsInstance(common_faces, list)

        # Seven faces were just assigned, so an empty result is a failure
        # rather than something to skip over.
        self.assertTrue(common_faces, "expected at least one person with faces")

        # Should be sorted by face count (John Doe should be first)
        self.assertEqual(common_faces[0]['name'], "John Doe")
        self.assertEqual(common_faces[0]['face_count'], 5)

    def test_face_verification(self):
        """Test face verification functionality."""
        person_id = self.tagger.add_person("Test Person")
        face_ids = self._create_mock_faces(3)

        # Assign faces to person
        for face_id in face_ids:
            self.tagger.assign_face_to_person(face_id, person_id, True)

        # Test verification
        faces = self.tagger.verify_person_faces(person_id)
        self.assertEqual(len(faces), 3)

        # Test removing incorrect assignment
        self.tagger.remove_incorrect_face_assignment(face_ids[0])

        # Verify removal
        faces_after = self.tagger.verify_person_faces(person_id)
        self.assertEqual(len(faces_after), 2)

    def test_batch_processing(self):
        """Test batch image processing."""
        # Create mock image paths
        image_paths = [
            os.path.join(self.photos_dir, f'test_{i}.jpg')
            for i in range(5)
        ]

        # Create empty test files
        for path in image_paths:
            with open(path, 'w'):
                pass  # Empty file for testing

        # Only the backend call is guarded. The assertions must stay outside
        # the handler: AssertionError is an Exception, so a broad except
        # around them would make this test impossible to fail.
        try:
            results = self.tagger.batch_process_images(image_paths, batch_size=2)
        except Exception as exc:
            # Empty files are not real images, so the backend may refuse them.
            self.skipTest(f"batch processing raised on empty placeholder files: {exc}")

        self.assertIn('processed', results)
        self.assertIn('errors', results)
        self.assertIn('skipped', results)

    def test_advanced_search(self):
        """Test advanced search functionality."""
        # Setup test data
        person_id = self.tagger.add_person("Search Test Person")
        tag_id = self.tagger.add_tag("test_event", "event")

        # Create mock image
        image_id = self._create_mock_image()

        # Add mock face and tag
        face_id = self._create_mock_face(image_id)
        self.tagger.assign_face_to_person(face_id, person_id, True)
        self.tagger.tag_image(image_id, tag_id)

        # Search by person
        results = self.tagger.advanced_search(people=["Search Test Person"])
        self.assertIsInstance(results, list)

        # Search by tag
        results = self.tagger.advanced_search(tags=["test_event"])
        self.assertIsInstance(results, list)

        # Search by person and tag
        results = self.tagger.advanced_search(
            people=["Search Test Person"],
            tags=["test_event"]
        )
        self.assertIsInstance(results, list)

        # Search with date range
        today = datetime.now()
        yesterday = today - timedelta(days=1)
        tomorrow = today + timedelta(days=1)

        results = self.tagger.advanced_search(
            date_from=yesterday,
            date_to=tomorrow
        )
        self.assertIsInstance(results, list)

        # Search with location bounds
        results = self.tagger.advanced_search(
            latitude_min=40.0,
            latitude_max=41.0,
            longitude_min=-74.0,
            longitude_max=-73.0
        )
        self.assertIsInstance(results, list)

        # Search with minimum people requirement
        results = self.tagger.advanced_search(min_people=1)
        self.assertIsInstance(results, list)

        # Search with limit
        results = self.tagger.advanced_search(limit=5)
        self.assertIsInstance(results, list)
        self.assertLessEqual(len(results), 5)

    def test_face_quality_calculation(self):
        """Test face quality scoring."""
        # Boxes use the same (top, right, bottom, left) order as the faces
        # table columns written by _create_mock_face.
        small_face = (10, 30, 30, 10)  # 20x20 face
        large_face = (10, 110, 110, 10)  # 100x100 face

        encoding = np.random.rand(128)

        small_quality = self.tagger.calculate_face_quality(encoding, small_face)
        large_quality = self.tagger.calculate_face_quality(encoding, large_face)

        # Larger faces should have higher quality scores
        self.assertGreater(large_quality, small_quality)

        # Quality should be between 0 and 1
        self.assertGreaterEqual(small_quality, 0)
        self.assertLessEqual(small_quality, 1)
        self.assertGreaterEqual(large_quality, 0)
        self.assertLessEqual(large_quality, 1)

    def test_database_integrity(self):
        """Test database integrity and relationships."""
        person_id = self.tagger.add_person("Integrity Test")
        image_id = self._create_mock_image()
        face_id = self._create_mock_face(image_id)
        tag_id = self.tagger.add_tag("integrity_test")

        # Test assignments
        self.tagger.assign_face_to_person(face_id, person_id, True)
        self.tagger.tag_image(image_id, tag_id)

        # Verify relationships exist
        c = self.tagger.conn.cursor()

        # Check face-person relationship
        c.execute("SELECT person_id FROM faces WHERE id = ?", (face_id,))
        result = c.fetchone()
        # A missing row should read as a test failure, not a TypeError
        self.assertIsNotNone(result)
        self.assertEqual(result[0], person_id)

        # Check image-tag relationship
        c.execute("SELECT tag_id FROM image_tags WHERE image_id = ?", (image_id,))
        result = c.fetchone()
        self.assertIsNotNone(result)
        self.assertEqual(result[0], tag_id)

    def test_search_edge_cases(self):
        """Test search functionality with edge cases."""
        # Search with empty parameters
        results = self.tagger.advanced_search()
        self.assertIsInstance(results, list)

        # Search with non-existent person
        results = self.tagger.advanced_search(people=["Non Existent Person"])
        self.assertEqual(len(results), 0)

        # Search with non-existent tag
        results = self.tagger.advanced_search(tags=["non_existent_tag"])
        self.assertEqual(len(results), 0)

        # Search with invalid date range (start lies after end)
        future_date = datetime.now() + timedelta(days=365)
        past_date = datetime.now() - timedelta(days=365)

        results = self.tagger.advanced_search(
            date_from=future_date,
            date_to=past_date
        )
        self.assertEqual(len(results), 0)

    # Helper methods

    def _create_mock_image(self) -> int:
        """Insert a mock row into ``images`` and return its row id.

        The path gets a random suffix so repeated calls never insert the
        same path twice.
        """
        unique_path = f'test_path_{uuid.uuid4().hex[:8]}.jpg'
        c = self.tagger.conn.cursor()
        c.execute('''INSERT INTO images
                     (path, filename, date_taken, width, height, file_size)
                     VALUES (?, ?, ?, ?, ?, ?)''',
                  (unique_path, unique_path, datetime.now(),
                   800, 600, 12345))
        self.tagger.conn.commit()
        return c.lastrowid

    def _create_mock_face(self, image_id: int) -> int:
        """Insert a mock row into ``faces`` for ``image_id`` and return its
        row id.

        The encoding is a pickled random 128-element vector.
        """
        encoding = np.random.rand(128)
        encoding_blob = pickle.dumps(encoding)

        c = self.tagger.conn.cursor()
        c.execute('''INSERT INTO faces
                     (image_id, top, right, bottom, left, encoding)
                     VALUES (?, ?, ?, ?, ?, ?)''',
                  (image_id, 10, 110, 110, 10, encoding_blob))
        self.tagger.conn.commit()
        return c.lastrowid

    def _create_mock_faces(self, count: int) -> List[int]:
        """Create ``count`` mock faces, each on its own mock image, and
        return the face row ids in creation order."""
        return [
            self._create_mock_face(self._create_mock_image())
            for _ in range(count)
        ]
|
|
|
|
|
|
def run_performance_tests(num_faces: int = 1000) -> None:
    """Run performance tests with larger datasets.

    Inserts ``num_faces`` mock images, each with one mock face, into a fresh
    database inside a temporary directory, then times face clustering and an
    ``advanced_search(limit=100)`` call. Timings and counts are printed.

    Args:
        num_faces: Number of mock image/face pairs to insert. Defaults to
            1000.
    """
    # Local import: perf_counter is a monotonic, high-resolution clock, which
    # wall-clock datetime.now() differences are not.
    import time

    print("\nRunning Performance Tests")
    print("=" * 50)

    with tempfile.TemporaryDirectory() as temp_dir:
        db_path = os.path.join(temp_dir, 'perf_test.db')
        tagger = PunimTag(db_path=db_path)

        try:
            # Test with larger numbers of faces
            print(f"Creating {num_faces} mock faces...")
            start = time.perf_counter()

            # One cursor is enough for all inserts
            c = tagger.conn.cursor()
            for i in range(num_faces):
                # Create image (random suffix keeps the path unique)
                c.execute('''INSERT INTO images
                             (path, filename, date_taken, width, height, file_size)
                             VALUES (?, ?, ?, ?, ?, ?)''',
                          (f'perf_test_{i}_{uuid.uuid4().hex[:8]}.jpg',
                           f'perf_test_{i}.jpg',
                           datetime.now(), 800, 600, 12345))
                image_id = c.lastrowid

                # Create face
                encoding = np.random.rand(128)
                encoding_blob = pickle.dumps(encoding)
                c.execute('''INSERT INTO faces
                             (image_id, top, right, bottom, left, encoding)
                             VALUES (?, ?, ?, ?, ?, ?)''',
                          (image_id, 10, 110, 110, 10, encoding_blob))

                # Report the number actually created so far
                created = i + 1
                if created % 100 == 0:
                    print(f"Created {created} faces...")

            tagger.conn.commit()
            creation_time = time.perf_counter() - start
            print(f"Created {num_faces} faces in {creation_time:.2f} seconds")

            # Test clustering performance
            print("Testing clustering performance...")
            start = time.perf_counter()
            clusters = tagger.cluster_unknown_faces()
            clustering_time = time.perf_counter() - start
            print(f"Clustered faces in {clustering_time:.2f} seconds")
            print(f"Found {len(clusters)} clusters")

            # Test search performance
            print("Testing search performance...")
            start = time.perf_counter()
            results = tagger.advanced_search(limit=100)
            search_time = time.perf_counter() - start
            print(f"Search completed in {search_time:.2f} seconds")
            print(f"Found {len(results)} results")

        finally:
            # Close before TemporaryDirectory removes the database file
            tagger.close()
|
|
|
|
|
|
def main():
    """Run the unit tests, then the performance tests if they all passed.

    Returns:
        True when every unit test passed (the performance run has then been
        executed as well), False otherwise.
    """
    print("PunimTag Backend Test Suite")
    print("=" * 50)

    # Run unit tests
    print("Running unit tests...")
    test_suite = unittest.TestLoader().loadTestsFromTestCase(
        TestBackendFunctionality
    )
    outcome = unittest.TextTestRunner(verbosity=2).run(test_suite)

    # Bail out early on failure; performance numbers are only worth
    # collecting for a passing backend.
    if not outcome.wasSuccessful():
        print("\n❌ Some tests failed. Please fix issues before proceeding.")
        return False

    print("\n✅ All unit tests passed!")

    # Run performance tests
    run_performance_tests()

    print("\n🎉 Backend testing completed successfully!")
    print("\nBackend is ready for UI development.")
    return True
|
|
|
|
|
|
if __name__ == "__main__":
    # sys.exit rather than the bare exit(): the latter is a convenience
    # injected by the site module and is absent under `python -S` or in
    # frozen/embedded interpreters.
    import sys

    success = main()
    sys.exit(0 if success else 1)