feat: Enhance API startup script and add file hash management for photos

This commit improves the `run_api_with_worker.sh` script by ensuring the virtual environment is created if it doesn't exist and dependencies are installed. It also adds a check to ensure the database schema is up to date. Additionally, new functionality has been introduced to calculate and store file hashes for uploaded photos, preventing duplicates. The database schema has been updated to include a `file_hash` column in the `photos` table, along with an index for efficient querying. The frontend has been updated to handle warnings for duplicate photos during the review process. Documentation has been updated to reflect these changes.
This commit is contained in:
tanyar09 2025-11-24 13:16:41 -05:00
parent 93cb4eda5b
commit 661e812193
35 changed files with 593 additions and 5628 deletions

Binary file not shown.

After

Width:  |  Height:  |  Size: 6.8 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 7.2 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 7.7 MiB

View File

@ -336,8 +336,10 @@ CREATE TABLE photos (
filename TEXT NOT NULL,
date_added DATETIME DEFAULT CURRENT_TIMESTAMP,
date_taken DATE,
processed BOOLEAN DEFAULT 0
processed BOOLEAN DEFAULT 0,
file_hash TEXT NOT NULL
);
CREATE INDEX idx_photos_file_hash ON photos(file_hash);
```
#### people

View File

@ -36,6 +36,7 @@ export interface ReviewResponse {
approved: number
rejected: number
errors: string[]
warnings?: string[] // Informational messages (e.g., duplicates)
}
export const pendingPhotosApi = {

View File

@ -16,6 +16,12 @@ export default function PendingPhotos() {
const [submitting, setSubmitting] = useState(false)
const [statusFilter, setStatusFilter] = useState<string>('pending')
const [imageUrls, setImageUrls] = useState<Record<number, string>>({})
const [notification, setNotification] = useState<{
approved: number
rejected: number
warnings: string[]
errors: string[]
} | null>(null)
const imageUrlsRef = useRef<Record<number, string>>({})
const loadPendingPhotos = useCallback(async () => {
@ -232,19 +238,20 @@ export default function PendingPhotos() {
decisions: decisionsList,
})
const message = [
`✅ Approved: ${response.approved}`,
`❌ Rejected: ${response.rejected}`,
response.errors.length > 0 ? `⚠️ Errors: ${response.errors.length}` : '',
]
.filter(Boolean)
.join('\n')
alert(message)
// Show custom notification instead of alert
setNotification({
approved: response.approved,
rejected: response.rejected,
warnings: response.warnings || [],
errors: response.errors,
})
if (response.errors.length > 0) {
console.error('Errors:', response.errors)
}
if (response.warnings && response.warnings.length > 0) {
console.info('Warnings:', response.warnings)
}
// Reload the list to show updated status
await loadPendingPhotos()
@ -263,6 +270,39 @@ export default function PendingPhotos() {
return (
<div>
{/* Notification */}
{notification && (
<div className="mb-4 bg-white border border-gray-200 rounded-lg shadow-lg p-4">
<div className="space-y-2">
<div className="flex items-center gap-2">
<span className="text-green-600 text-lg"></span>
<span className="font-medium">Approved: {notification.approved}</span>
</div>
<div className="flex items-center gap-2">
<span className="text-red-600 text-lg"></span>
<span className="font-medium">Rejected: {notification.rejected}</span>
</div>
{notification.warnings.length > 0 && (
<div className="text-xs text-gray-600 ml-7">
{notification.warnings.join(', ')}
</div>
)}
{notification.errors.length > 0 && (
<div className="flex items-center gap-2">
<span className="text-yellow-600 text-lg"></span>
<span className="font-medium">Errors: {notification.errors.length}</span>
</div>
)}
</div>
<button
onClick={() => setNotification(null)}
className="mt-3 px-3 py-1.5 text-sm text-gray-600 bg-gray-50 rounded hover:bg-gray-100 hover:text-gray-700 transition-colors"
>
Dismiss
</button>
</div>
)}
<div className="bg-white rounded-lg shadow p-6">
<h1 className="text-2xl font-bold text-gray-900 mb-6">Manage User Uploaded Photos</h1>

View File

@ -1,16 +1,51 @@
#!/bin/bash
# Start FastAPI server with RQ worker in background
set -euo pipefail
cd "$(dirname "$0")"
# Activate virtual environment if it exists
if [ -d "venv" ]; then
source venv/bin/activate
# Ensure virtual environment exists and dependencies are installed
PYTHON_BIN="${PYTHON_BIN:-}"
if [ -z "$PYTHON_BIN" ]; then
if command -v python3 >/dev/null 2>&1; then
PYTHON_BIN="python3"
elif command -v python >/dev/null 2>&1; then
PYTHON_BIN="python"
else
echo "❌ Python interpreter not found. Please install Python 3.10+."
exit 1
fi
fi
if [ ! -d "venv" ]; then
echo "📦 Creating virtual environment..."
"$PYTHON_BIN" -m venv venv
fi
source venv/bin/activate
REQUIREMENTS_STAMP="venv/.requirements_installed"
if [ ! -f "$REQUIREMENTS_STAMP" ] || [ requirements.txt -nt "$REQUIREMENTS_STAMP" ]; then
echo "📦 Installing backend dependencies..."
python -m pip install --upgrade pip wheel setuptools
python -m pip install -r requirements.txt
touch "$REQUIREMENTS_STAMP"
fi
# Set Python path
export PYTHONPATH="$(pwd)"
# Ensure database schema exists
echo "🗃 Ensuring database schema is up to date..."
python - <<'PY'
from src.web.db.models import Base
from src.web.db.session import engine
Base.metadata.create_all(bind=engine)
print("✅ Database schema ready")
PY
# Check if Redis is running
if ! redis-cli ping > /dev/null 2>&1; then
echo "⚠️ Redis is not running. Starting Redis..."

View File

@ -1,102 +0,0 @@
#!/usr/bin/env python3
"""Check all identified faces for pose information"""
import sqlite3
import sys
import os
# Add project root to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from src.core.config import DEFAULT_DB_PATH
def check_identified_faces(db_path: str):
"""Check all identified faces for pose information"""
if not os.path.exists(db_path):
print(f"Database not found: {db_path}")
return
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
# Get all identified faces with pose information
cursor.execute('''
SELECT
f.id,
f.person_id,
p.name || ' ' || p.last_name as person_name,
ph.filename,
f.pose_mode,
f.yaw_angle,
f.pitch_angle,
f.roll_angle,
f.face_confidence,
f.quality_score,
f.location
FROM faces f
JOIN people p ON f.person_id = p.id
JOIN photos ph ON f.photo_id = ph.id
WHERE f.person_id IS NOT NULL
ORDER BY p.id, f.id
''')
faces = cursor.fetchall()
if not faces:
print("No identified faces found.")
return
print(f"\n{'='*80}")
print(f"Found {len(faces)} identified faces")
print(f"{'='*80}\n")
# Group by person
by_person = {}
for face in faces:
person_id = face['person_id']
if person_id not in by_person:
by_person[person_id] = []
by_person[person_id].append(face)
# Print summary
print("SUMMARY BY PERSON:")
print("-" * 80)
for person_id, person_faces in by_person.items():
person_name = person_faces[0]['person_name']
pose_modes = [f['pose_mode'] for f in person_faces]
frontal_count = sum(1 for p in pose_modes if p == 'frontal')
profile_count = sum(1 for p in pose_modes if 'profile' in p)
other_count = len(pose_modes) - frontal_count - profile_count
print(f"\nPerson {person_id}: {person_name}")
print(f" Total faces: {len(person_faces)}")
print(f" Frontal: {frontal_count}")
print(f" Profile: {profile_count}")
print(f" Other: {other_count}")
print(f" Pose modes: {set(pose_modes)}")
# Print detailed information
print(f"\n{'='*80}")
print("DETAILED FACE INFORMATION:")
print(f"{'='*80}\n")
for face in faces:
print(f"Face ID: {face['id']}")
print(f" Person: {face['person_name']} (ID: {face['person_id']})")
print(f" Photo: {face['filename']}")
print(f" Pose Mode: {face['pose_mode']}")
print(f" Yaw: {face['yaw_angle']:.2f}°" if face['yaw_angle'] is not None else " Yaw: None")
print(f" Pitch: {face['pitch_angle']:.2f}°" if face['pitch_angle'] is not None else " Pitch: None")
print(f" Roll: {face['roll_angle']:.2f}°" if face['roll_angle'] is not None else " Roll: None")
print(f" Confidence: {face['face_confidence']:.3f}")
print(f" Quality: {face['quality_score']:.3f}")
print(f" Location: {face['location']}")
print()
conn.close()
if __name__ == "__main__":
db_path = sys.argv[1] if len(sys.argv) > 1 else DEFAULT_DB_PATH
check_identified_faces(db_path)

188
scripts/check_two_faces_pose.py Executable file
View File

@ -0,0 +1,188 @@
#!/usr/bin/env python3
"""Check two identified faces and analyze why their pose modes are wrong"""
import sys
import os
import json
# Add project root to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from src.web.db.models import Face, Person, Photo
from src.web.db.session import get_database_url
from src.utils.pose_detection import PoseDetector
def check_two_faces(face_id1: int = None, face_id2: int = None):
"""Check two identified faces and analyze their pose modes"""
db_url = get_database_url()
print(f"Connecting to database: {db_url}")
engine = create_engine(db_url)
Session = sessionmaker(bind=engine)
session = Session()
try:
# Get all identified faces
query = (
session.query(Face, Person, Photo)
.join(Person, Face.person_id == Person.id)
.join(Photo, Face.photo_id == Photo.id)
.filter(Face.person_id.isnot(None))
.order_by(Face.id)
)
if face_id1:
query = query.filter(Face.id == face_id1)
elif face_id2:
query = query.filter(Face.id == face_id2)
faces = query.limit(2).all()
if len(faces) < 2:
print(f"Found {len(faces)} identified face(s). Need 2 faces to compare.")
if len(faces) == 0:
print("No identified faces found.")
return
print("\nShowing available identified faces:")
all_faces = (
session.query(Face, Person, Photo)
.join(Person, Face.person_id == Person.id)
.join(Photo, Face.photo_id == Photo.id)
.filter(Face.person_id.isnot(None))
.order_by(Face.id)
.limit(10)
.all()
)
for face, person, photo in all_faces:
print(f" Face ID: {face.id}, Person: {person.first_name} {person.last_name}, Photo: {photo.filename}, Pose: {face.pose_mode}")
return
print(f"\n{'='*80}")
print("ANALYZING TWO IDENTIFIED FACES")
print(f"{'='*80}\n")
for idx, (face, person, photo) in enumerate(faces, 1):
person_name = f"{person.first_name} {person.last_name}"
print(f"{'='*80}")
print(f"FACE {idx}: ID {face.id}")
print(f"{'='*80}")
print(f"Person: {person_name} (ID: {face.person_id})")
print(f"Photo: {photo.filename}")
print(f"Current Pose Mode: {face.pose_mode}")
print(f"Yaw: {face.yaw_angle:.2f}°" if face.yaw_angle is not None else "Yaw: None")
print(f"Pitch: {face.pitch_angle:.2f}°" if face.pitch_angle is not None else "Pitch: None")
print(f"Roll: {face.roll_angle:.2f}°" if face.roll_angle is not None else "Roll: None")
print(f"Face Width: {face.face_width if hasattr(face, 'face_width') else 'N/A'}")
print(f"Confidence: {face.face_confidence:.3f}")
print(f"Quality: {face.quality_score:.3f}")
print(f"Location: {face.location}")
# Parse landmarks if available
landmarks = None
if face.landmarks:
try:
landmarks = json.loads(face.landmarks)
print(f"\nLandmarks:")
for key, value in landmarks.items():
print(f" {key}: {value}")
except json.JSONDecodeError:
print(f"\nLandmarks: (invalid JSON)")
# Recalculate pose mode using current logic
print(f"\n{''*80}")
print("RECALCULATING POSE MODE:")
print(f"{''*80}")
# Calculate face width from landmarks if available
face_width = None
if landmarks:
face_width = PoseDetector.calculate_face_width_from_landmarks(landmarks)
print(f"Calculated face_width from landmarks: {face_width}")
# Recalculate pose mode
recalculated_pose = PoseDetector.classify_pose_mode(
face.yaw_angle,
face.pitch_angle,
face.roll_angle,
face_width,
landmarks
)
print(f"Recalculated Pose Mode: {recalculated_pose}")
if recalculated_pose != face.pose_mode:
print(f"⚠️ MISMATCH! Current: '{face.pose_mode}' vs Recalculated: '{recalculated_pose}'")
# Analyze why
print(f"\nAnalysis:")
if face.yaw_angle is None:
print(f" - Yaw is None")
if landmarks:
left_eye = landmarks.get('left_eye')
right_eye = landmarks.get('right_eye')
nose = landmarks.get('nose')
missing = []
if not left_eye:
missing.append('left_eye')
if not right_eye:
missing.append('right_eye')
if not nose:
missing.append('nose')
if missing:
print(f" - Missing landmarks: {', '.join(missing)}")
print(f" - Should be classified as profile (missing landmarks)")
else:
print(f" - All landmarks present")
if face_width:
print(f" - Face width: {face_width}px")
if face_width < 25.0:
print(f" - Face width < 25px, should be profile")
else:
print(f" - Face width >= 25px, should be frontal")
else:
print(f" - No landmarks available")
else:
abs_yaw = abs(face.yaw_angle)
print(f" - Yaw angle: {face.yaw_angle:.2f}° (abs: {abs_yaw:.2f}°)")
if abs_yaw >= 30.0:
expected = "profile_left" if face.yaw_angle < 0 else "profile_right"
print(f" - |yaw| >= 30°, should be '{expected}'")
else:
print(f" - |yaw| < 30°, should be 'frontal'")
else:
print(f"✓ Pose mode matches recalculated value")
print()
finally:
session.close()
if __name__ == "__main__":
face_id1 = None
face_id2 = None
if len(sys.argv) > 1:
try:
face_id1 = int(sys.argv[1])
except ValueError:
print(f"Invalid face ID: {sys.argv[1]}")
sys.exit(1)
if len(sys.argv) > 2:
try:
face_id2 = int(sys.argv[2])
except ValueError:
print(f"Invalid face ID: {sys.argv[2]}")
sys.exit(1)
try:
check_two_faces(face_id1, face_id2)
except Exception as e:
print(f"❌ Error: {e}")
import traceback
traceback.print_exc()
sys.exit(1)

View File

@ -1,39 +0,0 @@
#!/usr/bin/env python3
"""
Script to clean up false positive face detections from the database
"""
import sys
import os
# Add the project root to the Python path
project_root = os.path.join(os.path.dirname(__file__), '..')
sys.path.insert(0, project_root)
from src.core.database import DatabaseManager
from src.core.face_processing import FaceProcessor
def main():
"""Clean up false positive faces from the database"""
print("🧹 PunimTag False Positive Face Cleanup")
print("=" * 50)
# Initialize database and face processor
db_manager = DatabaseManager()
face_processor = FaceProcessor(db_manager, verbose=1)
# Clean up false positives
removed_count = face_processor.cleanup_false_positive_faces(verbose=True)
if removed_count > 0:
print(f"\n✅ Cleanup complete! Removed {removed_count} false positive faces.")
print("You can now re-run face processing with improved filtering.")
else:
print("\n✅ No false positive faces found to remove.")
print("\nTo reprocess photos with improved face detection:")
print("1. Use the web interface to process photos")
print("2. Or use the API endpoints to trigger face processing")
if __name__ == "__main__":
main()

View File

@ -62,3 +62,4 @@ else
exit 1
fi

View File

@ -97,3 +97,4 @@ def grant_delete_permission() -> None:
if __name__ == "__main__":
grant_delete_permission()

View File

@ -1,117 +0,0 @@
#!/usr/bin/env python3
"""
Migration script to prepare database for DeepFace
Drops all existing tables and recreates with new schema
WARNING: This will delete ALL existing data!
Run this script before migrating to DeepFace.
"""
import sqlite3
import sys
from pathlib import Path
# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent))
from src.core.database import DatabaseManager
from src.core.config import DEFAULT_DB_PATH
def migrate_database():
"""Drop all tables and reinitialize with DeepFace schema"""
print("=" * 70)
print("DeepFace Migration Script - Database Reset")
print("=" * 70)
print()
print("⚠️ WARNING: This will delete ALL existing data!")
print()
print("This includes:")
print(" • All photos")
print(" • All faces and face encodings")
print(" • All people and person data")
print(" • All tags and photo-tag linkages")
print()
print("The database will be recreated with the new DeepFace schema.")
print()
response = input("Type 'DELETE ALL DATA' to confirm (or anything else to cancel): ")
if response != "DELETE ALL DATA":
print()
print("❌ Migration cancelled.")
print()
return False
print()
print("🗑️ Dropping all existing tables...")
print()
try:
# Connect directly to database
conn = sqlite3.connect(DEFAULT_DB_PATH)
cursor = conn.cursor()
# Get list of all tables (excluding SQLite system tables)
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'")
tables = [row[0] for row in cursor.fetchall()]
if not tables:
print(" No tables found in database.")
else:
# Drop all tables in correct order (respecting foreign keys)
drop_order = ['phototaglinkage', 'person_encodings', 'faces', 'tags', 'people', 'photos']
for table in drop_order:
if table in tables:
cursor.execute(f'DROP TABLE IF EXISTS {table}')
print(f" ✓ Dropped table: {table}")
# Drop any remaining tables not in our list (excluding SQLite system tables)
for table in tables:
if table not in drop_order and not table.startswith('sqlite_'):
cursor.execute(f'DROP TABLE IF EXISTS {table}')
print(f" ✓ Dropped table: {table}")
conn.commit()
conn.close()
print()
print("✅ All tables dropped successfully")
print()
print("🔄 Reinitializing database with DeepFace schema...")
print()
# Reinitialize with new schema
db = DatabaseManager(DEFAULT_DB_PATH, verbose=1)
print()
print("=" * 70)
print("✅ Database migration complete!")
print("=" * 70)
print()
print("Next steps:")
print(" 1. Add photos using the dashboard (File → Add Photos)")
print(" 2. Process faces with DeepFace (Tools → Process Faces)")
print(" 3. Identify people in the Identify panel")
print()
print("New DeepFace features:")
print(" • 512-dimensional face encodings (vs 128)")
print(" • Multiple detector backends (RetinaFace, MTCNN, etc.)")
print(" • ArcFace model for improved accuracy")
print(" • Face confidence scores from detector")
print()
return True
except Exception as e:
print()
print(f"❌ Error during migration: {e}")
print()
return False
if __name__ == "__main__":
success = migrate_database()
sys.exit(0 if success else 1)

View File

@ -60,3 +60,4 @@ echo "3. Run your application - it will connect to PostgreSQL automatically"

View File

@ -0,0 +1,116 @@
#!/usr/bin/env python3
"""Update status of a reported photo in the auth database."""
import sys
from pathlib import Path
# Add project root to path
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from sqlalchemy import text
from src.web.db.session import get_auth_database_url, AuthSessionLocal
def update_reported_photo_status(report_id: int, new_status: str):
"""Update the status of a reported photo."""
if AuthSessionLocal is None:
raise ValueError("Auth database not configured. Set DATABASE_URL_AUTH environment variable.")
db = AuthSessionLocal()
try:
# First check if the report exists and get its current status
check_result = db.execute(text("""
SELECT id, status, review_notes
FROM inappropriate_photo_reports
WHERE id = :report_id
"""), {"report_id": report_id})
row = check_result.fetchone()
if not row:
print(f"❌ Reported photo {report_id} not found in database.")
return
current_status = row.status
review_notes = row.review_notes
print(f"📋 Current status: '{current_status}'")
if review_notes:
print(f"📝 Review notes: '{review_notes}'")
if current_status == new_status:
print(f" Status is already '{new_status}'. No update needed.")
return
# Update the status
result = db.execute(text("""
UPDATE inappropriate_photo_reports
SET status = :new_status
WHERE id = :report_id
"""), {
"new_status": new_status,
"report_id": report_id
})
db.commit()
if result.rowcount > 0:
print(f"✅ Successfully updated reported photo {report_id} status from '{current_status}' to '{new_status}'")
else:
print(f"⚠️ No rows updated.")
except Exception as e:
db.rollback()
print(f"❌ Error updating reported photo status: {str(e)}")
raise
finally:
db.close()
def find_reported_photo_by_note(search_note: str):
"""Find reported photos by review notes."""
if AuthSessionLocal is None:
raise ValueError("Auth database not configured. Set DATABASE_URL_AUTH environment variable.")
db = AuthSessionLocal()
try:
result = db.execute(text("""
SELECT id, photo_id, status, review_notes, reported_at
FROM inappropriate_photo_reports
WHERE review_notes LIKE :search_pattern
ORDER BY id DESC
"""), {"search_pattern": f"%{search_note}%"})
rows = result.fetchall()
if not rows:
print(f"❌ No reported photos found with note containing '{search_note}'")
return []
print(f"📋 Found {len(rows)} reported photo(s) with note containing '{search_note}':\n")
for row in rows:
print(f" ID: {row.id}, Photo ID: {row.photo_id}, Status: {row.status}")
print(f" Notes: {row.review_notes}")
print(f" Reported at: {row.reported_at}\n")
return rows
except Exception as e:
print(f"❌ Error searching for reported photos: {str(e)}")
raise
finally:
db.close()
if __name__ == "__main__":
if len(sys.argv) < 3:
print("Usage: python scripts/update_reported_photo_status.py <report_id> <new_status>")
print(" OR: python scripts/update_reported_photo_status.py search <search_text>")
print("Example: python scripts/update_reported_photo_status.py 57 dismissed")
print("Example: python scripts/update_reported_photo_status.py search 'agree. removed'")
sys.exit(1)
if sys.argv[1] == "search":
search_text = sys.argv[2]
find_reported_photo_by_note(search_text)
else:
report_id = int(sys.argv[1])
new_status = sys.argv[2]
update_reported_photo_status(report_id, new_status)

View File

@ -1,18 +0,0 @@
"""
Core business logic modules for PunimTag
"""
from .database import DatabaseManager
from .face_processing import FaceProcessor
from .photo_management import PhotoManager
from .tag_management import TagManager
from .search_stats import SearchStats
__all__ = [
'DatabaseManager',
'FaceProcessor',
'PhotoManager',
'TagManager',
'SearchStats',
]

View File

@ -1,63 +0,0 @@
#!/usr/bin/env python3
"""
Configuration constants and settings for PunimTag
"""
import os
import warnings
# Suppress TensorFlow warnings (must be before DeepFace import)
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
warnings.filterwarnings('ignore')
# Default file paths
DEFAULT_DB_PATH = "data/photos.db"
DEFAULT_CONFIG_FILE = "gui_config.json"
DEFAULT_WINDOW_SIZE = "600x500"
# DeepFace Settings
DEEPFACE_DETECTOR_BACKEND = "retinaface" # Options: retinaface, mtcnn, opencv, ssd
DEEPFACE_MODEL_NAME = "ArcFace" # Best accuracy model
DEEPFACE_DISTANCE_METRIC = "cosine" # For similarity calculation
DEEPFACE_ENFORCE_DETECTION = False # Don't fail if no faces found
DEEPFACE_ALIGN_FACES = True # Face alignment for better accuracy
# DeepFace Options for GUI
DEEPFACE_DETECTOR_OPTIONS = ["retinaface", "mtcnn", "opencv", "ssd"]
DEEPFACE_MODEL_OPTIONS = ["ArcFace", "Facenet", "Facenet512", "VGG-Face"]
# Face tolerance/threshold settings (adjusted for DeepFace)
DEFAULT_FACE_TOLERANCE = 0.6 # Default tolerance for face matching
DEEPFACE_SIMILARITY_THRESHOLD = 60 # Minimum similarity percentage (0-100)
# Confidence calibration settings
USE_CALIBRATED_CONFIDENCE = True # Use calibrated confidence instead of linear transformation
CONFIDENCE_CALIBRATION_METHOD = "empirical" # Method: "empirical", "linear", "sigmoid"
# Legacy settings (kept for compatibility until Phase 3 migration)
DEFAULT_FACE_DETECTION_MODEL = "hog" # Legacy - will be replaced by DEEPFACE_DETECTOR_BACKEND
DEFAULT_BATCH_SIZE = 20
DEFAULT_PROCESSING_LIMIT = 50
# Face quality settings
MIN_FACE_QUALITY = 0.3
DEFAULT_CONFIDENCE_THRESHOLD = 0.5
# Face detection filtering settings
MIN_FACE_CONFIDENCE = 0.4 # Minimum confidence from detector to accept face (lowered to allow more low-quality faces)
MIN_FACE_SIZE = 40 # Minimum face size in pixels (width or height) - lowered to allow smaller faces
MAX_FACE_SIZE = 1500 # Maximum face size in pixels (to avoid full-image false positives)
# GUI settings
FACE_CROP_SIZE = 100
ICON_SIZE = 20
MAX_SUGGESTIONS = 10
# Database settings
DB_TIMEOUT = 30.0
# Supported image formats
SUPPORTED_IMAGE_FORMATS = {'.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.tif'}
# Face crop temporary directory
TEMP_FACE_CROP_DIR = "temp_face_crops"

View File

@ -1,592 +0,0 @@
#!/usr/bin/env python3
"""
Database operations and schema management for PunimTag
"""
import sqlite3
import threading
from contextlib import contextmanager
from typing import Dict, List, Tuple, Optional
from src.core.config import DEFAULT_DB_PATH, DB_TIMEOUT
class DatabaseManager:
"""Handles all database operations for the photo tagger"""
def __init__(self, db_path: str = DEFAULT_DB_PATH, verbose: int = 0):
"""Initialize database manager"""
self.db_path = db_path
self.verbose = verbose
self._db_connection = None
self._db_lock = threading.Lock()
self.init_database()
@contextmanager
def get_db_connection(self):
"""Context manager for database connections with connection pooling"""
with self._db_lock:
if self._db_connection is None:
self._db_connection = sqlite3.connect(self.db_path, timeout=DB_TIMEOUT, check_same_thread=False)
self._db_connection.row_factory = sqlite3.Row
try:
yield self._db_connection
except Exception:
self._db_connection.rollback()
raise
else:
self._db_connection.commit()
def close_db_connection(self):
"""Close database connection"""
with self._db_lock:
if self._db_connection:
self._db_connection.close()
self._db_connection = None
def init_database(self):
"""Create database tables if they don't exist"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
# Photos table
cursor.execute('''
CREATE TABLE IF NOT EXISTS photos (
id INTEGER PRIMARY KEY AUTOINCREMENT,
path TEXT UNIQUE NOT NULL,
filename TEXT NOT NULL,
date_added DATETIME DEFAULT CURRENT_TIMESTAMP,
date_taken DATE,
processed BOOLEAN DEFAULT 0
)
''')
# People table
cursor.execute('''
CREATE TABLE IF NOT EXISTS people (
id INTEGER PRIMARY KEY AUTOINCREMENT,
first_name TEXT NOT NULL,
last_name TEXT NOT NULL,
middle_name TEXT,
maiden_name TEXT,
date_of_birth DATE,
created_date DATETIME DEFAULT CURRENT_TIMESTAMP,
UNIQUE(first_name, last_name, middle_name, maiden_name, date_of_birth)
)
''')
# Faces table (updated for DeepFace and pose detection)
cursor.execute('''
CREATE TABLE IF NOT EXISTS faces (
id INTEGER PRIMARY KEY AUTOINCREMENT,
photo_id INTEGER NOT NULL,
person_id INTEGER,
encoding BLOB NOT NULL,
location TEXT NOT NULL,
confidence REAL DEFAULT 0.0,
quality_score REAL DEFAULT 0.0,
is_primary_encoding BOOLEAN DEFAULT 0,
detector_backend TEXT DEFAULT 'retinaface',
model_name TEXT DEFAULT 'ArcFace',
face_confidence REAL DEFAULT 0.0,
exif_orientation INTEGER DEFAULT NULL,
pose_mode TEXT DEFAULT 'frontal',
yaw_angle REAL DEFAULT NULL,
pitch_angle REAL DEFAULT NULL,
roll_angle REAL DEFAULT NULL,
landmarks TEXT DEFAULT NULL,
FOREIGN KEY (photo_id) REFERENCES photos (id),
FOREIGN KEY (person_id) REFERENCES people (id)
)
''')
# Add pose fields if they don't exist (for existing databases)
try:
cursor.execute('ALTER TABLE faces ADD COLUMN pose_mode TEXT DEFAULT "frontal"')
except sqlite3.OperationalError:
pass # Column already exists
try:
cursor.execute('ALTER TABLE faces ADD COLUMN yaw_angle REAL DEFAULT NULL')
except sqlite3.OperationalError:
pass # Column already exists
try:
cursor.execute('ALTER TABLE faces ADD COLUMN pitch_angle REAL DEFAULT NULL')
except sqlite3.OperationalError:
pass # Column already exists
try:
cursor.execute('ALTER TABLE faces ADD COLUMN roll_angle REAL DEFAULT NULL')
except sqlite3.OperationalError:
pass # Column already exists
# Person encodings table for multiple encodings per person (updated for DeepFace)
cursor.execute('''
CREATE TABLE IF NOT EXISTS person_encodings (
id INTEGER PRIMARY KEY AUTOINCREMENT,
person_id INTEGER NOT NULL,
face_id INTEGER NOT NULL,
encoding BLOB NOT NULL,
quality_score REAL DEFAULT 0.0,
detector_backend TEXT DEFAULT 'retinaface',
model_name TEXT DEFAULT 'ArcFace',
created_date DATETIME DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (person_id) REFERENCES people (id),
FOREIGN KEY (face_id) REFERENCES faces (id)
)
''')
# Tags table - holds only tag information
cursor.execute('''
CREATE TABLE IF NOT EXISTS tags (
id INTEGER PRIMARY KEY AUTOINCREMENT,
tag_name TEXT UNIQUE NOT NULL,
created_date DATETIME DEFAULT CURRENT_TIMESTAMP
)
''')
# Photo-Tag linkage table
# linkage_type: INTEGER enum → 0 = single (per-photo add), 1 = bulk (folder-wide add)
cursor.execute('''
CREATE TABLE IF NOT EXISTS phototaglinkage (
linkage_id INTEGER PRIMARY KEY AUTOINCREMENT,
photo_id INTEGER NOT NULL,
tag_id INTEGER NOT NULL,
linkage_type INTEGER NOT NULL DEFAULT 0 CHECK(linkage_type IN (0,1)),
created_date DATETIME DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (photo_id) REFERENCES photos (id),
FOREIGN KEY (tag_id) REFERENCES tags (id),
UNIQUE(photo_id, tag_id)
)
''')
# Add indexes for better performance
cursor.execute('CREATE INDEX IF NOT EXISTS idx_faces_person_id ON faces(person_id)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_faces_photo_id ON faces(photo_id)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_photos_processed ON photos(processed)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_faces_quality ON faces(quality_score)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_person_encodings_person_id ON person_encodings(person_id)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_person_encodings_quality ON person_encodings(quality_score)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_photos_date_taken ON photos(date_taken)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_photos_date_added ON photos(date_added)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_faces_pose_mode ON faces(pose_mode)')
if self.verbose >= 1:
print(f"✅ Database initialized: {self.db_path}")
def load_tag_mappings(self) -> Tuple[Dict[int, str], Dict[str, int]]:
"""Load tag name to ID and ID to name mappings from database (case-insensitive)"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute('SELECT id, tag_name FROM tags ORDER BY LOWER(tag_name)')
tag_id_to_name = {}
tag_name_to_id = {}
for row in cursor.fetchall():
tag_id, tag_name = row
tag_id_to_name[tag_id] = tag_name
# Use lowercase for case-insensitive lookups
tag_name_to_id[tag_name.lower()] = tag_id
return tag_id_to_name, tag_name_to_id
def get_existing_tag_ids_for_photo(self, photo_id: int) -> List[int]:
"""Get list of tag IDs for a photo from database"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT ptl.tag_id
FROM phototaglinkage ptl
WHERE ptl.photo_id = ?
ORDER BY ptl.created_date
''', (photo_id,))
return [row[0] for row in cursor.fetchall()]
def get_tag_id_by_name(self, tag_name: str, tag_name_to_id_map: Dict[str, int]) -> Optional[int]:
"""Get tag ID by name, creating the tag if it doesn't exist"""
if tag_name in tag_name_to_id_map:
return tag_name_to_id_map[tag_name]
return None
def get_tag_name_by_id(self, tag_id: int, tag_id_to_name_map: Dict[int, str]) -> str:
"""Get tag name by ID"""
return tag_id_to_name_map.get(tag_id, f"Unknown Tag {tag_id}")
def show_people_list(self, cursor=None) -> List[Tuple]:
"""Show list of people in database"""
if cursor is None:
with self.get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT id, first_name, last_name, middle_name, maiden_name, date_of_birth, created_date
FROM people
ORDER BY last_name, first_name
''')
return cursor.fetchall()
def add_photo(self, photo_path: str, filename: str, date_taken: Optional[str] = None) -> int:
"""Add a photo to the database and return its ID if new, None if already exists"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
# Check if photo already exists
cursor.execute('SELECT id FROM photos WHERE path = ?', (photo_path,))
existing = cursor.fetchone()
if existing:
# Photo already exists, return None to indicate it wasn't added
return None
# Photo doesn't exist, insert it
cursor.execute('''
INSERT INTO photos (path, filename, date_taken)
VALUES (?, ?, ?)
''', (photo_path, filename, date_taken))
# Get the new photo ID
cursor.execute('SELECT id FROM photos WHERE path = ?', (photo_path,))
result = cursor.fetchone()
return result[0] if result else None
def mark_photo_processed(self, photo_id: int):
"""Mark a photo as processed"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute('UPDATE photos SET processed = 1 WHERE id = ?', (photo_id,))
def add_face(self, photo_id: int, encoding: bytes, location: str, confidence: float = 0.0,
quality_score: float = 0.0, person_id: Optional[int] = None,
detector_backend: str = 'retinaface',
model_name: str = 'ArcFace',
face_confidence: float = 0.0,
exif_orientation: Optional[int] = None,
pose_mode: str = 'frontal',
yaw_angle: Optional[float] = None,
pitch_angle: Optional[float] = None,
roll_angle: Optional[float] = None,
landmarks: Optional[str] = None) -> int:
"""Add a face to the database and return its ID
Args:
photo_id: ID of the photo containing the face
encoding: Face encoding as bytes (512 floats for ArcFace = 4096 bytes)
location: Face location as string (DeepFace format: "{'x': x, 'y': y, 'w': w, 'h': h}")
confidence: Legacy confidence value (kept for compatibility)
quality_score: Quality score 0.0-1.0
person_id: ID of identified person (None if unidentified)
detector_backend: DeepFace detector used (retinaface, mtcnn, opencv, ssd)
model_name: DeepFace model used (ArcFace, Facenet, etc.)
face_confidence: Confidence from DeepFace detector
exif_orientation: EXIF orientation value (1-8) for coordinate transformation
pose_mode: Pose mode classification (e.g., 'frontal', 'profile_left', 'looking_up')
yaw_angle: Yaw angle in degrees (left/right rotation)
pitch_angle: Pitch angle in degrees (up/down tilt)
roll_angle: Roll angle in degrees (rotation around face axis)
landmarks: JSON string of facial landmarks (e.g., {'left_eye': [x, y], ...})
Returns:
Face ID
"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute('''
INSERT INTO faces (photo_id, person_id, encoding, location, confidence,
quality_score, detector_backend, model_name, face_confidence,
exif_orientation, pose_mode, yaw_angle, pitch_angle, roll_angle, landmarks)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (photo_id, person_id, encoding, location, confidence, quality_score,
detector_backend, model_name, face_confidence, exif_orientation,
pose_mode, yaw_angle, pitch_angle, roll_angle, landmarks))
return cursor.lastrowid
def update_face_person(self, face_id: int, person_id: Optional[int]):
"""Update the person_id for a face"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute('UPDATE faces SET person_id = ? WHERE id = ?', (person_id, face_id))
def add_person(self, first_name: str, last_name: str, middle_name: str = None,
maiden_name: str = None, date_of_birth: str = None) -> int:
"""Add a person to the database and return their ID (case-insensitive)"""
# Normalize names to title case for case-insensitive matching
normalized_first = first_name.strip().title()
normalized_last = last_name.strip().title()
normalized_middle = middle_name.strip().title() if middle_name else ''
normalized_maiden = maiden_name.strip().title() if maiden_name else ''
normalized_dob = date_of_birth.strip() if date_of_birth else ''
with self.get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute('''
INSERT OR IGNORE INTO people (first_name, last_name, middle_name, maiden_name, date_of_birth)
VALUES (?, ?, ?, ?, ?)
''', (normalized_first, normalized_last, normalized_middle, normalized_maiden, normalized_dob))
# Get the person ID (case-insensitive lookup)
cursor.execute('''
SELECT id FROM people
WHERE LOWER(first_name) = LOWER(?) AND LOWER(last_name) = LOWER(?)
AND LOWER(COALESCE(middle_name, '')) = LOWER(?) AND LOWER(COALESCE(maiden_name, '')) = LOWER(?)
AND date_of_birth = ?
''', (normalized_first, normalized_last, normalized_middle, normalized_maiden, normalized_dob))
result = cursor.fetchone()
return result[0] if result else None
def add_tag(self, tag_name: str) -> int:
"""Add a tag to the database and return its ID (case-insensitive)"""
# Normalize tag name to lowercase for consistency
normalized_tag_name = tag_name.lower().strip()
with self.get_db_connection() as conn:
cursor = conn.cursor()
# Check if tag already exists (case-insensitive)
cursor.execute('SELECT id FROM tags WHERE LOWER(tag_name) = ?', (normalized_tag_name,))
existing = cursor.fetchone()
if existing:
return existing[0]
# Insert new tag with original case
cursor.execute('INSERT INTO tags (tag_name) VALUES (?)', (tag_name.strip(),))
# Get the tag ID
cursor.execute('SELECT id FROM tags WHERE LOWER(tag_name) = ?', (normalized_tag_name,))
result = cursor.fetchone()
return result[0] if result else None
def link_photo_tag(self, photo_id: int, tag_id: int):
"""Link a photo to a tag"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute('''
INSERT OR IGNORE INTO phototaglinkage (photo_id, tag_id)
VALUES (?, ?)
''', (photo_id, tag_id))
def unlink_photo_tag(self, photo_id: int, tag_id: int):
"""Unlink a photo from a tag"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute('''
DELETE FROM phototaglinkage
WHERE photo_id = ? AND tag_id = ?
''', (photo_id, tag_id))
def get_photos_by_pattern(self, pattern: str = None, limit: int = 10) -> List[Tuple]:
"""Get photos matching a pattern"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
if pattern:
cursor.execute('''
SELECT id, path, filename, date_taken, processed
FROM photos
WHERE filename LIKE ? OR path LIKE ?
ORDER BY date_added DESC
LIMIT ?
''', (f'%{pattern}%', f'%{pattern}%', limit))
else:
cursor.execute('''
SELECT id, path, filename, date_taken, processed
FROM photos
ORDER BY date_added DESC
LIMIT ?
''', (limit,))
return cursor.fetchall()
def get_unprocessed_photos(self, limit: Optional[int] = None) -> List[Tuple]:
"""Get unprocessed photos
Args:
limit: Maximum number of photos to return. If None, return all unprocessed photos.
"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
if limit is None:
cursor.execute('''
SELECT id, path, filename, date_taken
FROM photos
WHERE processed = 0
ORDER BY date_added ASC
''')
else:
cursor.execute('''
SELECT id, path, filename, date_taken
FROM photos
WHERE processed = 0
ORDER BY date_added ASC
LIMIT ?
''', (limit,))
return cursor.fetchall()
def get_unidentified_faces(self, limit: int = 20) -> List[Tuple]:
"""Get unidentified faces"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT f.id, f.photo_id, f.location, f.confidence, f.quality_score,
p.path, p.filename
FROM faces f
JOIN photos p ON f.photo_id = p.id
WHERE f.person_id IS NULL
ORDER BY f.quality_score DESC, f.confidence DESC
LIMIT ?
''', (limit,))
return cursor.fetchall()
def get_face_encodings(self, face_id: int) -> Optional[bytes]:
"""Get face encoding for a specific face"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute('SELECT encoding FROM faces WHERE id = ?', (face_id,))
result = cursor.fetchone()
return result[0] if result else None
def get_face_photo_info(self, face_id: int) -> Optional[Tuple]:
"""Get photo information for a specific face including EXIF orientation"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT f.photo_id, p.filename, f.location, f.exif_orientation
FROM faces f
JOIN photos p ON f.photo_id = p.id
WHERE f.id = ?
''', (face_id,))
result = cursor.fetchone()
return result if result else None
def get_all_face_encodings(self) -> List[Tuple]:
"""Get all face encodings with their IDs"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute('SELECT id, encoding, person_id, quality_score FROM faces')
return cursor.fetchall()
def get_person_encodings(self, person_id: int, min_quality: float = 0.3) -> List[Tuple]:
"""Get all encodings for a person above minimum quality"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT pe.encoding, pe.quality_score, pe.face_id
FROM person_encodings pe
WHERE pe.person_id = ? AND pe.quality_score >= ?
ORDER BY pe.quality_score DESC
''', (person_id, min_quality))
return cursor.fetchall()
def add_person_encoding(self, person_id: int, face_id: int, encoding: bytes,
quality_score: float,
detector_backend: str = 'retinaface',
model_name: str = 'ArcFace'):
"""Add a person encoding
Args:
person_id: ID of the person
face_id: ID of the face this encoding came from
encoding: Face encoding as bytes
quality_score: Quality score 0.0-1.0
detector_backend: DeepFace detector used
model_name: DeepFace model used
"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute('''
INSERT INTO person_encodings (person_id, face_id, encoding, quality_score,
detector_backend, model_name)
VALUES (?, ?, ?, ?, ?, ?)
''', (person_id, face_id, encoding, quality_score, detector_backend, model_name))
def update_person_encodings(self, person_id: int):
"""Update person encodings by removing old ones and adding current face encodings"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
# Remove old encodings
cursor.execute('DELETE FROM person_encodings WHERE person_id = ?', (person_id,))
# Add current face encodings
cursor.execute('''
INSERT INTO person_encodings (person_id, face_id, encoding, quality_score)
SELECT ?, id, encoding, quality_score
FROM faces
WHERE person_id = ? AND quality_score >= 0.3
''', (person_id, person_id))
def get_similar_faces(self, face_id: int, tolerance: float = 0.6,
include_same_photo: bool = False) -> List[Dict]:
"""Get faces similar to the given face ID"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
# Get the target face encoding and photo
cursor.execute('''
SELECT f.encoding, f.photo_id, p.path, p.filename
FROM faces f
JOIN photos p ON f.photo_id = p.id
WHERE f.id = ?
''', (face_id,))
target_result = cursor.fetchone()
if not target_result:
return []
target_encoding = target_result[0]
target_photo_id = target_result[1]
target_path = target_result[2]
target_filename = target_result[3]
# Get all other faces
if include_same_photo:
cursor.execute('''
SELECT f.id, f.encoding, f.person_id, f.quality_score, f.confidence,
p.path, p.filename, f.photo_id
FROM faces f
JOIN photos p ON f.photo_id = p.id
WHERE f.id != ?
''', (face_id,))
else:
cursor.execute('''
SELECT f.id, f.encoding, f.person_id, f.quality_score, f.confidence,
p.path, p.filename, f.photo_id
FROM faces f
JOIN photos p ON f.photo_id = p.id
WHERE f.id != ? AND f.photo_id != ?
''', (face_id, target_photo_id))
return cursor.fetchall()
def get_statistics(self) -> Dict:
"""Get database statistics"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
stats = {}
# Photo statistics
cursor.execute('SELECT COUNT(*) FROM photos')
stats['total_photos'] = cursor.fetchone()[0]
cursor.execute('SELECT COUNT(*) FROM photos WHERE processed = 1')
stats['processed_photos'] = cursor.fetchone()[0]
# Face statistics
cursor.execute('SELECT COUNT(*) FROM faces')
stats['total_faces'] = cursor.fetchone()[0]
cursor.execute('SELECT COUNT(*) FROM faces WHERE person_id IS NOT NULL')
stats['identified_faces'] = cursor.fetchone()[0]
cursor.execute('SELECT COUNT(*) FROM faces WHERE person_id IS NULL')
stats['unidentified_faces'] = cursor.fetchone()[0]
# People statistics
cursor.execute('SELECT COUNT(*) FROM people')
stats['total_people'] = cursor.fetchone()[0]
# Tag statistics
cursor.execute('SELECT COUNT(*) FROM tags')
stats['total_tags'] = cursor.fetchone()[0]
cursor.execute('SELECT COUNT(*) FROM phototaglinkage')
stats['total_photo_tags'] = cursor.fetchone()[0]
return stats

File diff suppressed because it is too large Load Diff

View File

@ -1,243 +0,0 @@
#!/usr/bin/env python3
"""
Photo scanning, metadata extraction, and file operations for PunimTag
"""
import os
from pathlib import Path
from PIL import Image
from datetime import datetime
from typing import Optional, List, Tuple
from src.core.config import SUPPORTED_IMAGE_FORMATS
from src.core.database import DatabaseManager
from src.utils.path_utils import normalize_path, validate_path_exists
class PhotoManager:
"""Handles photo scanning, metadata extraction, and file operations"""
def __init__(self, db_manager: DatabaseManager, verbose: int = 0):
"""Initialize photo manager"""
self.db = db_manager
self.verbose = verbose
def extract_photo_date(self, photo_path: str) -> Optional[str]:
"""Extract date taken from photo with fallback to file modification time.
Tries in order:
1. EXIF date tags (DateTimeOriginal, DateTimeDigitized, DateTime)
2. File modification time (as fallback)
"""
# First try EXIF date extraction
try:
with Image.open(photo_path) as image:
exifdata = image.getexif()
# Look for date taken in EXIF tags
date_tags = [
36867, # DateTimeOriginal - when photo was actually taken (highest priority)
36868, # DateTimeDigitized - when photo was digitized
306, # DateTime - file modification date (lowest priority)
]
for tag_id in date_tags:
if tag_id in exifdata:
date_str = exifdata[tag_id]
if date_str:
# Parse EXIF date format (YYYY:MM:DD HH:MM:SS)
try:
date_obj = datetime.strptime(date_str, '%Y:%m:%d %H:%M:%S')
return date_obj.strftime('%Y-%m-%d')
except ValueError:
# Try alternative format
try:
date_obj = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S')
return date_obj.strftime('%Y-%m-%d')
except ValueError:
continue
except Exception as e:
if self.verbose >= 2:
print(f" ⚠️ Could not extract EXIF date from {os.path.basename(photo_path)}: {e}")
# Fallback to file modification time
try:
if os.path.exists(photo_path):
mtime = os.path.getmtime(photo_path)
mtime_date = datetime.fromtimestamp(mtime)
return mtime_date.strftime('%Y-%m-%d')
except Exception as e:
if self.verbose >= 2:
print(f" ⚠️ Could not get file modification time from {os.path.basename(photo_path)}: {e}")
return None
def scan_folder(self, folder_path: str, recursive: bool = True) -> int:
"""Scan folder for photos and add to database"""
# Normalize path to absolute path
try:
folder_path = normalize_path(folder_path)
except ValueError as e:
print(f"❌ Invalid path: {e}")
return 0
if not validate_path_exists(folder_path):
print(f"❌ Folder not found or not accessible: {folder_path}")
return 0
found_photos = []
if recursive:
for root, dirs, files in os.walk(folder_path):
for file in files:
file_ext = Path(file).suffix.lower()
if file_ext in SUPPORTED_IMAGE_FORMATS:
photo_path = os.path.join(root, file)
found_photos.append((photo_path, file))
else:
for file in os.listdir(folder_path):
file_ext = Path(file).suffix.lower()
if file_ext in SUPPORTED_IMAGE_FORMATS:
photo_path = os.path.join(folder_path, file)
found_photos.append((photo_path, file))
if not found_photos:
print(f"📁 No photos found in {folder_path}")
return 0
# Add to database
added_count = 0
existing_count = 0
for photo_path, filename in found_photos:
try:
# Ensure photo path is absolute
photo_path = normalize_path(photo_path)
# Extract date taken from EXIF data
date_taken = self.extract_photo_date(photo_path)
# Add photo to database (with absolute path)
photo_id = self.db.add_photo(photo_path, filename, date_taken)
if photo_id:
# New photo was added
added_count += 1
if self.verbose >= 2:
date_info = f" (taken: {date_taken})" if date_taken else " (no date)"
print(f" 📸 Added: {filename}{date_info}")
else:
# Photo already exists
existing_count += 1
if self.verbose >= 2:
print(f" 📸 Already exists: {filename}")
except Exception as e:
print(f"⚠️ Error adding {filename}: {e}")
# Print summary
if added_count > 0 and existing_count > 0:
print(f"📁 Found {len(found_photos)} photos: {added_count} new, {existing_count} already in database")
elif added_count > 0:
print(f"📁 Found {len(found_photos)} photos, added {added_count} new photos")
elif existing_count > 0:
print(f"📁 Found {len(found_photos)} photos, all already in database")
else:
print(f"📁 Found {len(found_photos)} photos, none could be added")
return added_count
def get_photo_info(self, photo_id: int) -> Optional[Tuple]:
"""Get photo information by ID"""
photos = self.db.get_photos_by_pattern(limit=1000) # Get all photos
for photo in photos:
if photo[0] == photo_id: # photo[0] is the ID
return photo
return None
def get_photo_path(self, photo_id: int) -> Optional[str]:
"""Get photo path by ID"""
photo_info = self.get_photo_info(photo_id)
return photo_info[1] if photo_info else None # photo[1] is the path
def get_photo_filename(self, photo_id: int) -> Optional[str]:
"""Get photo filename by ID"""
photo_info = self.get_photo_info(photo_id)
return photo_info[2] if photo_info else None # photo[2] is the filename
def is_photo_processed(self, photo_id: int) -> bool:
"""Check if photo has been processed for faces"""
photo_info = self.get_photo_info(photo_id)
return photo_info[4] if photo_info else False # photo[4] is the processed flag
def mark_photo_processed(self, photo_id: int):
"""Mark a photo as processed"""
self.db.mark_photo_processed(photo_id)
def get_photos_by_date_range(self, date_from: str = None, date_to: str = None) -> List[Tuple]:
"""Get photos within a date range"""
# This would need to be implemented in the database module
# For now, return all photos
return self.db.get_photos_by_pattern()
def get_photos_by_pattern(self, pattern: str = None, limit: int = 10) -> List[Tuple]:
"""Get photos matching a pattern"""
return self.db.get_photos_by_pattern(pattern, limit)
def validate_photo_file(self, photo_path: str) -> bool:
"""Validate that a photo file exists and is readable"""
if not os.path.exists(photo_path):
return False
try:
with Image.open(photo_path) as image:
image.verify()
return True
except Exception:
return False
def get_photo_dimensions(self, photo_path: str) -> Optional[Tuple[int, int]]:
"""Get photo dimensions (width, height)"""
try:
with Image.open(photo_path) as image:
return image.size
except Exception:
return None
def get_photo_format(self, photo_path: str) -> Optional[str]:
"""Get photo format"""
try:
with Image.open(photo_path) as image:
return image.format
except Exception:
return None
def get_photo_exif_data(self, photo_path: str) -> dict:
"""Get EXIF data from photo"""
try:
with Image.open(photo_path) as image:
exifdata = image.getexif()
return dict(exifdata)
except Exception:
return {}
def get_photo_file_size(self, photo_path: str) -> Optional[int]:
"""Get photo file size in bytes"""
try:
return os.path.getsize(photo_path)
except Exception:
return None
def get_photo_creation_time(self, photo_path: str) -> Optional[datetime]:
"""Get photo file creation time"""
try:
timestamp = os.path.getctime(photo_path)
return datetime.fromtimestamp(timestamp)
except Exception:
return None
def get_photo_modification_time(self, photo_path: str) -> Optional[datetime]:
"""Get photo file modification time"""
try:
timestamp = os.path.getmtime(photo_path)
return datetime.fromtimestamp(timestamp)
except Exception:
return None

View File

@ -1,453 +0,0 @@
#!/usr/bin/env python3
"""
Search functionality and statistics for PunimTag
"""
from typing import List, Dict, Tuple, Optional
from src.core.database import DatabaseManager
class SearchStats:
"""Handles search functionality and statistics generation"""
def __init__(self, db_manager: DatabaseManager, verbose: int = 0):
"""Initialize search and stats manager"""
self.db = db_manager
self.verbose = verbose
def search_faces(self, person_name: str) -> List[Tuple[str, str]]:
"""Search for photos containing a specific person by name (partial, case-insensitive).
Returns a list of tuples: (photo_path, person_full_name).
"""
# Get all people matching the name
people = self.db.show_people_list()
matching_people = []
search_name = (person_name or "").strip().lower()
if not search_name:
return []
for person in people:
person_id, first_name, last_name, middle_name, maiden_name, date_of_birth, created_date = person
full_name = f"{first_name or ''} {last_name or ''}".strip().lower()
# Check if search term matches any part of the name
if (
(full_name and search_name in full_name) or
(first_name and search_name in first_name.lower()) or
(last_name and search_name in last_name.lower()) or
(middle_name and search_name in middle_name.lower()) or
(maiden_name and search_name in maiden_name.lower())
):
matching_people.append(person_id)
if not matching_people:
return []
# Fetch photo paths for each matching person using database helper if available
results: List[Tuple[str, str]] = []
try:
with self.db.get_db_connection() as conn:
cursor = conn.cursor()
# faces.person_id links to photos via faces.photo_id
placeholders = ",".join(["?"] * len(matching_people))
cursor.execute(
f"""
SELECT DISTINCT p.path, pe.first_name, pe.last_name
FROM faces f
JOIN photos p ON p.id = f.photo_id
JOIN people pe ON pe.id = f.person_id
WHERE f.person_id IN ({placeholders})
ORDER BY pe.last_name, pe.first_name, p.path
""",
tuple(matching_people),
)
for row in cursor.fetchall():
if row and row[0]:
path = row[0]
first = (row[1] or "").strip()
last = (row[2] or "").strip()
full_name = (f"{first} {last}").strip() or "Unknown"
results.append((path, full_name))
except Exception:
# Fall back gracefully if schema differs
pass
return results
def get_statistics(self) -> Dict:
"""Get comprehensive database statistics"""
stats = self.db.get_statistics()
# Add calculated statistics
if stats['total_photos'] > 0:
stats['processing_percentage'] = (stats['processed_photos'] / stats['total_photos']) * 100
else:
stats['processing_percentage'] = 0
if stats['total_faces'] > 0:
stats['identification_percentage'] = (stats['identified_faces'] / stats['total_faces']) * 100
else:
stats['identification_percentage'] = 0
if stats['total_people'] > 0:
stats['faces_per_person'] = stats['identified_faces'] / stats['total_people']
else:
stats['faces_per_person'] = 0
if stats['total_photos'] > 0:
stats['faces_per_photo'] = stats['total_faces'] / stats['total_photos']
else:
stats['faces_per_photo'] = 0
if stats['total_photos'] > 0:
stats['tags_per_photo'] = stats['total_photo_tags'] / stats['total_photos']
else:
stats['tags_per_photo'] = 0
return stats
def print_statistics(self):
"""Print formatted statistics to console"""
stats = self.get_statistics()
print("\n📊 PunimTag Database Statistics")
print("=" * 50)
print(f"📸 Photos:")
print(f" Total photos: {stats['total_photos']}")
print(f" Processed: {stats['processed_photos']} ({stats['processing_percentage']:.1f}%)")
print(f" Unprocessed: {stats['total_photos'] - stats['processed_photos']}")
print(f"\n👤 Faces:")
print(f" Total faces: {stats['total_faces']}")
print(f" Identified: {stats['identified_faces']} ({stats['identification_percentage']:.1f}%)")
print(f" Unidentified: {stats['unidentified_faces']}")
print(f"\n👥 People:")
print(f" Total people: {stats['total_people']}")
print(f" Average faces per person: {stats['faces_per_person']:.1f}")
print(f"\n🏷️ Tags:")
print(f" Total tags: {stats['total_tags']}")
print(f" Total photo-tag links: {stats['total_photo_tags']}")
print(f" Average tags per photo: {stats['tags_per_photo']:.1f}")
print(f"\n📈 Averages:")
print(f" Faces per photo: {stats['faces_per_photo']:.1f}")
print(f" Tags per photo: {stats['tags_per_photo']:.1f}")
print("=" * 50)
def get_photo_statistics(self) -> Dict:
"""Get detailed photo statistics"""
stats = self.get_statistics()
# This could be expanded with more detailed photo analysis
return {
'total_photos': stats['total_photos'],
'processed_photos': stats['processed_photos'],
'unprocessed_photos': stats['total_photos'] - stats['processed_photos'],
'processing_percentage': stats['processing_percentage']
}
def get_face_statistics(self) -> Dict:
"""Get detailed face statistics"""
stats = self.get_statistics()
return {
'total_faces': stats['total_faces'],
'identified_faces': stats['identified_faces'],
'unidentified_faces': stats['unidentified_faces'],
'identification_percentage': stats['identification_percentage'],
'faces_per_photo': stats['faces_per_photo']
}
def get_people_statistics(self) -> Dict:
"""Get detailed people statistics"""
stats = self.get_statistics()
return {
'total_people': stats['total_people'],
'faces_per_person': stats['faces_per_person']
}
def get_tag_statistics(self) -> Dict:
"""Get detailed tag statistics"""
stats = self.get_statistics()
return {
'total_tags': stats['total_tags'],
'total_photo_tags': stats['total_photo_tags'],
'tags_per_photo': stats['tags_per_photo']
}
def search_photos_by_date(self, date_from: str = None, date_to: str = None) -> List[Tuple[str, str]]:
"""Search photos by date range.
Args:
date_from: Start date in YYYY-MM-DD format (inclusive)
date_to: End date in YYYY-MM-DD format (inclusive)
Returns:
List of tuples: (photo_path, date_taken)
"""
try:
with self.db.get_db_connection() as conn:
cursor = conn.cursor()
# Build the query based on provided date parameters
if date_from and date_to:
# Both dates provided - search within range
query = '''
SELECT path, date_taken
FROM photos
WHERE date_taken IS NOT NULL
AND date_taken >= ? AND date_taken <= ?
ORDER BY date_taken DESC, filename
'''
cursor.execute(query, (date_from, date_to))
elif date_from:
# Only start date provided - search from date onwards
query = '''
SELECT path, date_taken
FROM photos
WHERE date_taken IS NOT NULL
AND date_taken >= ?
ORDER BY date_taken DESC, filename
'''
cursor.execute(query, (date_from,))
elif date_to:
# Only end date provided - search up to date
query = '''
SELECT path, date_taken
FROM photos
WHERE date_taken IS NOT NULL
AND date_taken <= ?
ORDER BY date_taken DESC, filename
'''
cursor.execute(query, (date_to,))
else:
# No dates provided - return all photos with date_taken
query = '''
SELECT path, date_taken
FROM photos
WHERE date_taken IS NOT NULL
ORDER BY date_taken DESC, filename
'''
cursor.execute(query)
results = cursor.fetchall()
return [(row[0], row[1]) for row in results]
except Exception as e:
if self.verbose >= 1:
print(f"Error searching photos by date: {e}")
return []
def search_photos_by_tags(self, tags: List[str], match_all: bool = False) -> List[Tuple]:
"""Search photos by tags
Args:
tags: List of tag names to search for
match_all: If True, photos must have ALL tags. If False, photos with ANY tag.
Returns:
List of tuples: (photo_path, tag_info)
"""
if not tags:
return []
# Get tag IDs for the provided tag names (case-insensitive)
tag_id_to_name, tag_name_to_id = self.db.load_tag_mappings()
tag_ids = []
for tag_name in tags:
# Convert to lowercase for case-insensitive lookup
normalized_tag_name = tag_name.lower().strip()
if normalized_tag_name in tag_name_to_id:
tag_ids.append(tag_name_to_id[normalized_tag_name])
if not tag_ids:
return []
results = []
try:
with self.db.get_db_connection() as conn:
cursor = conn.cursor()
if match_all:
# Photos that have ALL specified tags
placeholders = ",".join(["?"] * len(tag_ids))
cursor.execute(f'''
SELECT p.path, GROUP_CONCAT(t.tag_name, ', ') as tag_names
FROM photos p
JOIN phototaglinkage ptl ON p.id = ptl.photo_id
JOIN tags t ON ptl.tag_id = t.id
WHERE ptl.tag_id IN ({placeholders})
GROUP BY p.id, p.path
HAVING COUNT(DISTINCT ptl.tag_id) = ?
ORDER BY p.path
''', tuple(tag_ids) + (len(tag_ids),))
else:
# Photos that have ANY of the specified tags
placeholders = ",".join(["?"] * len(tag_ids))
cursor.execute(f'''
SELECT DISTINCT p.path, GROUP_CONCAT(t.tag_name, ', ') as tag_names
FROM photos p
JOIN phototaglinkage ptl ON p.id = ptl.photo_id
JOIN tags t ON ptl.tag_id = t.id
WHERE ptl.tag_id IN ({placeholders})
GROUP BY p.id, p.path
ORDER BY p.path
''', tuple(tag_ids))
for row in cursor.fetchall():
if row and row[0]:
results.append((row[0], row[1] or ""))
except Exception as e:
if self.verbose > 0:
print(f"Error searching photos by tags: {e}")
return results
def search_photos_by_people(self, people: List[str]) -> List[Tuple]:
"""Search photos by people"""
# This would need to be implemented in the database module
# For now, return empty list
return []
def get_most_common_tags(self, limit: int = 10) -> List[Tuple[str, int]]:
"""Get most commonly used tags"""
# This would need to be implemented in the database module
# For now, return empty list
return []
def get_most_photographed_people(self, limit: int = 10) -> List[Tuple[str, int]]:
"""Get most photographed people"""
# This would need to be implemented in the database module
# For now, return empty list
return []
def get_photos_without_faces(self) -> List[Tuple]:
"""Get photos that have no detected faces
Only includes processed photos (photos that have been processed for face detection).
Returns:
List of tuples: (photo_path, filename)
"""
results = []
try:
with self.db.get_db_connection() as conn:
cursor = conn.cursor()
# Find photos that have no faces associated with them
# Only include processed photos
cursor.execute('''
SELECT p.path, p.filename
FROM photos p
LEFT JOIN faces f ON p.id = f.photo_id
WHERE f.photo_id IS NULL
AND p.processed = 1
ORDER BY p.filename
''')
for row in cursor.fetchall():
if row and row[0]:
results.append((row[0], row[1]))
except Exception as e:
if self.verbose > 0:
print(f"Error searching photos without faces: {e}")
return results
def get_photos_without_tags(self) -> List[Tuple]:
"""Get photos that have no tags
Returns:
List of tuples: (photo_path, filename)
"""
results = []
try:
with self.db.get_db_connection() as conn:
cursor = conn.cursor()
# Find photos that have no tags associated with them
cursor.execute('''
SELECT p.path, p.filename
FROM photos p
LEFT JOIN phototaglinkage ptl ON p.id = ptl.photo_id
WHERE ptl.photo_id IS NULL
ORDER BY p.filename
''')
for row in cursor.fetchall():
if row and row[0]:
results.append((row[0], row[1]))
except Exception as e:
if self.verbose > 0:
print(f"Error searching photos without tags: {e}")
return results
def get_duplicate_faces(self, tolerance: float = 0.6) -> List[Dict]:
"""Get potential duplicate faces (same person, different photos)"""
# This would need to be implemented using face matching
# For now, return empty list
return []
def get_face_quality_distribution(self) -> Dict:
"""Get distribution of face quality scores"""
# This would need to be implemented in the database module
# For now, return empty dict
return {}
def get_processing_timeline(self) -> List[Tuple[str, int]]:
"""Get timeline of photo processing (photos processed per day)"""
# This would need to be implemented in the database module
# For now, return empty list
return []
def export_statistics(self, filename: str = "punimtag_stats.json"):
"""Export statistics to a JSON file"""
import json
stats = self.get_statistics()
try:
with open(filename, 'w') as f:
json.dump(stats, f, indent=2)
print(f"✅ Statistics exported to {filename}")
except Exception as e:
print(f"❌ Error exporting statistics: {e}")
def generate_report(self) -> str:
"""Generate a text report of statistics"""
stats = self.get_statistics()
report = f"""
PunimTag Database Report
Generated: {__import__('datetime').datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
PHOTO STATISTICS:
- Total photos: {stats['total_photos']}
- Processed: {stats['processed_photos']} ({stats['processing_percentage']:.1f}%)
- Unprocessed: {stats['total_photos'] - stats['processed_photos']}
FACE STATISTICS:
- Total faces: {stats['total_faces']}
- Identified: {stats['identified_faces']} ({stats['identification_percentage']:.1f}%)
- Unidentified: {stats['unidentified_faces']}
- Average faces per photo: {stats['faces_per_photo']:.1f}
PEOPLE STATISTICS:
- Total people: {stats['total_people']}
- Average faces per person: {stats['faces_per_person']:.1f}
TAG STATISTICS:
- Total tags: {stats['total_tags']}
- Total photo-tag links: {stats['total_photo_tags']}
- Average tags per photo: {stats['tags_per_photo']:.1f}
"""
return report

View File

@ -1,266 +0,0 @@
#!/usr/bin/env python3
"""
Tag management functionality for PunimTag
"""
from typing import List, Dict, Tuple, Optional
from src.core.config import DEFAULT_BATCH_SIZE
from src.core.database import DatabaseManager
class TagManager:
"""Handles photo tagging and tag management operations"""
def __init__(self, db_manager: DatabaseManager, verbose: int = 0):
"""Initialize tag manager"""
self.db = db_manager
self.verbose = verbose
def deduplicate_tags(self, tag_list: List[str]) -> List[str]:
"""Remove duplicate tags from a list while preserving order (case insensitive)"""
seen = set()
unique_tags = []
for tag in tag_list:
if tag.lower() not in seen:
seen.add(tag.lower())
unique_tags.append(tag)
return unique_tags
def parse_tags_string(self, tags_string: str) -> List[str]:
"""Parse a comma-separated tags string into a list, handling empty strings and whitespace"""
if not tags_string or tags_string.strip() == "":
return []
# Split by comma and strip whitespace from each tag
tags = [tag.strip() for tag in tags_string.split(",")]
# Remove empty strings that might result from splitting
return [tag for tag in tags if tag]
def add_tags_to_photos(self, photo_pattern: str = None, batch_size: int = DEFAULT_BATCH_SIZE) -> int:
"""Add custom tags to photos via command line interface"""
if photo_pattern:
photos = self.db.get_photos_by_pattern(photo_pattern, batch_size)
else:
photos = self.db.get_photos_by_pattern(limit=batch_size)
if not photos:
print("No photos found")
return 0
print(f"🏷️ Tagging {len(photos)} photos (enter comma-separated tags)")
tagged_count = 0
for photo_id, photo_path, filename, date_taken, processed in photos:
print(f"\n📸 {filename}")
tags_input = input("🏷️ Tags: ").strip()
if tags_input.lower() == 'q':
break
if tags_input:
tags = self.parse_tags_string(tags_input)
tags = self.deduplicate_tags(tags)
for tag_name in tags:
# Add tag to database and get its ID
tag_id = self.db.add_tag(tag_name)
if tag_id:
# Link photo to tag
self.db.link_photo_tag(photo_id, tag_id)
print(f" ✅ Added {len(tags)} tags")
tagged_count += 1
print(f"✅ Tagged {tagged_count} photos")
return tagged_count
def add_tags_to_photo(self, photo_id: int, tags: List[str]) -> int:
"""Add tags to a specific photo"""
if not tags:
return 0
tags = self.deduplicate_tags(tags)
added_count = 0
for tag_name in tags:
# Add tag to database and get its ID
tag_id = self.db.add_tag(tag_name)
if tag_id:
# Link photo to tag
self.db.link_photo_tag(photo_id, tag_id)
added_count += 1
return added_count
def remove_tags_from_photo(self, photo_id: int, tags: List[str]) -> int:
"""Remove tags from a specific photo"""
if not tags:
return 0
removed_count = 0
tag_id_to_name, tag_name_to_id = self.db.load_tag_mappings()
for tag_name in tags:
if tag_name in tag_name_to_id:
tag_id = tag_name_to_id[tag_name]
self.db.unlink_photo_tag(photo_id, tag_id)
removed_count += 1
return removed_count
def get_photo_tags(self, photo_id: int) -> List[str]:
"""Get all tags for a specific photo"""
tag_ids = self.db.get_existing_tag_ids_for_photo(photo_id)
tag_id_to_name, _ = self.db.load_tag_mappings()
tags = []
for tag_id in tag_ids:
tag_name = self.db.get_tag_name_by_id(tag_id, tag_id_to_name)
tags.append(tag_name)
return tags
def get_all_tags(self) -> List[Tuple[int, str]]:
"""Get all tags in the database"""
tag_id_to_name, _ = self.db.load_tag_mappings()
return [(tag_id, tag_name) for tag_id, tag_name in tag_id_to_name.items()]
def get_photos_with_tag(self, tag_name: str) -> List[Tuple]:
"""Get all photos that have a specific tag"""
tag_id_to_name, tag_name_to_id = self.db.load_tag_mappings()
if tag_name not in tag_name_to_id:
return []
tag_id = tag_name_to_id[tag_name]
# This would need to be implemented in the database module
# For now, return empty list
return []
def get_tag_statistics(self) -> Dict:
"""Get tag usage statistics"""
tag_id_to_name, _ = self.db.load_tag_mappings()
stats = {
'total_tags': len(tag_id_to_name),
'tag_usage': {}
}
# Count usage for each tag
for tag_id, tag_name in tag_id_to_name.items():
# This would need to be implemented in the database module
# For now, set usage to 0
stats['tag_usage'][tag_name] = 0
return stats
def delete_tag(self, tag_name: str) -> bool:
"""Delete a tag from the database (and all its linkages)"""
tag_id_to_name, tag_name_to_id = self.db.load_tag_mappings()
if tag_name not in tag_name_to_id:
return False
tag_id = tag_name_to_id[tag_name]
# This would need to be implemented in the database module
# For now, return False
return False
def rename_tag(self, old_name: str, new_name: str) -> bool:
"""Rename a tag"""
tag_id_to_name, tag_name_to_id = self.db.load_tag_mappings()
if old_name not in tag_name_to_id:
return False
if new_name in tag_name_to_id:
return False # New name already exists
tag_id = tag_name_to_id[old_name]
# This would need to be implemented in the database module
# For now, return False
return False
def merge_tags(self, source_tag: str, target_tag: str) -> bool:
"""Merge one tag into another (move all linkages from source to target)"""
tag_id_to_name, tag_name_to_id = self.db.load_tag_mappings()
if source_tag not in tag_name_to_id or target_tag not in tag_name_to_id:
return False
source_tag_id = tag_name_to_id[source_tag]
target_tag_id = tag_name_to_id[target_tag]
# This would need to be implemented in the database module
# For now, return False
return False
def get_photos_by_tags(self, tags: List[str], match_all: bool = False) -> List[Tuple]:
"""Get photos that have any (or all) of the specified tags"""
if not tags:
return []
tag_id_to_name, tag_name_to_id = self.db.load_tag_mappings()
tag_ids = []
for tag_name in tags:
# Convert to lowercase for case-insensitive lookup
normalized_tag_name = tag_name.lower().strip()
if normalized_tag_name in tag_name_to_id:
tag_ids.append(tag_name_to_id[normalized_tag_name])
if not tag_ids:
return []
# This would need to be implemented in the database module
# For now, return empty list
return []
def get_common_tags(self, photo_ids: List[int]) -> List[str]:
"""Get tags that are common to all specified photos"""
if not photo_ids:
return []
# Get tags for each photo
all_photo_tags = []
for photo_id in photo_ids:
tags = self.get_photo_tags(photo_id)
all_photo_tags.append(set(tags))
if not all_photo_tags:
return []
# Find intersection of all tag sets
common_tags = set.intersection(*all_photo_tags)
return list(common_tags)
def get_suggested_tags(self, photo_id: int, limit: int = 5) -> List[str]:
"""Get suggested tags based on similar photos"""
# This is a placeholder for tag suggestion logic
# Could be implemented based on:
# - Tags from photos in the same folder
# - Tags from photos taken on the same date
# - Most commonly used tags
# - Machine learning based suggestions
return []
def validate_tag_name(self, tag_name: str) -> Tuple[bool, str]:
"""Validate a tag name and return (is_valid, error_message)"""
if not tag_name or not tag_name.strip():
return False, "Tag name cannot be empty"
tag_name = tag_name.strip()
if len(tag_name) > 50:
return False, "Tag name is too long (max 50 characters)"
if ',' in tag_name:
return False, "Tag name cannot contain commas"
if tag_name.lower() in ['all', 'none', 'untagged']:
return False, "Tag name is reserved"
return True, ""

View File

@ -1,161 +0,0 @@
#!/usr/bin/env python3
"""
PunimTag CLI Setup Script
Simple setup for the minimal photo tagger
"""
import os
import sys
import subprocess
from pathlib import Path
def check_python_version():
"""Check if Python version is compatible"""
if sys.version_info < (3, 7):
print("❌ Python 3.7+ is required")
return False
print(f"✅ Python {sys.version_info.major}.{sys.version_info.minor} detected")
return True
def install_system_dependencies():
"""Install system-level packages required for compilation and runtime"""
print("🔧 Installing system dependencies...")
print(" (Build tools, libraries, and image viewer)")
# Check if we're on a Debian/Ubuntu system
if Path("/usr/bin/apt").exists():
try:
# Install required system packages for building Python packages and running tools
packages = [
"cmake", "build-essential", "libopenblas-dev", "liblapack-dev",
"libx11-dev", "libgtk-3-dev", "libboost-python-dev", "feh"
]
print(f"📦 Installing packages: {', '.join(packages)}")
subprocess.run([
"sudo", "apt", "install", "-y"
] + packages, check=True)
print("✅ System dependencies installed successfully")
return True
except subprocess.CalledProcessError as e:
print(f"❌ Failed to install system dependencies: {e}")
print(" You may need to run: sudo apt update")
return False
else:
print("⚠️ System dependency installation not supported on this platform")
print(" Please install manually:")
print(" - cmake, build-essential")
print(" - libopenblas-dev, liblapack-dev")
print(" - libx11-dev, libgtk-3-dev, libboost-python-dev")
print(" - feh (image viewer)")
return True
def install_requirements():
"""Install Python requirements"""
requirements_file = Path("requirements.txt")
if not requirements_file.exists():
print("❌ requirements.txt not found!")
return False
print("📦 Installing Python dependencies...")
try:
subprocess.run([
sys.executable, '-m', 'pip', 'install', '-r', 'requirements.txt'
], check=True)
print("✅ Dependencies installed successfully")
return True
except subprocess.CalledProcessError as e:
print(f"❌ Failed to install dependencies: {e}")
return False
def create_directories():
"""Create necessary directories"""
directories = ['data', 'logs']
for directory in directories:
Path(directory).mkdir(exist_ok=True)
print(f"✅ Created directory: {directory}")
def test_installation():
"""Test if DeepFace face recognition works"""
print("🧪 Testing DeepFace face recognition installation...")
try:
from deepface import DeepFace
import numpy as np
from PIL import Image
import tensorflow as tf
print("✅ All required modules imported successfully")
return True
except ImportError as e:
print(f"❌ Import error: {e}")
return False
def main():
"""Main setup function"""
print("🚀 PunimTag CLI Setup")
print("=" * 40)
# Check Python version
if not check_python_version():
return 1
# Check if we're in a virtual environment (recommended)
if sys.prefix == sys.base_prefix:
print("⚠️ Not in a virtual environment!")
print(" Recommended: python -m venv venv && source venv/bin/activate")
response = input(" Continue anyway? (y/N): ").strip().lower()
if response != 'y':
print("Setup cancelled. Create a virtual environment first.")
return 1
else:
print("✅ Virtual environment detected")
print()
# Install system dependencies
if not install_system_dependencies():
return 1
print()
# Create directories
print("📁 Creating directories...")
create_directories()
print()
# Install requirements
if not install_requirements():
return 1
print()
# Test installation
if not test_installation():
print("⚠️ Installation test failed. You may need to install additional dependencies.")
print(" For Ubuntu/Debian: sudo apt-get install build-essential cmake")
print(" For macOS: brew install cmake")
return 1
print()
print("✅ Setup complete!")
print()
print("🎯 Quick Start:")
print(" 1. Start the web server: python run_api_with_worker.sh")
print(" 2. Access the web interface at http://localhost:8000")
print(" 3. Use the web UI to scan, process, and identify faces")
print()
print("📖 For more information, see README.md")
print()
print("⚠️ IMPORTANT: Always activate virtual environment first!")
print(" source venv/bin/activate")
return 0
if __name__ == '__main__':
sys.exit(main())

View File

@ -189,7 +189,57 @@ def get_current_user_info(
# Check if user exists in main database to get admin status
user = db.query(User).filter(User.username == username).first()
is_admin = user.is_admin if user else False
# If user doesn't exist in main database, check if we should bootstrap them
if not user:
# Check if any admin users exist
admin_count = db.query(User).filter(User.is_admin == True).count()
# If no admins exist, bootstrap current user as admin
if admin_count == 0:
from src.web.utils.password import hash_password
# Generate unique email to avoid conflicts
base_email = f"{username}@example.com"
email = base_email
counter = 1
# Ensure email is unique
while db.query(User).filter(User.email == email).first():
email = f"{username}+{counter}@example.com"
counter += 1
# Create user as admin for bootstrap (they should change password)
default_password_hash = hash_password("changeme")
try:
user = User(
username=username,
password_hash=default_password_hash,
email=email,
full_name=username,
is_active=True,
is_admin=True,
)
db.add(user)
db.commit()
db.refresh(user)
is_admin = True
except Exception:
# If creation fails (e.g., race condition), try to get existing user
db.rollback()
user = db.query(User).filter(User.username == username).first()
if user:
# Update existing user to be admin if no admins exist
if not user.is_admin:
user.is_admin = True
db.commit()
db.refresh(user)
is_admin = user.is_admin
else:
is_admin = False
else:
is_admin = False
else:
is_admin = user.is_admin if user else False
return UserResponse(username=username, is_admin=is_admin)

View File

@ -16,7 +16,7 @@ from sqlalchemy.orm import Session
from src.web.db.session import get_auth_db, get_db
from src.web.api.users import get_current_admin_user
from src.web.api.auth import get_current_user
from src.web.services.photo_service import import_photo_from_path
from src.web.services.photo_service import import_photo_from_path, calculate_file_hash
from src.web.settings import PHOTO_STORAGE_DIR
router = APIRouter(prefix="/pending-photos", tags=["pending-photos"])
@ -78,6 +78,7 @@ class ReviewResponse(BaseModel):
approved: int
rejected: int
errors: list[str]
warnings: list[str] = [] # Informational messages (e.g., duplicates)
@router.get("", response_model=PendingPhotosListResponse)
@ -264,6 +265,7 @@ def review_pending_photos(
approved_count = 0
rejected_count = 0
duplicate_count = 0
errors = []
admin_user_id = current_admin.get("user_id")
now = datetime.utcnow()
@ -314,32 +316,66 @@ def review_pending_photos(
errors.append(f"Photo file not found for pending photo {decision.id}: {source_path}")
continue
# Calculate file hash and check for duplicates BEFORE moving file
try:
file_hash = calculate_file_hash(str(source_path))
except Exception as e:
errors.append(f"Failed to calculate hash for pending photo {decision.id}: {str(e)}")
continue
# Check if photo with same hash already exists in main database
existing_photo = main_db.execute(text("""
SELECT id, path FROM photos WHERE file_hash = :file_hash
"""), {"file_hash": file_hash}).fetchone()
if existing_photo:
# Photo already exists - mark as duplicate and skip import
# Don't add to errors - we'll show a summary message instead
# Update status to rejected with duplicate reason
auth_db.execute(text("""
UPDATE pending_photos
SET status = 'rejected',
reviewed_at = :reviewed_at,
reviewed_by = :reviewed_by,
rejection_reason = 'Duplicate photo already exists in database'
WHERE id = :id
"""), {
"id": decision.id,
"reviewed_at": now,
"reviewed_by": admin_user_id,
})
auth_db.commit()
rejected_count += 1
duplicate_count += 1
continue
# Generate unique filename for main storage to avoid conflicts
file_ext = source_path.suffix
unique_filename = f"{uuid.uuid4()}{file_ext}"
dest_path = main_storage_dir / unique_filename
# Move file to main storage
# Copy file to main storage (keep original in shared location)
try:
shutil.move(str(source_path), str(dest_path))
shutil.copy2(str(source_path), str(dest_path))
except Exception as e:
errors.append(f"Failed to move photo file for {decision.id}: {str(e)}")
errors.append(f"Failed to copy photo file for {decision.id}: {str(e)}")
continue
# Import photo into main database (Scan process)
# This will also check for duplicates by hash, but we've already checked above
try:
photo, is_new = import_photo_from_path(main_db, str(dest_path))
if not is_new:
# Photo already exists - delete the moved file
# Photo already exists (shouldn't happen due to hash check above, but handle gracefully)
if dest_path.exists():
dest_path.unlink()
errors.append(f"Photo already exists in main database: {photo.path}")
continue
except Exception as e:
# If import fails, try to move file back
# If import fails, delete the copied file (original remains in shared location)
if dest_path.exists():
try:
shutil.move(str(dest_path), str(source_path))
dest_path.unlink()
except:
pass
errors.append(f"Failed to import photo {decision.id} into main database: {str(e)}")
@ -388,9 +424,18 @@ def review_pending_photos(
auth_db.rollback()
main_db.rollback()
# Add friendly message about duplicates if any were found
warnings = []
if duplicate_count > 0:
if duplicate_count == 1:
warnings.append(f"{duplicate_count} photo was not added as it already exists in the database")
else:
warnings.append(f"{duplicate_count} photos were not added as they already exist in the database")
return ReviewResponse(
approved=approved_count,
rejected=rejected_count,
errors=errors
errors=errors,
warnings=warnings
)

26
src/web/config.py Normal file
View File

@ -0,0 +1,26 @@
"""Configuration values used by the PunimTag web services.
This module replaces the legacy desktop configuration to keep the web
application self-contained.
"""
from __future__ import annotations
# Supported image formats for uploads/imports
SUPPORTED_IMAGE_FORMATS = {".jpg", ".jpeg", ".png", ".bmp", ".tiff", ".tif"}
# DeepFace behavior
DEEPFACE_ENFORCE_DETECTION = False
DEEPFACE_ALIGN_FACES = True
# Face filtering thresholds
MIN_FACE_CONFIDENCE = 0.4
MIN_FACE_SIZE = 40
MAX_FACE_SIZE = 1500
# Matching tolerance and calibration options
DEFAULT_FACE_TOLERANCE = 0.6
USE_CALIBRATED_CONFIDENCE = True
CONFIDENCE_CALIBRATION_METHOD = "empirical" # "empirical", "linear", or "sigmoid"

View File

@ -38,6 +38,7 @@ class Photo(Base):
date_added = Column(DateTime, default=datetime.utcnow, nullable=False)
date_taken = Column(Date, nullable=True, index=True)
processed = Column(Boolean, default=False, nullable=False, index=True)
file_hash = Column(Text, nullable=False, index=True)
faces = relationship("Face", back_populates="photo", cascade="all, delete-orphan")
photo_tags = relationship(
@ -49,6 +50,7 @@ class Photo(Base):
Index("idx_photos_processed", "processed"),
Index("idx_photos_date_taken", "date_taken"),
Index("idx_photos_date_added", "date_added"),
Index("idx_photos_file_hash", "file_hash"),
)

View File

@ -20,12 +20,15 @@ try:
except ImportError:
DEEPFACE_AVAILABLE = False
from src.core.config import (
DEEPFACE_ENFORCE_DETECTION,
from src.web.config import (
CONFIDENCE_CALIBRATION_METHOD,
DEFAULT_FACE_TOLERANCE,
DEEPFACE_ALIGN_FACES,
DEEPFACE_ENFORCE_DETECTION,
MAX_FACE_SIZE,
MIN_FACE_CONFIDENCE,
MIN_FACE_SIZE,
MAX_FACE_SIZE,
USE_CALIBRATED_CONFIDENCE,
)
from src.utils.exif_utils import EXIFOrientationHandler
from src.utils.pose_detection import PoseDetector, RETINAFACE_AVAILABLE
@ -1400,8 +1403,6 @@ def calibrate_confidence(distance: float, tolerance: float = None) -> float:
Returns:
Calibrated confidence percentage (0-100) representing actual match probability
"""
from src.core.config import DEFAULT_FACE_TOLERANCE, USE_CALIBRATED_CONFIDENCE, CONFIDENCE_CALIBRATION_METHOD
if tolerance is None:
tolerance = DEFAULT_FACE_TOLERANCE
@ -1510,7 +1511,6 @@ def find_similar_faces(
Args:
filter_frontal_only: Only return frontal or tilted faces (not profile)
"""
from src.core.config import DEFAULT_FACE_TOLERANCE
from src.web.db.models import Photo
if tolerance is None:
@ -1608,9 +1608,6 @@ def calculate_batch_similarities(
where face_id_1 is from the request list, face_id_2 is from all faces in DB
similarity is in [0,1] range and confidence_pct is in [0,100] range
"""
from src.core.config import DEFAULT_FACE_TOLERANCE
from src.web.db.models import Photo
if not face_ids:
return []
@ -1754,8 +1751,6 @@ def find_auto_match_matches(
List of (person_id, reference_face_id, reference_face, matches) tuples
where matches is list of (face, distance, confidence_pct) tuples
"""
from src.core.config import DEFAULT_FACE_TOLERANCE
if tolerance is None:
tolerance = DEFAULT_FACE_TOLERANCE
@ -1866,10 +1861,6 @@ def get_auto_match_people_list(
Returns:
List of (person_id, reference_face, person_name, face_count) tuples
"""
from src.web.db.models import Person, Photo
from src.core.config import DEFAULT_FACE_TOLERANCE
from sqlalchemy import func, case
if tolerance is None:
tolerance = DEFAULT_FACE_TOLERANCE

View File

@ -2,6 +2,7 @@
from __future__ import annotations
import hashlib
import os
from pathlib import Path
from datetime import datetime, date
@ -10,7 +11,7 @@ from typing import Callable, Optional, Tuple
from PIL import Image
from sqlalchemy.orm import Session
from src.core.config import SUPPORTED_IMAGE_FORMATS
from src.web.config import SUPPORTED_IMAGE_FORMATS
from src.web.db.models import Photo
@ -102,6 +103,30 @@ def extract_exif_date(image_path: str) -> Optional[date]:
return None
def calculate_file_hash(file_path: str) -> str:
"""Calculate SHA256 hash of file content.
Args:
file_path: Path to the file to hash
Returns:
Hexadecimal string representation of SHA256 hash
"""
sha256_hash = hashlib.sha256()
try:
with open(file_path, "rb") as f:
# Read file in chunks to handle large files efficiently
for byte_block in iter(lambda: f.read(4096), b""):
sha256_hash.update(byte_block)
return sha256_hash.hexdigest()
except Exception as e:
# Log error for debugging
import logging
logger = logging.getLogger(__name__)
logger.error(f"Failed to calculate hash for {file_path}: {e}")
raise
def extract_photo_date(image_path: str) -> Optional[date]:
"""Extract date taken from photo with fallback to file modification time.
@ -169,8 +194,18 @@ def import_photo_from_path(
photo_path = os.path.abspath(photo_path)
filename = os.path.basename(photo_path)
# Check if photo already exists by path
existing = db.query(Photo).filter(Photo.path == photo_path).first()
# Calculate file hash for duplicate detection
try:
file_hash = calculate_file_hash(photo_path)
except Exception as e:
# If hash calculation fails, we can't proceed
import logging
logger = logging.getLogger(__name__)
logger.error(f"Failed to calculate hash for {photo_path}: {e}")
raise
# Check if photo already exists by hash (primary duplicate check)
existing = db.query(Photo).filter(Photo.file_hash == file_hash).first()
if existing:
# If existing photo doesn't have date_taken, try to update it
if existing.date_taken is None:
@ -181,16 +216,33 @@ def import_photo_from_path(
db.refresh(existing)
return existing, False
# Also check by path as fallback (in case hash wasn't set for some reason)
existing_by_path = db.query(Photo).filter(Photo.path == photo_path).first()
if existing_by_path:
# Update hash if missing
if not existing_by_path.file_hash:
existing_by_path.file_hash = file_hash
db.commit()
db.refresh(existing_by_path)
# If existing photo doesn't have date_taken, try to update it
if existing_by_path.date_taken is None:
date_taken = extract_photo_date(photo_path)
if date_taken:
existing_by_path.date_taken = date_taken
db.commit()
db.refresh(existing_by_path)
return existing_by_path, False
# Extract date taken with fallback to file modification time
date_taken = extract_photo_date(photo_path)
# Create new photo record - match desktop schema exactly
# Desktop schema: id, path, filename, date_added, date_taken (DATE), processed
# Create new photo record with file_hash
photo = Photo(
path=photo_path,
filename=filename,
date_taken=date_taken,
processed=False,
file_hash=file_hash,
)
db.add(photo)

View File

@ -1,81 +0,0 @@
#!/usr/bin/env python3
"""
Test script to demonstrate the difference between old and new confidence calculations
"""
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from src.core.config import DEFAULT_FACE_TOLERANCE, USE_CALIBRATED_CONFIDENCE, CONFIDENCE_CALIBRATION_METHOD
from src.core.face_processing import FaceProcessor
from src.core.database import DatabaseManager
def test_confidence_calibration():
"""Test and compare old vs new confidence calculations"""
print("🔍 Confidence Calibration Test")
print("=" * 50)
# Initialize face processor (we don't need database for this test)
db_manager = DatabaseManager(":memory:") # In-memory database for testing
face_processor = FaceProcessor(db_manager, verbose=1)
# Test different distance values
test_distances = [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0, 1.2, 1.5, 2.0]
tolerance = DEFAULT_FACE_TOLERANCE
print(f"Tolerance threshold: {tolerance}")
print(f"Calibration enabled: {USE_CALIBRATED_CONFIDENCE}")
print(f"Calibration method: {CONFIDENCE_CALIBRATION_METHOD}")
print()
print("Distance | Old Linear | New Calibrated | Difference | Description")
print("-" * 70)
for distance in test_distances:
# Old linear calculation
old_confidence = (1 - distance) * 100
# New calibrated calculation
new_confidence, description = face_processor._get_calibrated_confidence(distance, tolerance)
difference = new_confidence - old_confidence
print(f"{distance:8.1f} | {old_confidence:10.1f}% | {new_confidence:13.1f}% | {difference:+9.1f}% | {description}")
print()
print("📊 Key Differences:")
print("- Old method: Simple linear transformation (1 - distance) * 100")
print("- New method: Empirical calibration based on DeepFace ArcFace characteristics")
print("- New method provides more realistic match probabilities")
print()
# Test different calibration methods
print("🔧 Testing Different Calibration Methods:")
print("-" * 50)
# Temporarily change calibration method to test different approaches
original_method = CONFIDENCE_CALIBRATION_METHOD
test_distance = 0.4 # Example distance
print(f"Distance: {test_distance}, Tolerance: {tolerance}")
print()
methods = ["linear", "sigmoid", "empirical"]
for method in methods:
# Update the global config (this is just for testing)
import src.core.config
src.core.config.CONFIDENCE_CALIBRATION_METHOD = method
confidence, desc = face_processor._get_calibrated_confidence(test_distance, tolerance)
print(f"{method:10}: {confidence:6.1f}% - {desc}")
# Restore original method
src.core.config.CONFIDENCE_CALIBRATION_METHOD = original_method
if __name__ == "__main__":
test_confidence_calibration()

View File

@ -1,685 +0,0 @@
#!/usr/bin/env python3
"""
DeepFace Integration Test Suite for PunimTag
Tests the complete integration of DeepFace into the application
"""
import os
import sys
from pathlib import Path
# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent))
# Suppress TensorFlow warnings
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
import warnings
warnings.filterwarnings('ignore')
from src.core.database import DatabaseManager
from src.core.face_processing import FaceProcessor
from src.core.config import DEEPFACE_DETECTOR_BACKEND, DEEPFACE_MODEL_NAME
def test_face_detection():
"""Test 1: Face detection with DeepFace"""
print("\n" + "="*60)
print("Test 1: DeepFace Face Detection")
print("="*60)
try:
db = DatabaseManager(":memory:", verbose=0) # In-memory database for testing
processor = FaceProcessor(db, verbose=1)
# Test with a sample image
test_image = "demo_photos/2019-11-22_0011.jpg"
if not os.path.exists(test_image):
print(f"❌ Test image not found: {test_image}")
print(" Please ensure demo photos are available")
return False
print(f"Testing with image: {test_image}")
# Add photo to database
photo_id = db.add_photo(test_image, Path(test_image).name, None)
print(f"✓ Added photo to database (ID: {photo_id})")
# Process faces
count = processor.process_faces(limit=1)
print(f"✓ Processed {count} photos")
# Verify results
stats = db.get_statistics()
print(f"✓ Found {stats['total_faces']} faces in the photo")
if stats['total_faces'] == 0:
print("❌ FAIL: No faces detected")
return False
# Verify face encodings are 512-dimensional (ArcFace)
with db.get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute("SELECT encoding FROM faces LIMIT 1")
encoding_blob = cursor.fetchone()[0]
encoding_size = len(encoding_blob)
expected_size = 512 * 8 # 512 floats * 8 bytes per float
print(f"✓ Encoding size: {encoding_size} bytes (expected: {expected_size})")
if encoding_size != expected_size:
print(f"❌ FAIL: Wrong encoding size (expected {expected_size}, got {encoding_size})")
return False
print("\n✅ PASS: Face detection working correctly")
return True
except Exception as e:
print(f"\n❌ FAIL: {e}")
import traceback
traceback.print_exc()
return False
def test_face_matching():
"""Test 2: Face matching with DeepFace"""
print("\n" + "="*60)
print("Test 2: DeepFace Face Matching")
print("="*60)
try:
db = DatabaseManager(":memory:", verbose=0)
processor = FaceProcessor(db, verbose=1)
# Test with multiple images
test_images = [
"demo_photos/2019-11-22_0011.jpg",
"demo_photos/2019-11-22_0012.jpg"
]
# Check if test images exist
available_images = [img for img in test_images if os.path.exists(img)]
if len(available_images) < 2:
print(f"⚠️ Only {len(available_images)} test images available")
print(" Skipping face matching test (need at least 2 images)")
return True # Skip but don't fail
print(f"Testing with {len(available_images)} images")
# Add photos to database
for img in available_images:
photo_id = db.add_photo(img, Path(img).name, None)
print(f"✓ Added {Path(img).name} (ID: {photo_id})")
# Process all faces
count = processor.process_faces(limit=10)
print(f"✓ Processed {count} photos")
# Get statistics
stats = db.get_statistics()
print(f"✓ Found {stats['total_faces']} total faces")
if stats['total_faces'] < 2:
print("⚠️ Not enough faces for matching test")
return True # Skip but don't fail
# Find similar faces
faces = db.get_all_face_encodings()
if len(faces) >= 2:
face_id = faces[0][0]
print(f"✓ Testing similarity for face ID {face_id}")
matches = processor.find_similar_faces(face_id, tolerance=0.4)
print(f"✓ Found {len(matches)} similar faces (within tolerance)")
# Display match details
if matches:
for i, match in enumerate(matches[:3], 1): # Show top 3 matches
confidence_pct = (1 - match['distance']) * 100
print(f" Match {i}: Face {match['face_id']}, Confidence: {confidence_pct:.1f}%")
print("\n✅ PASS: Face matching working correctly")
return True
except Exception as e:
print(f"\n❌ FAIL: {e}")
import traceback
traceback.print_exc()
return False
def test_deepface_metadata():
"""Test 3: DeepFace metadata storage and retrieval"""
print("\n" + "="*60)
print("Test 3: DeepFace Metadata Storage")
print("="*60)
try:
db = DatabaseManager(":memory:", verbose=0)
processor = FaceProcessor(db, verbose=1)
# Test with a sample image
test_image = "demo_photos/2019-11-22_0011.jpg"
if not os.path.exists(test_image):
print(f"⚠️ Test image not found: {test_image}")
return True # Skip but don't fail
# Add photo and process
photo_id = db.add_photo(test_image, Path(test_image).name, None)
processor.process_faces(limit=1)
# Query face metadata
with db.get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT face_confidence, quality_score, detector_backend, model_name
FROM faces
LIMIT 1
""")
result = cursor.fetchone()
if not result:
print("❌ FAIL: No face metadata found")
return False
face_conf, quality, detector, model = result
print(f"✓ Face Confidence: {face_conf}")
print(f"✓ Quality Score: {quality}")
print(f"✓ Detector Backend: {detector}")
print(f"✓ Model Name: {model}")
# Verify metadata is present
if detector is None:
print("❌ FAIL: Detector backend not stored")
return False
if model is None:
print("❌ FAIL: Model name not stored")
return False
# Verify detector matches configuration
if detector != DEEPFACE_DETECTOR_BACKEND:
print(f"⚠️ Warning: Detector mismatch (expected {DEEPFACE_DETECTOR_BACKEND}, got {detector})")
# Verify model matches configuration
if model != DEEPFACE_MODEL_NAME:
print(f"⚠️ Warning: Model mismatch (expected {DEEPFACE_MODEL_NAME}, got {model})")
print("\n✅ PASS: DeepFace metadata stored correctly")
return True
except Exception as e:
print(f"\n❌ FAIL: {e}")
import traceback
traceback.print_exc()
return False
def test_configuration():
"""Test 4: FaceProcessor configuration with different backends"""
print("\n" + "="*60)
print("Test 4: FaceProcessor Configuration")
print("="*60)
try:
db = DatabaseManager(":memory:", verbose=0)
# Test default configuration
processor_default = FaceProcessor(db, verbose=0)
print(f"✓ Default detector: {processor_default.detector_backend}")
print(f"✓ Default model: {processor_default.model_name}")
if processor_default.detector_backend != DEEPFACE_DETECTOR_BACKEND:
print(f"❌ FAIL: Default detector mismatch")
return False
if processor_default.model_name != DEEPFACE_MODEL_NAME:
print(f"❌ FAIL: Default model mismatch")
return False
# Test custom configuration
custom_configs = [
('mtcnn', 'Facenet512'),
('opencv', 'VGG-Face'),
('ssd', 'ArcFace'),
]
for detector, model in custom_configs:
processor = FaceProcessor(db, verbose=0,
detector_backend=detector,
model_name=model)
print(f"✓ Custom config: {detector}/{model}")
if processor.detector_backend != detector:
print(f"❌ FAIL: Custom detector not applied")
return False
if processor.model_name != model:
print(f"❌ FAIL: Custom model not applied")
return False
print("\n✅ PASS: FaceProcessor configuration working correctly")
return True
except Exception as e:
print(f"\n❌ FAIL: {e}")
import traceback
traceback.print_exc()
return False
def test_cosine_similarity():
"""Test 5: Cosine similarity calculation"""
print("\n" + "="*60)
print("Test 5: Cosine Similarity Calculation")
print("="*60)
try:
import numpy as np
db = DatabaseManager(":memory:", verbose=0)
processor = FaceProcessor(db, verbose=0)
# Test with identical encodings
encoding1 = np.random.rand(512).astype(np.float64)
encoding2 = encoding1.copy()
distance = processor._calculate_cosine_similarity(encoding1, encoding2)
print(f"✓ Identical encodings distance: {distance:.6f}")
if distance > 0.01: # Should be very close to 0
print(f"❌ FAIL: Identical encodings should have distance near 0")
return False
# Test with different encodings
encoding3 = np.random.rand(512).astype(np.float64)
distance2 = processor._calculate_cosine_similarity(encoding1, encoding3)
print(f"✓ Different encodings distance: {distance2:.6f}")
if distance2 < 0.1: # Should be significantly different
print(f"⚠️ Warning: Random encodings have low distance (might be coincidence)")
# Test with mismatched lengths
encoding4 = np.random.rand(128).astype(np.float64)
distance3 = processor._calculate_cosine_similarity(encoding1, encoding4)
print(f"✓ Mismatched lengths distance: {distance3:.6f}")
if distance3 != 2.0: # Should return max distance
print(f"❌ FAIL: Mismatched lengths should return 2.0")
return False
print("\n✅ PASS: Cosine similarity calculation working correctly")
return True
except Exception as e:
print(f"\n❌ FAIL: {e}")
import traceback
traceback.print_exc()
return False
def test_database_schema():
"""Test 6: Database schema validation"""
print("\n" + "="*60)
print("Test 6: Database Schema Validation")
print("="*60)
try:
db = DatabaseManager(":memory:", verbose=0)
# Check if new DeepFace columns exist
with db.get_db_connection() as conn:
cursor = conn.cursor()
# Get faces table schema
cursor.execute("PRAGMA table_info(faces)")
columns = {row[1]: row[2] for row in cursor.fetchall()}
print("✓ Faces table columns:")
for col_name in columns:
print(f" - {col_name}")
# Verify DeepFace columns
required_columns = {
'detector_backend': 'TEXT',
'model_name': 'TEXT',
'face_confidence': 'REAL'
}
for col, dtype in required_columns.items():
if col not in columns:
print(f"❌ FAIL: Missing column '{col}' in faces table")
return False
print(f"✓ Column '{col}' exists with type {columns[col]}")
# Check person_encodings table
cursor.execute("PRAGMA table_info(person_encodings)")
pe_columns = {row[1]: row[2] for row in cursor.fetchall()}
print("\n✓ Person_encodings table columns:")
for col_name in pe_columns:
print(f" - {col_name}")
# Verify DeepFace columns in person_encodings
pe_required = {
'detector_backend': 'TEXT',
'model_name': 'TEXT',
}
for col, dtype in pe_required.items():
if col not in pe_columns:
print(f"❌ FAIL: Missing column '{col}' in person_encodings table")
return False
print(f"✓ Column '{col}' exists in person_encodings")
print("\n✅ PASS: Database schema is correct")
return True
except Exception as e:
print(f"\n❌ FAIL: {e}")
import traceback
traceback.print_exc()
return False
def test_face_location_format():
"""Test 7: Face location format validation"""
print("\n" + "="*60)
print("Test 7: Face Location Format")
print("="*60)
try:
import ast
db = DatabaseManager(":memory:", verbose=0)
processor = FaceProcessor(db, verbose=1)
# Test with a sample image
test_image = "demo_photos/2019-11-22_0011.jpg"
if not os.path.exists(test_image):
print(f"⚠️ Test image not found: {test_image}")
return True # Skip but don't fail
# Add photo and process
photo_id = db.add_photo(test_image, Path(test_image).name, None)
processor.process_faces(limit=1)
# Check face location format
with db.get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute("SELECT location FROM faces LIMIT 1")
result = cursor.fetchone()
if not result:
print("⚠️ No faces found")
return True
location_str = result[0]
print(f"✓ Raw location: {location_str}")
# Parse location
try:
location = ast.literal_eval(location_str)
print(f"✓ Parsed location: {location}")
# Check if it's DeepFace format (dict with x, y, w, h)
if isinstance(location, dict):
required_keys = ['x', 'y', 'w', 'h']
for key in required_keys:
if key not in location:
print(f"❌ FAIL: Missing key '{key}' in location dict")
return False
print("✓ Location is in DeepFace dict format {x, y, w, h}")
else:
print(f"❌ FAIL: Location is not a dict, got {type(location)}")
return False
except Exception as e:
print(f"❌ FAIL: Could not parse location: {e}")
return False
print("\n✅ PASS: Face location format is correct")
return True
except Exception as e:
print(f"\n❌ FAIL: {e}")
import traceback
traceback.print_exc()
return False
def test_performance_benchmark():
"""Test 8: Performance benchmarking"""
print("\n" + "="*60)
print("Test 8: Performance Benchmark")
print("="*60)
try:
import time
db = DatabaseManager(":memory:", verbose=0)
processor = FaceProcessor(db, verbose=0)
# Test with multiple images
test_images = [
"demo_photos/2019-11-22_0011.jpg",
"demo_photos/2019-11-22_0012.jpg",
"demo_photos/2019-11-22_0015.jpg",
]
available_images = [img for img in test_images if os.path.exists(img)]
if not available_images:
print("⚠️ No test images available")
return True # Skip but don't fail
print(f"Testing with {len(available_images)} images")
# Add photos to database
for img in available_images:
db.add_photo(img, Path(img).name, None)
# Benchmark face detection
start_time = time.time()
count = processor.process_faces(limit=len(available_images))
detection_time = time.time() - start_time
print(f"✓ Processed {count} photos in {detection_time:.2f}s")
print(f"✓ Average time per photo: {detection_time/max(count, 1):.2f}s")
# Get statistics
stats = db.get_statistics()
total_faces = stats['total_faces']
print(f"✓ Found {total_faces} total faces")
if total_faces > 0:
print(f"✓ Average time per face: {detection_time/total_faces:.2f}s")
# Benchmark similarity calculation
if total_faces >= 2:
faces = db.get_all_face_encodings()
face_id = faces[0][0]
start_time = time.time()
matches = processor.find_similar_faces(face_id, tolerance=0.4)
matching_time = time.time() - start_time
print(f"✓ Similarity search completed in {matching_time:.2f}s")
print(f"✓ Found {len(matches)} matches")
print("\n✅ PASS: Performance benchmark completed")
return True
except Exception as e:
print(f"\n❌ FAIL: {e}")
import traceback
traceback.print_exc()
return False
def test_adaptive_tolerance():
"""Test 9: Adaptive tolerance calculation"""
print("\n" + "="*60)
print("Test 9: Adaptive Tolerance")
print("="*60)
try:
db = DatabaseManager(":memory:", verbose=0)
processor = FaceProcessor(db, verbose=0)
# Test with different quality scores
base_tolerance = 0.4
test_cases = [
(0.1, "Low quality"),
(0.5, "Medium quality"),
(0.9, "High quality"),
]
print(f"Base tolerance: {base_tolerance}")
for quality, desc in test_cases:
tolerance = processor._calculate_adaptive_tolerance(base_tolerance, quality)
print(f"{desc} ({quality:.1f}): tolerance = {tolerance:.3f}")
# Verify tolerance is within bounds
if tolerance < 0.2 or tolerance > 0.6:
print(f"❌ FAIL: Tolerance {tolerance} out of bounds [0.2, 0.6]")
return False
# Test with match confidence
tolerance_with_conf = processor._calculate_adaptive_tolerance(
base_tolerance, 0.7, match_confidence=0.8
)
print(f"✓ With match confidence: tolerance = {tolerance_with_conf:.3f}")
print("\n✅ PASS: Adaptive tolerance working correctly")
return True
except Exception as e:
print(f"\n❌ FAIL: {e}")
import traceback
traceback.print_exc()
return False
def test_multiple_detectors():
"""Test 10: Multiple detector backends"""
print("\n" + "="*60)
print("Test 10: Multiple Detector Backends")
print("="*60)
try:
# Test different detector backends
detectors = ['opencv', 'ssd'] # Skip retinaface and mtcnn for speed
test_image = "demo_photos/2019-11-22_0011.jpg"
if not os.path.exists(test_image):
print("⚠️ Test image not found")
return True # Skip but don't fail
results = {}
for detector in detectors:
print(f"\n Testing with {detector} detector:")
try:
db = DatabaseManager(":memory:", verbose=0)
processor = FaceProcessor(db, verbose=0,
detector_backend=detector,
model_name='ArcFace')
photo_id = db.add_photo(test_image, Path(test_image).name, None)
count = processor.process_faces(limit=1)
stats = db.get_statistics()
faces_found = stats['total_faces']
results[detector] = faces_found
print(f"{detector}: Found {faces_found} faces")
except Exception as e:
print(f"⚠️ {detector} failed: {e}")
results[detector] = 0
# Verify at least one detector worked
if sum(results.values()) == 0:
print("\n❌ FAIL: No detectors found any faces")
return False
print("\n✅ PASS: Multiple detectors tested")
return True
except Exception as e:
print(f"\n❌ FAIL: {e}")
import traceback
traceback.print_exc()
return False
def run_all_tests():
"""Run all DeepFace integration tests"""
print("\n" + "="*70)
print("DEEPFACE INTEGRATION TEST SUITE - PHASE 6")
print("="*70)
print()
print("Testing complete DeepFace integration in PunimTag")
print("This comprehensive test suite validates all aspects of the migration")
print()
tests = [
("Face Detection", test_face_detection),
("Face Matching", test_face_matching),
("Metadata Storage", test_deepface_metadata),
("Configuration", test_configuration),
("Cosine Similarity", test_cosine_similarity),
("Database Schema", test_database_schema),
("Face Location Format", test_face_location_format),
("Performance Benchmark", test_performance_benchmark),
("Adaptive Tolerance", test_adaptive_tolerance),
("Multiple Detectors", test_multiple_detectors),
]
results = []
for test_name, test_func in tests:
try:
result = test_func()
results.append((test_name, result))
except Exception as e:
print(f"\n❌ Test '{test_name}' crashed: {e}")
import traceback
traceback.print_exc()
results.append((test_name, False))
# Print summary
print("\n" + "="*70)
print("TEST SUMMARY")
print("="*70)
passed = 0
failed = 0
for test_name, result in results:
status = "✅ PASS" if result else "❌ FAIL"
print(f"{status}: {test_name}")
if result:
passed += 1
else:
failed += 1
print("="*70)
print(f"Tests passed: {passed}/{len(tests)}")
print(f"Tests failed: {failed}/{len(tests)}")
print("="*70)
if failed == 0:
print("\n🎉 ALL TESTS PASSED! DeepFace integration is working correctly!")
return 0
else:
print(f"\n⚠️ {failed} test(s) failed. Please review the errors above.")
return 1
if __name__ == "__main__":
sys.exit(run_all_tests())

View File

@ -1,329 +0,0 @@
#!/usr/bin/env python3
"""
Test Phase 1: Database Schema Updates for DeepFace Migration
This test verifies that:
1. Database schema includes new DeepFace columns
2. Method signatures accept new parameters
3. Data can be inserted with DeepFace-specific fields
"""
import os
import sys
import sqlite3
import tempfile
from pathlib import Path
# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent))
from src.core.database import DatabaseManager
from src.core.config import (
DEEPFACE_DETECTOR_BACKEND,
DEEPFACE_MODEL_NAME,
DEFAULT_FACE_TOLERANCE,
DEEPFACE_SIMILARITY_THRESHOLD
)
def test_schema_has_deepface_columns():
"""Test that database schema includes DeepFace columns"""
print("\n🧪 Test 1: Verify schema has DeepFace columns")
# Create temporary database
with tempfile.NamedTemporaryFile(delete=False, suffix='.db') as tmp:
tmp_db_path = tmp.name
try:
# Initialize database
db = DatabaseManager(tmp_db_path, verbose=0)
# Connect and check schema
conn = sqlite3.connect(tmp_db_path)
cursor = conn.cursor()
# Check faces table
cursor.execute("PRAGMA table_info(faces)")
faces_columns = {row[1]: row[2] for row in cursor.fetchall()}
required_columns = {
'detector_backend': 'TEXT',
'model_name': 'TEXT',
'face_confidence': 'REAL'
}
print(" Checking 'faces' table columns:")
for col_name, col_type in required_columns.items():
if col_name in faces_columns:
print(f"{col_name} ({faces_columns[col_name]})")
else:
print(f"{col_name} - MISSING!")
return False
# Check person_encodings table
cursor.execute("PRAGMA table_info(person_encodings)")
pe_columns = {row[1]: row[2] for row in cursor.fetchall()}
required_pe_columns = {
'detector_backend': 'TEXT',
'model_name': 'TEXT'
}
print(" Checking 'person_encodings' table columns:")
for col_name, col_type in required_pe_columns.items():
if col_name in pe_columns:
print(f"{col_name} ({pe_columns[col_name]})")
else:
print(f"{col_name} - MISSING!")
return False
conn.close()
print(" ✅ All schema columns present")
return True
finally:
# Cleanup
if os.path.exists(tmp_db_path):
os.unlink(tmp_db_path)
def test_add_face_with_deepface_params():
"""Test that add_face() accepts DeepFace parameters"""
print("\n🧪 Test 2: Test add_face() with DeepFace parameters")
# Create temporary database
with tempfile.NamedTemporaryFile(delete=False, suffix='.db') as tmp:
tmp_db_path = tmp.name
try:
# Initialize database
db = DatabaseManager(tmp_db_path, verbose=0)
# Add a test photo
photo_id = db.add_photo(
photo_path="/test/photo.jpg",
filename="photo.jpg",
date_taken="2025-10-16"
)
if not photo_id:
print(" ❌ Failed to add photo")
return False
print(f" ✓ Added test photo (ID: {photo_id})")
# Create dummy 512-dimensional encoding (ArcFace)
import numpy as np
dummy_encoding = np.random.rand(512).astype(np.float64)
encoding_bytes = dummy_encoding.tobytes()
# Add face with DeepFace parameters
face_id = db.add_face(
photo_id=photo_id,
encoding=encoding_bytes,
location="{'x': 100, 'y': 150, 'w': 200, 'h': 200}",
confidence=0.0,
quality_score=0.85,
person_id=None,
detector_backend='retinaface',
model_name='ArcFace',
face_confidence=0.99
)
if not face_id:
print(" ❌ Failed to add face")
return False
print(f" ✓ Added face with DeepFace params (ID: {face_id})")
# Verify data was stored correctly
conn = sqlite3.connect(tmp_db_path)
cursor = conn.cursor()
cursor.execute('''
SELECT detector_backend, model_name, face_confidence, quality_score
FROM faces WHERE id = ?
''', (face_id,))
result = cursor.fetchone()
conn.close()
if not result:
print(" ❌ Face data not found in database")
return False
detector, model, face_conf, quality = result
print(f" ✓ Verified stored data:")
print(f" - detector_backend: {detector}")
print(f" - model_name: {model}")
print(f" - face_confidence: {face_conf}")
print(f" - quality_score: {quality}")
if detector != 'retinaface' or model != 'ArcFace' or face_conf != 0.99:
print(" ❌ Stored data doesn't match input")
return False
print(" ✅ add_face() works with DeepFace parameters")
return True
finally:
# Cleanup
if os.path.exists(tmp_db_path):
os.unlink(tmp_db_path)
def test_add_person_encoding_with_deepface_params():
"""Test that add_person_encoding() accepts DeepFace parameters"""
print("\n🧪 Test 3: Test add_person_encoding() with DeepFace parameters")
# Create temporary database
with tempfile.NamedTemporaryFile(delete=False, suffix='.db') as tmp:
tmp_db_path = tmp.name
try:
# Initialize database
db = DatabaseManager(tmp_db_path, verbose=0)
# Add a test person
person_id = db.add_person(
first_name="Test",
last_name="Person",
middle_name="",
maiden_name="",
date_of_birth=""
)
print(f" ✓ Added test person (ID: {person_id})")
# Add a test photo and face
photo_id = db.add_photo("/test/photo.jpg", "photo.jpg")
import numpy as np
dummy_encoding = np.random.rand(512).astype(np.float64)
encoding_bytes = dummy_encoding.tobytes()
face_id = db.add_face(
photo_id=photo_id,
encoding=encoding_bytes,
location="{'x': 100, 'y': 150, 'w': 200, 'h': 200}",
quality_score=0.85,
detector_backend='retinaface',
model_name='ArcFace'
)
print(f" ✓ Added test face (ID: {face_id})")
# Add person encoding with DeepFace parameters
db.add_person_encoding(
person_id=person_id,
face_id=face_id,
encoding=encoding_bytes,
quality_score=0.85,
detector_backend='retinaface',
model_name='ArcFace'
)
# Verify data was stored
conn = sqlite3.connect(tmp_db_path)
cursor = conn.cursor()
cursor.execute('''
SELECT detector_backend, model_name, quality_score
FROM person_encodings WHERE person_id = ? AND face_id = ?
''', (person_id, face_id))
result = cursor.fetchone()
conn.close()
if not result:
print(" ❌ Person encoding not found in database")
return False
detector, model, quality = result
print(f" ✓ Verified stored data:")
print(f" - detector_backend: {detector}")
print(f" - model_name: {model}")
print(f" - quality_score: {quality}")
if detector != 'retinaface' or model != 'ArcFace':
print(" ❌ Stored data doesn't match input")
return False
print(" ✅ add_person_encoding() works with DeepFace parameters")
return True
finally:
# Cleanup
if os.path.exists(tmp_db_path):
os.unlink(tmp_db_path)
def test_config_constants():
"""Test that config.py has DeepFace constants"""
print("\n🧪 Test 4: Verify DeepFace configuration constants")
print(f" ✓ DEEPFACE_DETECTOR_BACKEND = {DEEPFACE_DETECTOR_BACKEND}")
print(f" ✓ DEEPFACE_MODEL_NAME = {DEEPFACE_MODEL_NAME}")
print(f" ✓ DEFAULT_FACE_TOLERANCE = {DEFAULT_FACE_TOLERANCE}")
print(f" ✓ DEEPFACE_SIMILARITY_THRESHOLD = {DEEPFACE_SIMILARITY_THRESHOLD}")
if DEEPFACE_DETECTOR_BACKEND != 'retinaface':
print(f" ⚠️ Warning: Expected detector 'retinaface', got '{DEEPFACE_DETECTOR_BACKEND}'")
if DEEPFACE_MODEL_NAME != 'ArcFace':
print(f" ⚠️ Warning: Expected model 'ArcFace', got '{DEEPFACE_MODEL_NAME}'")
if DEFAULT_FACE_TOLERANCE != 0.4:
print(f" ⚠️ Warning: Expected tolerance 0.4, got {DEFAULT_FACE_TOLERANCE}")
print(" ✅ Configuration constants loaded")
return True
def run_all_tests():
"""Run all Phase 1 tests"""
print("=" * 70)
print("Phase 1 Schema Tests - DeepFace Migration")
print("=" * 70)
tests = [
("Schema Columns", test_schema_has_deepface_columns),
("add_face() Method", test_add_face_with_deepface_params),
("add_person_encoding() Method", test_add_person_encoding_with_deepface_params),
("Config Constants", test_config_constants)
]
results = []
for test_name, test_func in tests:
try:
result = test_func()
results.append((test_name, result))
except Exception as e:
print(f" ❌ Test failed with exception: {e}")
import traceback
traceback.print_exc()
results.append((test_name, False))
print("\n" + "=" * 70)
print("Test Results Summary")
print("=" * 70)
for test_name, result in results:
status = "✅ PASS" if result else "❌ FAIL"
print(f" {status}: {test_name}")
passed = sum(1 for _, result in results if result)
total = len(results)
print()
print(f"Tests passed: {passed}/{total}")
print("=" * 70)
return all(result for _, result in results)
if __name__ == "__main__":
success = run_all_tests()
sys.exit(0 if success else 1)

View File

@ -1,232 +0,0 @@
#!/usr/bin/env python3
"""
Test Phase 2: Configuration Updates for DeepFace Migration
This test verifies that:
1. TensorFlow suppression is in place
2. FaceProcessor accepts detector_backend and model_name
3. Configuration constants are accessible
4. Entry points properly suppress warnings
"""
import os
import sys
from pathlib import Path
# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent))
def test_tensorflow_suppression():
"""Test that TensorFlow warnings are suppressed"""
print("\n🧪 Test 1: Verify TensorFlow suppression in config")
# Import config which sets the environment variable
from src.core import config
# Check environment variable is set (config.py sets it on import)
tf_log_level = os.environ.get('TF_CPP_MIN_LOG_LEVEL')
if tf_log_level == '3':
print(" ✓ TF_CPP_MIN_LOG_LEVEL = 3 (suppressed by config.py)")
print(" ✓ Entry points also set this before imports")
return True
else:
print(f" ❌ TF_CPP_MIN_LOG_LEVEL = {tf_log_level} (expected '3')")
return False
def test_faceprocessor_initialization():
"""Test that FaceProcessor accepts DeepFace parameters"""
print("\n🧪 Test 2: Test FaceProcessor with DeepFace parameters")
import tempfile
from src.core.database import DatabaseManager
from src.core.face_processing import FaceProcessor
try:
# Create temporary database
with tempfile.NamedTemporaryFile(delete=False, suffix='.db') as tmp:
tmp_db_path = tmp.name
# Initialize database and face processor
db = DatabaseManager(tmp_db_path, verbose=0)
# Test with custom detector and model
processor = FaceProcessor(
db,
verbose=0,
detector_backend='mtcnn',
model_name='Facenet'
)
print(f" ✓ FaceProcessor initialized")
print(f" - detector_backend: {processor.detector_backend}")
print(f" - model_name: {processor.model_name}")
if processor.detector_backend != 'mtcnn':
print(" ❌ Detector backend not set correctly")
return False
if processor.model_name != 'Facenet':
print(" ❌ Model name not set correctly")
return False
# Test with defaults
processor2 = FaceProcessor(db, verbose=0)
print(f" ✓ FaceProcessor with defaults:")
print(f" - detector_backend: {processor2.detector_backend}")
print(f" - model_name: {processor2.model_name}")
# Cleanup
if os.path.exists(tmp_db_path):
os.unlink(tmp_db_path)
print(" ✅ FaceProcessor accepts and uses DeepFace parameters")
return True
except Exception as e:
print(f" ❌ Error: {e}")
import traceback
traceback.print_exc()
return False
def test_config_imports():
"""Test that all DeepFace config constants can be imported"""
print("\n🧪 Test 3: Test configuration imports")
try:
from src.core.config import (
DEEPFACE_DETECTOR_BACKEND,
DEEPFACE_MODEL_NAME,
DEEPFACE_DETECTOR_OPTIONS,
DEEPFACE_MODEL_OPTIONS,
DEEPFACE_DISTANCE_METRIC,
DEEPFACE_ENFORCE_DETECTION,
DEEPFACE_ALIGN_FACES,
DEEPFACE_SIMILARITY_THRESHOLD
)
print(" ✓ All DeepFace config constants imported:")
print(f" - DEEPFACE_DETECTOR_BACKEND = {DEEPFACE_DETECTOR_BACKEND}")
print(f" - DEEPFACE_MODEL_NAME = {DEEPFACE_MODEL_NAME}")
print(f" - DEEPFACE_DETECTOR_OPTIONS = {DEEPFACE_DETECTOR_OPTIONS}")
print(f" - DEEPFACE_MODEL_OPTIONS = {DEEPFACE_MODEL_OPTIONS}")
print(f" - DEEPFACE_DISTANCE_METRIC = {DEEPFACE_DISTANCE_METRIC}")
print(f" - DEEPFACE_ENFORCE_DETECTION = {DEEPFACE_ENFORCE_DETECTION}")
print(f" - DEEPFACE_ALIGN_FACES = {DEEPFACE_ALIGN_FACES}")
print(f" - DEEPFACE_SIMILARITY_THRESHOLD = {DEEPFACE_SIMILARITY_THRESHOLD}")
print(" ✅ All configuration constants accessible")
return True
except ImportError as e:
print(f" ❌ Failed to import config: {e}")
return False
def test_entry_point_imports():
"""Test that main entry points can be imported without errors"""
print("\n🧪 Test 4: Test entry point imports (with TF suppression)")
try:
# Desktop GUI has been archived - skip this test
print(" ⚠️ Desktop GUI entry points have been archived")
print(" ⚠️ Skipping desktop entry point import test")
print(" ✓ Web version entry points are available via API")
print(" ✅ Entry point test skipped (desktop archived)")
return True
except Exception as e:
print(f" ❌ Import error: {e}")
import traceback
traceback.print_exc()
return False
def test_gui_config_constants():
"""Test that GUI can access DeepFace options"""
print("\n🧪 Test 5: Test GUI access to DeepFace options")
try:
from src.core.config import DEEPFACE_DETECTOR_OPTIONS, DEEPFACE_MODEL_OPTIONS
# Verify options are lists
if not isinstance(DEEPFACE_DETECTOR_OPTIONS, list):
print(" ❌ DEEPFACE_DETECTOR_OPTIONS is not a list")
return False
if not isinstance(DEEPFACE_MODEL_OPTIONS, list):
print(" ❌ DEEPFACE_MODEL_OPTIONS is not a list")
return False
print(f" ✓ Detector options ({len(DEEPFACE_DETECTOR_OPTIONS)}): {DEEPFACE_DETECTOR_OPTIONS}")
print(f" ✓ Model options ({len(DEEPFACE_MODEL_OPTIONS)}): {DEEPFACE_MODEL_OPTIONS}")
# Verify expected values
expected_detectors = ["retinaface", "mtcnn", "opencv", "ssd"]
expected_models = ["ArcFace", "Facenet", "Facenet512", "VGG-Face"]
if set(DEEPFACE_DETECTOR_OPTIONS) != set(expected_detectors):
print(f" ⚠️ Detector options don't match expected: {expected_detectors}")
if set(DEEPFACE_MODEL_OPTIONS) != set(expected_models):
print(f" ⚠️ Model options don't match expected: {expected_models}")
print(" ✅ GUI can access DeepFace options for dropdowns")
return True
except Exception as e:
print(f" ❌ Error: {e}")
return False
def run_all_tests():
"""Run all Phase 2 tests"""
print("=" * 70)
print("Phase 2 Configuration Tests - DeepFace Migration")
print("=" * 70)
tests = [
("TensorFlow Suppression", test_tensorflow_suppression),
("FaceProcessor Initialization", test_faceprocessor_initialization),
("Config Imports", test_config_imports),
("Entry Point Imports", test_entry_point_imports),
("GUI Config Constants", test_gui_config_constants)
]
results = []
for test_name, test_func in tests:
try:
result = test_func()
results.append((test_name, result))
except Exception as e:
print(f" ❌ Test failed with exception: {e}")
import traceback
traceback.print_exc()
results.append((test_name, False))
print("\n" + "=" * 70)
print("Test Results Summary")
print("=" * 70)
for test_name, result in results:
status = "✅ PASS" if result else "❌ FAIL"
print(f" {status}: {test_name}")
passed = sum(1 for _, result in results if result)
total = len(results)
print()
print(f"Tests passed: {passed}/{total}")
print("=" * 70)
return all(result for _, result in results)
if __name__ == "__main__":
success = run_all_tests()
sys.exit(0 if success else 1)

View File

@ -1,326 +0,0 @@
#!/usr/bin/env python3
"""
Test Phase 3: Core Face Processing with DeepFace
This test verifies that:
1. DeepFace can be imported and used
2. Face detection works with DeepFace
3. Face encodings are 512-dimensional (ArcFace)
4. Cosine similarity calculation works
5. Location format handling works (dict vs tuple)
6. Full end-to-end processing works
"""
import os
import sys
import tempfile
import numpy as np
from pathlib import Path
# Suppress TensorFlow warnings
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
import warnings
warnings.filterwarnings('ignore')
# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent))
def test_deepface_import():
"""Test that DeepFace can be imported"""
print("\n🧪 Test 1: DeepFace Import")
try:
from deepface import DeepFace
print(f" ✓ DeepFace imported successfully")
print(f" ✓ Version: {DeepFace.__version__ if hasattr(DeepFace, '__version__') else 'unknown'}")
return True
except ImportError as e:
print(f" ❌ Failed to import DeepFace: {e}")
return False
def test_deepface_detection():
"""Test DeepFace face detection"""
print("\n🧪 Test 2: DeepFace Face Detection")
try:
from deepface import DeepFace
# Check for test images
test_folder = Path("demo_photos/testdeepface")
if not test_folder.exists():
test_folder = Path("demo_photos")
test_images = list(test_folder.glob("*.jpg")) + list(test_folder.glob("*.JPG"))
if not test_images:
print(" ⚠️ No test images found, skipping")
return True
test_image = str(test_images[0])
print(f" Testing with: {Path(test_image).name}")
# Try to detect faces
results = DeepFace.represent(
img_path=test_image,
model_name='ArcFace',
detector_backend='retinaface',
enforce_detection=False,
align=True
)
if results:
print(f" ✓ Found {len(results)} face(s)")
# Check encoding dimensions
encoding = np.array(results[0]['embedding'])
print(f" ✓ Encoding shape: {encoding.shape}")
if len(encoding) == 512:
print(f" ✓ Correct encoding size (512-dimensional for ArcFace)")
else:
print(f" ⚠️ Unexpected encoding size: {len(encoding)}")
# Check facial_area format
facial_area = results[0].get('facial_area', {})
print(f" ✓ Facial area: {facial_area}")
if all(k in facial_area for k in ['x', 'y', 'w', 'h']):
print(f" ✓ Correct facial area format (x, y, w, h)")
else:
print(f" ⚠️ Unexpected facial area format")
return True
else:
print(f" ⚠️ No faces detected (image may have no faces)")
return True # Not a failure, just no faces
except Exception as e:
print(f" ❌ Error: {e}")
import traceback
traceback.print_exc()
return False
def test_cosine_similarity():
"""Test cosine similarity calculation"""
print("\n🧪 Test 3: Cosine Similarity Calculation")
try:
from src.core.database import DatabaseManager
from src.core.face_processing import FaceProcessor
# Create temporary database
with tempfile.NamedTemporaryFile(delete=False, suffix='.db') as tmp:
tmp_db_path = tmp.name
db = DatabaseManager(tmp_db_path, verbose=0)
processor = FaceProcessor(db, verbose=0)
# Test with identical encodings
enc1 = np.random.rand(512)
distance_identical = processor._calculate_cosine_similarity(enc1, enc1)
print(f" ✓ Identical encodings distance: {distance_identical:.6f}")
if distance_identical < 0.01: # Should be very close to 0
print(f" ✓ Identical encodings produce near-zero distance")
else:
print(f" ⚠️ Identical encodings distance higher than expected")
# Test with different encodings
enc2 = np.random.rand(512)
distance_different = processor._calculate_cosine_similarity(enc1, enc2)
print(f" ✓ Different encodings distance: {distance_different:.6f}")
if 0 < distance_different < 2: # Should be in valid range
print(f" ✓ Different encodings produce valid distance")
else:
print(f" ⚠️ Distance out of expected range [0, 2]")
# Test with length mismatch
enc3 = np.random.rand(128) # Different length
distance_mismatch = processor._calculate_cosine_similarity(enc1, enc3)
print(f" ✓ Mismatched length distance: {distance_mismatch:.6f}")
if distance_mismatch == 2.0: # Should return max distance
print(f" ✓ Mismatched lengths handled correctly")
else:
print(f" ⚠️ Mismatch handling unexpected")
# Cleanup
if os.path.exists(tmp_db_path):
os.unlink(tmp_db_path)
print(" ✅ Cosine similarity calculation works correctly")
return True
except Exception as e:
print(f" ❌ Error: {e}")
import traceback
traceback.print_exc()
return False
def test_location_format_handling():
"""Test handling of both dict and tuple location formats"""
print("\n🧪 Test 4: Location Format Handling")
try:
# Test dict format (DeepFace)
location_dict = {'x': 100, 'y': 150, 'w': 200, 'h': 200}
location_str_dict = str(location_dict)
import ast
parsed_dict = ast.literal_eval(location_str_dict)
if isinstance(parsed_dict, dict):
left = parsed_dict.get('x', 0)
top = parsed_dict.get('y', 0)
width = parsed_dict.get('w', 0)
height = parsed_dict.get('h', 0)
right = left + width
bottom = top + height
print(f" ✓ Dict format parsed: {location_dict}")
print(f" ✓ Converted to box: top={top}, right={right}, bottom={bottom}, left={left}")
if (left == 100 and top == 150 and right == 300 and bottom == 350):
print(f" ✓ Dict conversion correct")
else:
print(f" ❌ Dict conversion incorrect")
return False
# Legacy tuple format tests removed - only DeepFace format supported
print(" ✅ DeepFace location format handled correctly")
return True
except Exception as e:
print(f" ❌ Error: {e}")
import traceback
traceback.print_exc()
return False
def test_end_to_end_processing():
"""Test end-to-end face processing with DeepFace"""
print("\n🧪 Test 5: End-to-End Processing")
try:
from src.core.database import DatabaseManager
from src.core.face_processing import FaceProcessor
# Check for test images
test_folder = Path("demo_photos/testdeepface")
if not test_folder.exists():
test_folder = Path("demo_photos")
test_images = list(test_folder.glob("*.jpg")) + list(test_folder.glob("*.JPG"))
if not test_images:
print(" ⚠️ No test images found, skipping")
return True
# Create temporary database
with tempfile.NamedTemporaryFile(delete=False, suffix='.db') as tmp:
tmp_db_path = tmp.name
db = DatabaseManager(tmp_db_path, verbose=0)
processor = FaceProcessor(db, verbose=1,
detector_backend='retinaface',
model_name='ArcFace')
# Add a test photo
test_image = str(test_images[0])
photo_id = db.add_photo(test_image, Path(test_image).name, None)
if not photo_id:
print(f" ❌ Failed to add photo")
return False
print(f" ✓ Added test photo (ID: {photo_id})")
# Process faces
print(f" Processing faces...")
count = processor.process_faces(limit=1)
print(f" ✓ Processed {count} photo(s)")
# Verify results
stats = db.get_statistics()
print(f" ✓ Statistics: {stats['total_faces']} faces found")
if stats['total_faces'] > 0:
# Check encoding size
faces = db.get_all_face_encodings()
if faces:
face_id, encoding_bytes, person_id, quality = faces[0]
encoding = np.frombuffer(encoding_bytes, dtype=np.float64)
print(f" ✓ Encoding size: {len(encoding)} dimensions")
if len(encoding) == 512:
print(f" ✅ Correct encoding size (512-dim ArcFace)")
else:
print(f" ⚠️ Unexpected encoding size: {len(encoding)}")
# Cleanup
if os.path.exists(tmp_db_path):
os.unlink(tmp_db_path)
print(" ✅ End-to-end processing successful")
return True
except Exception as e:
print(f" ❌ Error: {e}")
import traceback
traceback.print_exc()
return False
def run_all_tests():
"""Run all Phase 3 tests"""
print("=" * 70)
print("Phase 3 DeepFace Integration Tests")
print("=" * 70)
tests = [
("DeepFace Import", test_deepface_import),
("DeepFace Detection", test_deepface_detection),
("Cosine Similarity", test_cosine_similarity),
("Location Format Handling", test_location_format_handling),
("End-to-End Processing", test_end_to_end_processing)
]
results = []
for test_name, test_func in tests:
try:
result = test_func()
results.append((test_name, result))
except Exception as e:
print(f" ❌ Test failed with exception: {e}")
import traceback
traceback.print_exc()
results.append((test_name, False))
print("\n" + "=" * 70)
print("Test Results Summary")
print("=" * 70)
for test_name, result in results:
status = "✅ PASS" if result else "❌ FAIL"
print(f" {status}: {test_name}")
passed = sum(1 for _, result in results if result)
total = len(results)
print()
print(f"Tests passed: {passed}/{total}")
print("=" * 70)
return all(result for _, result in results)
if __name__ == "__main__":
success = run_all_tests()
sys.exit(0 if success else 1)

View File

@ -1,427 +0,0 @@
#!/usr/bin/env python3
"""
Phase 4 Integration Test: GUI Updates for DeepFace
Tests that all GUI panels correctly handle DeepFace metadata and location formats
"""
import os
import sys
import tempfile
import sqlite3
from pathlib import Path
# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent))
# Suppress TensorFlow warnings
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
import warnings
warnings.filterwarnings('ignore')
from src.core.database import DatabaseManager
from src.core.face_processing import FaceProcessor
from src.core.config import DEEPFACE_DETECTOR_BACKEND, DEEPFACE_MODEL_NAME
def test_database_schema():
"""Test 1: Verify database schema has DeepFace columns"""
print("\n" + "="*60)
print("Test 1: Database Schema with DeepFace Columns")
print("="*60)
try:
# Create in-memory database
db = DatabaseManager(":memory:", verbose=0)
# Check faces table schema
with db.get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute("PRAGMA table_info(faces)")
columns = {row[1]: row[2] for row in cursor.fetchall()}
# Verify DeepFace columns exist
required_columns = {
'id': 'INTEGER',
'photo_id': 'INTEGER',
'person_id': 'INTEGER',
'encoding': 'BLOB',
'location': 'TEXT',
'confidence': 'REAL',
'quality_score': 'REAL',
'detector_backend': 'TEXT',
'model_name': 'TEXT',
'face_confidence': 'REAL'
}
missing_columns = []
for col_name, col_type in required_columns.items():
if col_name not in columns:
missing_columns.append(col_name)
else:
print(f"✓ Column '{col_name}' exists with type '{columns[col_name]}'")
if missing_columns:
print(f"\n❌ FAIL: Missing columns: {missing_columns}")
return False
print("\n✅ PASS: All DeepFace columns present in database schema")
return True
except Exception as e:
print(f"\n❌ FAIL: {e}")
import traceback
traceback.print_exc()
return False
def test_face_data_retrieval():
"""Test 2: Verify face data retrieval includes DeepFace metadata"""
print("\n" + "="*60)
print("Test 2: Face Data Retrieval with DeepFace Metadata")
print("="*60)
try:
# Create in-memory database
db = DatabaseManager(":memory:", verbose=0)
# Create a test photo
test_photo_path = "/tmp/test_photo.jpg"
photo_id = db.add_photo(test_photo_path, "test_photo.jpg", None)
# Create a test face with DeepFace metadata
import numpy as np
test_encoding = np.random.rand(512).astype(np.float64) # 512-dim for ArcFace
test_location = "{'x': 100, 'y': 100, 'w': 50, 'h': 50}"
face_id = db.add_face(
photo_id=photo_id,
encoding=test_encoding.tobytes(),
location=test_location,
confidence=0.0,
quality_score=0.85,
person_id=None,
detector_backend='retinaface',
model_name='ArcFace',
face_confidence=0.95
)
print(f"✓ Created test face with ID {face_id}")
# Query the face data (simulating GUI panel queries)
with db.get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT f.id, f.photo_id, p.path, p.filename, f.location,
f.face_confidence, f.quality_score, f.detector_backend, f.model_name
FROM faces f
JOIN photos p ON f.photo_id = p.id
WHERE f.id = ?
""", (face_id,))
result = cursor.fetchone()
if not result:
print("\n❌ FAIL: Could not retrieve face data")
return False
# Unpack the result (9 fields)
face_id_ret, photo_id_ret, path, filename, location, face_conf, quality, detector, model = result
print(f"✓ Retrieved face data:")
print(f" - Face ID: {face_id_ret}")
print(f" - Photo ID: {photo_id_ret}")
print(f" - Location: {location}")
print(f" - Face Confidence: {face_conf}")
print(f" - Quality Score: {quality}")
print(f" - Detector: {detector}")
print(f" - Model: {model}")
# Verify the metadata
if face_conf != 0.95:
print(f"\n❌ FAIL: Face confidence mismatch: expected 0.95, got {face_conf}")
return False
if quality != 0.85:
print(f"\n❌ FAIL: Quality score mismatch: expected 0.85, got {quality}")
return False
if detector != 'retinaface':
print(f"\n❌ FAIL: Detector mismatch: expected 'retinaface', got {detector}")
return False
if model != 'ArcFace':
print(f"\n❌ FAIL: Model mismatch: expected 'ArcFace', got {model}")
return False
print("\n✅ PASS: Face data retrieval includes all DeepFace metadata")
return True
except Exception as e:
print(f"\n❌ FAIL: {e}")
import traceback
traceback.print_exc()
return False
def test_location_format_handling():
"""Test 3: Verify both location formats are handled correctly"""
print("\n" + "="*60)
print("Test 3: Location Format Handling (Dict & Tuple)")
print("="*60)
try:
# Test both location formats
deepface_location = "{'x': 100, 'y': 150, 'w': 80, 'h': 90}"
legacy_location = "(150, 180, 240, 100)"
# Parse DeepFace dict format
import ast
deepface_loc = ast.literal_eval(deepface_location)
if not isinstance(deepface_loc, dict):
print(f"❌ FAIL: DeepFace location not parsed as dict")
return False
if 'x' not in deepface_loc or 'y' not in deepface_loc or 'w' not in deepface_loc or 'h' not in deepface_loc:
print(f"❌ FAIL: DeepFace location missing required keys")
return False
print(f"✓ DeepFace format parsed correctly: {deepface_loc}")
# Legacy tuple format tests removed - only DeepFace format supported
print(f"✓ DeepFace format is the only supported format")
print("\n✅ PASS: DeepFace location format handled correctly")
return True
except Exception as e:
print(f"\n❌ FAIL: {e}")
import traceback
traceback.print_exc()
return False
def test_face_processor_configuration():
"""Test 4: Verify FaceProcessor accepts DeepFace configuration"""
print("\n" + "="*60)
print("Test 4: FaceProcessor DeepFace Configuration")
print("="*60)
try:
# Create in-memory database
db = DatabaseManager(":memory:", verbose=0)
# Create FaceProcessor with default config
processor_default = FaceProcessor(db, verbose=0)
print(f"✓ Default detector: {processor_default.detector_backend}")
print(f"✓ Default model: {processor_default.model_name}")
if processor_default.detector_backend != DEEPFACE_DETECTOR_BACKEND:
print(f"❌ FAIL: Default detector mismatch")
return False
if processor_default.model_name != DEEPFACE_MODEL_NAME:
print(f"❌ FAIL: Default model mismatch")
return False
# Create FaceProcessor with custom config
processor_custom = FaceProcessor(db, verbose=0,
detector_backend='mtcnn',
model_name='Facenet512')
print(f"✓ Custom detector: {processor_custom.detector_backend}")
print(f"✓ Custom model: {processor_custom.model_name}")
if processor_custom.detector_backend != 'mtcnn':
print(f"❌ FAIL: Custom detector not applied")
return False
if processor_custom.model_name != 'Facenet512':
print(f"❌ FAIL: Custom model not applied")
return False
print("\n✅ PASS: FaceProcessor correctly configured with DeepFace settings")
return True
except Exception as e:
print(f"\n❌ FAIL: {e}")
import traceback
traceback.print_exc()
return False
def test_gui_panel_compatibility():
"""Test 5: Verify GUI panels can unpack face data correctly"""
print("\n" + "="*60)
print("Test 5: GUI Panel Data Unpacking")
print("="*60)
try:
# Create in-memory database
db = DatabaseManager(":memory:", verbose=0)
# Create test photo and face
test_photo_path = "/tmp/test_photo.jpg"
photo_id = db.add_photo(test_photo_path, "test_photo.jpg", None)
import numpy as np
test_encoding = np.random.rand(512).astype(np.float64)
test_location = "{'x': 100, 'y': 100, 'w': 50, 'h': 50}"
face_id = db.add_face(
photo_id=photo_id,
encoding=test_encoding.tobytes(),
location=test_location,
confidence=0.0,
quality_score=0.85,
person_id=None,
detector_backend='retinaface',
model_name='ArcFace',
face_confidence=0.95
)
# Simulate identify_panel query
with db.get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT f.id, f.photo_id, p.path, p.filename, f.location,
f.face_confidence, f.quality_score, f.detector_backend, f.model_name
FROM faces f
JOIN photos p ON f.photo_id = p.id
WHERE f.person_id IS NULL
""")
faces = cursor.fetchall()
if not faces:
print("❌ FAIL: No faces retrieved")
return False
# Simulate unpacking in identify_panel
for face_tuple in faces:
face_id, photo_id, photo_path, filename, location, face_conf, quality, detector, model = face_tuple
print(f"✓ Unpacked identify_panel data:")
print(f" - Face ID: {face_id}")
print(f" - Photo ID: {photo_id}")
print(f" - Location: {location}")
print(f" - Face Confidence: {face_conf}")
print(f" - Quality: {quality}")
print(f" - Detector/Model: {detector}/{model}")
# Simulate auto_match_panel query
with db.get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT f.id, f.person_id, f.photo_id, f.location, p.filename, f.quality_score,
f.face_confidence, f.detector_backend, f.model_name
FROM faces f
JOIN photos p ON f.photo_id = p.id
""")
faces = cursor.fetchall()
# Simulate unpacking in auto_match_panel (uses tuple indexing)
for face in faces:
face_id = face[0]
person_id = face[1]
photo_id = face[2]
location = face[3]
filename = face[4]
quality = face[5]
face_conf = face[6]
detector = face[7]
model = face[8]
print(f"✓ Unpacked auto_match_panel data (tuple indexing):")
print(f" - Face ID: {face_id}")
print(f" - Quality: {quality}")
print(f" - Face Confidence: {face_conf}")
# Simulate modify_panel query
with db.get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT f.id, f.photo_id, p.path, p.filename, f.location,
f.face_confidence, f.quality_score, f.detector_backend, f.model_name
FROM faces f
JOIN photos p ON f.photo_id = p.id
""")
faces = cursor.fetchall()
# Simulate unpacking in modify_panel
for face_tuple in faces:
face_id, photo_id, photo_path, filename, location, face_conf, quality, detector, model = face_tuple
print(f"✓ Unpacked modify_panel data:")
print(f" - Face ID: {face_id}")
print(f" - Quality: {quality}")
print("\n✅ PASS: All GUI panels can correctly unpack face data")
return True
except Exception as e:
print(f"\n❌ FAIL: {e}")
import traceback
traceback.print_exc()
return False
def main():
"""Run all Phase 4 tests"""
print("\n" + "="*70)
print("PHASE 4 INTEGRATION TEST SUITE: GUI Updates for DeepFace")
print("="*70)
tests = [
("Database Schema", test_database_schema),
("Face Data Retrieval", test_face_data_retrieval),
("Location Format Handling", test_location_format_handling),
("FaceProcessor Configuration", test_face_processor_configuration),
("GUI Panel Compatibility", test_gui_panel_compatibility),
]
results = []
for test_name, test_func in tests:
try:
result = test_func()
results.append((test_name, result))
except Exception as e:
print(f"\n❌ Test '{test_name}' crashed: {e}")
import traceback
traceback.print_exc()
results.append((test_name, False))
# Print summary
print("\n" + "="*70)
print("TEST SUMMARY")
print("="*70)
passed = 0
failed = 0
for test_name, result in results:
status = "✅ PASS" if result else "❌ FAIL"
print(f"{status}: {test_name}")
if result:
passed += 1
else:
failed += 1
print("="*70)
print(f"Tests passed: {passed}/{len(tests)}")
print(f"Tests failed: {failed}/{len(tests)}")
print("="*70)
if failed == 0:
print("\n🎉 ALL TESTS PASSED! Phase 4 GUI integration is complete!")
return 0
else:
print(f"\n⚠️ {failed} test(s) failed. Please review the errors above.")
return 1
if __name__ == "__main__":
sys.exit(main())