punimtag/tests/test_backend.py
2025-08-15 00:57:39 -08:00

448 lines
16 KiB
Python

#!/usr/bin/env python3
"""
Comprehensive Backend Test Suite for PunimTag
Tests all backend functionality including face clustering, enhanced recognition, and complex queries
"""
import os
import tempfile
import shutil
import unittest
import uuid
import pickle
from datetime import datetime, timedelta
import numpy as np
from punimtag import PunimTag
from config import PunimTagConfig, create_default_config
from typing import List
class TestBackendFunctionality(unittest.TestCase):
    """Test all backend features thoroughly.

    Each test runs against its own temporary directory, which holds the
    database file, a photos directory and a saved test configuration.
    """

    def setUp(self):
        """Set up test environment with temporary database and config"""
        self.test_dir = tempfile.mkdtemp()
        # Register removal right away: cleanups run after tearDown and also
        # when setUp itself fails part-way, so the directory never leaks.
        self.addCleanup(shutil.rmtree, self.test_dir)

        self.db_path = os.path.join(self.test_dir, 'test.db')
        self.photos_dir = os.path.join(self.test_dir, 'photos')
        self.config_path = os.path.join(self.test_dir, 'test_config.json')
        os.makedirs(self.photos_dir, exist_ok=True)

        # Create test configuration
        self.config = PunimTagConfig(self.config_path)
        self.config.face_recognition.confidence_threshold = 0.5
        self.config.auto_tagging.enabled = True
        self.config.processing.batch_size = 50
        self.config.save()

        # Initialize PunimTag with test database
        self.tagger = PunimTag(db_path=self.db_path, photos_dir=self.photos_dir)

    def tearDown(self):
        """Close the tagger; the temp directory is removed by the cleanup
        registered in setUp."""
        self.tagger.close()

    def test_configuration_system(self):
        """Test configuration loading and saving"""
        # Values configured in setUp must be readable back
        self.assertEqual(self.config.face_recognition.confidence_threshold, 0.5)
        self.assertTrue(self.config.auto_tagging.enabled)

        # Test updating settings
        success = self.config.update_setting(
            'face_recognition', 'confidence_threshold', 0.7
        )
        self.assertTrue(success)
        self.assertEqual(self.config.face_recognition.confidence_threshold, 0.7)

        # Test getting settings
        value = self.config.get_setting('processing', 'batch_size')
        self.assertEqual(value, 50)

        # Test tag suggestions
        event_tags = self.config.get_tag_suggestions('event')
        self.assertIn('wedding', event_tags)
        self.assertIn('bar_mitzvah', event_tags)

    def test_jewish_org_tags(self):
        """Test Jewish organization specific tag functionality"""
        # Test adding Jewish event tags
        for tag_name in ['shabbat', 'chanukah', 'passover']:
            tag_id = self.tagger.add_tag(tag_name, 'event')
            self.assertIsNotNone(tag_id)

        # Test location tags
        for tag_name in ['synagogue', 'sanctuary', 'sukkah']:
            tag_id = self.tagger.add_tag(tag_name, 'location')
            self.assertIsNotNone(tag_id)

        # Verify tags exist in database
        c = self.tagger.conn.cursor()
        c.execute("SELECT COUNT(*) FROM tags WHERE category = 'event'")
        event_count = c.fetchone()[0]
        self.assertGreaterEqual(event_count, 3)

    def test_face_clustering(self):
        """Test face clustering functionality"""
        # Populate the database with mock faces for the clusterer to work on
        self._create_mock_faces(10)

        # Test clustering
        clusters = self.tagger.cluster_unknown_faces()
        self.assertIsInstance(clusters, dict)

        # Test getting cluster data
        cluster_data = self.tagger.get_face_clusters()
        self.assertIsInstance(cluster_data, list)

        # Each cluster should have required fields
        required_fields = (
            'cluster_id', 'face_count', 'face_ids', 'representative_face'
        )
        for cluster in cluster_data:
            for field in required_fields:
                self.assertIn(field, cluster)

    def test_cluster_assignment(self):
        """Test assigning clusters to people"""
        # Create mock faces and cluster them
        self._create_mock_faces(5)
        clusters = self.tagger.cluster_unknown_faces()
        if not clusters:
            # Report the missing precondition instead of passing without
            # having checked anything.
            self.skipTest("no clusters were formed from the mock faces")

        cluster_id = next(iter(clusters))
        success = self.tagger.assign_cluster_to_person(cluster_id, "Test Person")
        self.assertTrue(success)

        # Verify assignment
        c = self.tagger.conn.cursor()
        c.execute("SELECT COUNT(*) FROM faces WHERE person_id IS NOT NULL")
        assigned_count = c.fetchone()[0]
        self.assertGreater(assigned_count, 0)

    def test_most_common_faces(self):
        """Test getting most frequently photographed people"""
        # Add some people and faces
        person1_id = self.tagger.add_person("John Doe")
        person2_id = self.tagger.add_person("Jane Smith")

        # Create mock faces: five for John Doe, two for Jane Smith, and
        # three left unassigned
        face_ids = self._create_mock_faces(10)
        for face_id in face_ids[:5]:
            self.tagger.assign_face_to_person(face_id, person1_id, True)
        for face_id in face_ids[5:7]:
            self.tagger.assign_face_to_person(face_id, person2_id, True)

        # Test getting most common faces
        common_faces = self.tagger.get_most_common_faces(limit=10)
        self.assertIsInstance(common_faces, list)
        # An empty result would make the ordering checks below vacuous.
        self.assertTrue(common_faces, "expected at least one person with faces")

        # Should be sorted by face count (John Doe should be first)
        self.assertEqual(common_faces[0]['name'], "John Doe")
        self.assertEqual(common_faces[0]['face_count'], 5)

    def test_face_verification(self):
        """Test face verification functionality"""
        person_id = self.tagger.add_person("Test Person")
        face_ids = self._create_mock_faces(3)

        # Assign faces to person
        for face_id in face_ids:
            self.tagger.assign_face_to_person(face_id, person_id, True)

        # Test verification
        faces = self.tagger.verify_person_faces(person_id)
        self.assertEqual(len(faces), 3)

        # Test removing incorrect assignment
        self.tagger.remove_incorrect_face_assignment(face_ids[0])

        # Verify removal
        faces_after = self.tagger.verify_person_faces(person_id)
        self.assertEqual(len(faces_after), 2)

    def test_batch_processing(self):
        """Test batch image processing"""
        # Create mock image paths
        image_paths = [
            os.path.join(self.photos_dir, f'test_{i}.jpg')
            for i in range(5)
        ]

        # Create empty test files
        for path in image_paths:
            with open(path, 'w') as f:
                f.write('')  # Empty file for testing

        # The placeholder files are empty, so the backend may reject the
        # whole batch. Only that call is guarded: keeping the assertions out
        # of the try block stops AssertionError from being swallowed.
        try:
            results = self.tagger.batch_process_images(image_paths, batch_size=2)
        except Exception as exc:
            self.skipTest(f"batch processing rejected empty test files: {exc}")

        self.assertIn('processed', results)
        self.assertIn('errors', results)
        self.assertIn('skipped', results)

    def test_advanced_search(self):
        """Test advanced search functionality"""
        # Setup test data
        person_id = self.tagger.add_person("Search Test Person")
        tag_id = self.tagger.add_tag("test_event", "event")

        # Create mock image
        image_id = self._create_mock_image()

        # Add mock face and tag
        face_id = self._create_mock_face(image_id)
        self.tagger.assign_face_to_person(face_id, person_id, True)
        self.tagger.tag_image(image_id, tag_id)

        # Search by person
        results = self.tagger.advanced_search(people=["Search Test Person"])
        self.assertIsInstance(results, list)

        # Search by tag
        results = self.tagger.advanced_search(tags=["test_event"])
        self.assertIsInstance(results, list)

        # Search by person and tag
        results = self.tagger.advanced_search(
            people=["Search Test Person"],
            tags=["test_event"]
        )
        self.assertIsInstance(results, list)

        # Search with date range
        today = datetime.now()
        yesterday = today - timedelta(days=1)
        tomorrow = today + timedelta(days=1)
        results = self.tagger.advanced_search(
            date_from=yesterday,
            date_to=tomorrow
        )
        self.assertIsInstance(results, list)

        # Search with location bounds
        results = self.tagger.advanced_search(
            latitude_min=40.0,
            latitude_max=41.0,
            longitude_min=-74.0,
            longitude_max=-73.0
        )
        self.assertIsInstance(results, list)

        # Search with minimum people requirement
        results = self.tagger.advanced_search(min_people=1)
        self.assertIsInstance(results, list)

        # Search with limit
        results = self.tagger.advanced_search(limit=5)
        self.assertIsInstance(results, list)
        self.assertLessEqual(len(results), 5)

    def test_face_quality_calculation(self):
        """Test face quality scoring"""
        # Boxes use the same (top, right, bottom, left) order as the faces
        # table columns written by _create_mock_face
        small_face = (10, 30, 30, 10)  # 20x20 face
        large_face = (10, 110, 110, 10)  # 100x100 face
        encoding = np.random.rand(128)

        small_quality = self.tagger.calculate_face_quality(encoding, small_face)
        large_quality = self.tagger.calculate_face_quality(encoding, large_face)

        # Larger faces should have higher quality scores
        self.assertGreater(large_quality, small_quality)

        # Quality should be between 0 and 1
        for quality in (small_quality, large_quality):
            self.assertGreaterEqual(quality, 0)
            self.assertLessEqual(quality, 1)

    def test_database_integrity(self):
        """Test database integrity and relationships"""
        # Test foreign key relationships
        person_id = self.tagger.add_person("Integrity Test")
        image_id = self._create_mock_image()
        face_id = self._create_mock_face(image_id)
        tag_id = self.tagger.add_tag("integrity_test")

        # Test assignments
        self.tagger.assign_face_to_person(face_id, person_id, True)
        self.tagger.tag_image(image_id, tag_id)

        # Verify relationships exist
        c = self.tagger.conn.cursor()

        # Check face-person relationship
        c.execute("SELECT person_id FROM faces WHERE id = ?", (face_id,))
        result = c.fetchone()
        self.assertEqual(result[0], person_id)

        # Check image-tag relationship
        c.execute("SELECT tag_id FROM image_tags WHERE image_id = ?", (image_id,))
        result = c.fetchone()
        self.assertEqual(result[0], tag_id)

    def test_search_edge_cases(self):
        """Test search functionality with edge cases"""
        # Search with empty parameters
        results = self.tagger.advanced_search()
        self.assertIsInstance(results, list)

        # Search with non-existent person
        results = self.tagger.advanced_search(people=["Non Existent Person"])
        self.assertEqual(len(results), 0)

        # Search with non-existent tag
        results = self.tagger.advanced_search(tags=["non_existent_tag"])
        self.assertEqual(len(results), 0)

        # Search with invalid date range (start lies after end)
        future_date = datetime.now() + timedelta(days=365)
        past_date = datetime.now() - timedelta(days=365)
        results = self.tagger.advanced_search(
            date_from=future_date,
            date_to=past_date
        )
        self.assertEqual(len(results), 0)

    # Helper methods

    def _create_mock_image(self) -> int:
        """Insert a mock row into the images table and return its row id.

        The path gets a random suffix so repeated calls never produce the
        same path value.
        """
        unique_path = f'test_path_{uuid.uuid4().hex[:8]}.jpg'
        c = self.tagger.conn.cursor()
        c.execute('''INSERT INTO images
                     (path, filename, date_taken, width, height, file_size)
                     VALUES (?, ?, ?, ?, ?, ?)''',
                  (unique_path, unique_path, datetime.now(),
                   800, 600, 12345))
        self.tagger.conn.commit()
        return c.lastrowid

    def _create_mock_face(self, image_id: int) -> int:
        """Insert a mock face row for ``image_id`` and return its row id.

        The encoding is a pickled random 128-element vector and the bounding
        box is fixed at top=10, right=110, bottom=110, left=10.
        """
        encoding = np.random.rand(128)
        encoding_blob = pickle.dumps(encoding)
        c = self.tagger.conn.cursor()
        c.execute('''INSERT INTO faces
                     (image_id, top, right, bottom, left, encoding)
                     VALUES (?, ?, ?, ?, ?, ?)''',
                  (image_id, 10, 110, 110, 10, encoding_blob))
        self.tagger.conn.commit()
        return c.lastrowid

    def _create_mock_faces(self, count: int) -> List[int]:
        """Create ``count`` mock faces, each on its own mock image, and
        return their face ids in creation order."""
        return [
            self._create_mock_face(self._create_mock_image())
            for _ in range(count)
        ]
def run_performance_tests():
    """Run performance tests with larger datasets.

    Inserts 1000 mock image/face rows into a database inside a temporary
    directory, then times face clustering and a limited advanced search,
    printing each duration. The tagger is always closed, even on failure.
    """
    # Imported here so this function does not depend on a change to the
    # module's import block. perf_counter is a monotonic clock, so measured
    # durations are not affected by system clock adjustments.
    import time

    print("\nRunning Performance Tests")
    print("=" * 50)

    with tempfile.TemporaryDirectory() as temp_dir:
        db_path = os.path.join(temp_dir, 'perf_test.db')
        tagger = PunimTag(db_path=db_path)
        try:
            # Test with larger numbers of faces
            print("Creating 1000 mock faces...")
            started = time.perf_counter()
            # One cursor serves every insert; all rows are committed together
            # after the loop.
            c = tagger.conn.cursor()
            for i in range(1000):
                # Create image
                c.execute('''INSERT INTO images
                             (path, filename, date_taken, width, height, file_size)
                             VALUES (?, ?, ?, ?, ?, ?)''',
                          (f'perf_test_{i}_{uuid.uuid4().hex[:8]}.jpg',
                           f'perf_test_{i}.jpg',
                           datetime.now(), 800, 600, 12345))
                image_id = c.lastrowid

                # Create face
                encoding = np.random.rand(128)
                encoding_blob = pickle.dumps(encoding)
                c.execute('''INSERT INTO faces
                             (image_id, top, right, bottom, left, encoding)
                             VALUES (?, ?, ?, ?, ?, ?)''',
                          (image_id, 10, 110, 110, 10, encoding_blob))

                if i % 100 == 0:
                    print(f"Created {i} faces...")

            tagger.conn.commit()
            creation_time = time.perf_counter() - started
            print(f"Created 1000 faces in {creation_time:.2f} seconds")

            # Test clustering performance
            print("Testing clustering performance...")
            started = time.perf_counter()
            clusters = tagger.cluster_unknown_faces()
            clustering_time = time.perf_counter() - started
            print(f"Clustered faces in {clustering_time:.2f} seconds")
            print(f"Found {len(clusters)} clusters")

            # Test search performance
            print("Testing search performance...")
            started = time.perf_counter()
            results = tagger.advanced_search(limit=100)
            search_time = time.perf_counter() - started
            print(f"Search completed in {search_time:.2f} seconds")
            print(f"Found {len(results)} results")
        finally:
            tagger.close()
def main():
    """Run the unit tests and, if they all pass, the performance tests.

    Returns:
        True when every unit test passed and the performance run finished,
        False when at least one unit test failed.
    """
    print("PunimTag Backend Test Suite")
    print("=" * 50)

    # Run unit tests
    print("Running unit tests...")
    suite = unittest.TestLoader().loadTestsFromTestCase(TestBackendFunctionality)
    outcome = unittest.TextTestRunner(verbosity=2).run(suite)

    # Bail out early on failure; the performance run is skipped in that case.
    if not outcome.wasSuccessful():
        print("\n❌ Some tests failed. Please fix issues before proceeding.")
        return False

    print("\n✅ All unit tests passed!")

    # Run performance tests
    run_performance_tests()

    print("\n🎉 Backend testing completed successfully!")
    print("\nBackend is ready for UI development.")
    return True
if __name__ == "__main__":
    # Exit status 0 on success, 1 on failure. SystemExit is raised directly
    # because the exit() helper is injected by the site module for
    # interactive use and is not guaranteed to exist (e.g. under python -S).
    success = main()
    raise SystemExit(0 if success else 1)