#!/usr/bin/env python3
"""
Database operations and schema management for PunimTag
"""

import sqlite3
import threading
from contextlib import contextmanager
from typing import Dict, Iterator, List, Optional, Tuple

from config import DEFAULT_DB_PATH, DB_TIMEOUT


class DatabaseManager:
    """Handles all database operations for the photo tagger.

    A single SQLite connection is created lazily and then reused by every
    method through :meth:`get_db_connection`.
    """

    # Table definitions, executed in order by init_database().
    _TABLE_DDL = (
        # Photos table
        '''
        CREATE TABLE IF NOT EXISTS photos (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            path TEXT UNIQUE NOT NULL,
            filename TEXT NOT NULL,
            date_added DATETIME DEFAULT CURRENT_TIMESTAMP,
            date_taken DATE,
            processed BOOLEAN DEFAULT 0
        )
        ''',
        # People table
        '''
        CREATE TABLE IF NOT EXISTS people (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            first_name TEXT NOT NULL,
            last_name TEXT NOT NULL,
            middle_name TEXT,
            maiden_name TEXT,
            date_of_birth DATE,
            created_date DATETIME DEFAULT CURRENT_TIMESTAMP,
            UNIQUE(first_name, last_name, middle_name, maiden_name, date_of_birth)
        )
        ''',
        # Faces table
        '''
        CREATE TABLE IF NOT EXISTS faces (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            photo_id INTEGER NOT NULL,
            person_id INTEGER,
            encoding BLOB NOT NULL,
            location TEXT NOT NULL,
            confidence REAL DEFAULT 0.0,
            quality_score REAL DEFAULT 0.0,
            is_primary_encoding BOOLEAN DEFAULT 0,
            FOREIGN KEY (photo_id) REFERENCES photos (id),
            FOREIGN KEY (person_id) REFERENCES people (id)
        )
        ''',
        # Person encodings table for multiple encodings per person
        '''
        CREATE TABLE IF NOT EXISTS person_encodings (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            person_id INTEGER NOT NULL,
            face_id INTEGER NOT NULL,
            encoding BLOB NOT NULL,
            quality_score REAL DEFAULT 0.0,
            created_date DATETIME DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (person_id) REFERENCES people (id),
            FOREIGN KEY (face_id) REFERENCES faces (id)
        )
        ''',
        # Tags table - holds only tag information
        '''
        CREATE TABLE IF NOT EXISTS tags (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            tag_name TEXT UNIQUE NOT NULL,
            created_date DATETIME DEFAULT CURRENT_TIMESTAMP
        )
        ''',
        # Photo-Tag linkage table
        # linkage_type: INTEGER enum -> 0 = single (per-photo add),
        #                               1 = bulk (folder-wide add)
        '''
        CREATE TABLE IF NOT EXISTS phototaglinkage (
            linkage_id INTEGER PRIMARY KEY AUTOINCREMENT,
            photo_id INTEGER NOT NULL,
            tag_id INTEGER NOT NULL,
            linkage_type INTEGER NOT NULL DEFAULT 0 CHECK(linkage_type IN (0,1)),
            created_date DATETIME DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (photo_id) REFERENCES photos (id),
            FOREIGN KEY (tag_id) REFERENCES tags (id),
            UNIQUE(photo_id, tag_id)
        )
        ''',
    )

    # Indexes for better query performance.
    _INDEX_DDL = (
        'CREATE INDEX IF NOT EXISTS idx_faces_person_id ON faces(person_id)',
        'CREATE INDEX IF NOT EXISTS idx_faces_photo_id ON faces(photo_id)',
        'CREATE INDEX IF NOT EXISTS idx_photos_processed ON photos(processed)',
        'CREATE INDEX IF NOT EXISTS idx_faces_quality ON faces(quality_score)',
        'CREATE INDEX IF NOT EXISTS idx_person_encodings_person_id '
        'ON person_encodings(person_id)',
        'CREATE INDEX IF NOT EXISTS idx_person_encodings_quality '
        'ON person_encodings(quality_score)',
        'CREATE INDEX IF NOT EXISTS idx_photos_date_taken ON photos(date_taken)',
        'CREATE INDEX IF NOT EXISTS idx_photos_date_added ON photos(date_added)',
    )

    def __init__(self, db_path: str = DEFAULT_DB_PATH, verbose: int = 0):
        """Initialize the manager and make sure the schema exists.

        Args:
            db_path: Path of the SQLite database file.
            verbose: Verbosity level; values >= 1 print a message once the
                schema has been initialized.
        """
        self.db_path = db_path
        self.verbose = verbose
        self._db_connection = None
        self._db_lock = threading.Lock()
        self.init_database()

    @contextmanager
    def get_db_connection(self) -> Iterator[sqlite3.Connection]:
        """Yield the shared connection, committing or rolling back on exit.

        The connection is opened on first use and kept open until
        :meth:`close_db_connection` is called. The lock only guards that
        lazy creation; it is released before the connection is yielded, so
        calls to this context manager may be nested.

        On normal exit of the ``with`` body the transaction is committed; if
        the body raises an ``Exception`` the transaction is rolled back and
        the exception is re-raised.
        """
        with self._db_lock:
            if self._db_connection is None:
                self._db_connection = sqlite3.connect(
                    self.db_path,
                    timeout=DB_TIMEOUT,
                    check_same_thread=False,
                )
                self._db_connection.row_factory = sqlite3.Row
        try:
            yield self._db_connection
        except Exception:
            self._db_connection.rollback()
            raise
        else:
            self._db_connection.commit()

    def close_db_connection(self) -> None:
        """Close the shared connection, if one is open."""
        with self._db_lock:
            if self._db_connection is not None:
                self._db_connection.close()
                self._db_connection = None

    def init_database(self) -> None:
        """Create database tables and indexes if they don't exist."""
        with self.get_db_connection() as conn:
            cursor = conn.cursor()
            for statement in self._TABLE_DDL + self._INDEX_DDL:
                cursor.execute(statement)

        if self.verbose >= 1:
            print(f"✅ Database initialized: {self.db_path}")

    def load_tag_mappings(self) -> Tuple[Dict[int, str], Dict[str, int]]:
        """Load the tag lookup tables from the database.

        Returns:
            A pair ``(tag_id_to_name, tag_name_to_id)``.
        """
        with self.get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute('SELECT id, tag_name FROM tags ORDER BY tag_name')

            tag_id_to_name: Dict[int, str] = {}
            tag_name_to_id: Dict[str, int] = {}
            for tag_id, tag_name in cursor.fetchall():
                tag_id_to_name[tag_id] = tag_name
                tag_name_to_id[tag_name] = tag_id

        return tag_id_to_name, tag_name_to_id

    def get_existing_tag_ids_for_photo(self, photo_id: int) -> List[int]:
        """Return the tag IDs linked to a photo, ordered by link creation date."""
        with self.get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT ptl.tag_id
                FROM phototaglinkage ptl
                WHERE ptl.photo_id = ?
                ORDER BY ptl.created_date
            ''', (photo_id,))
            return [row[0] for row in cursor.fetchall()]

    def get_tag_id_by_name(self, tag_name: str,
                           tag_name_to_id_map: Dict[str, int]) -> Optional[int]:
        """Look up a tag ID by name in the supplied mapping.

        This is a pure dictionary lookup: it does not touch the database and
        does not create missing tags (use :meth:`add_tag` for that).

        Returns:
            The tag ID, or ``None`` if the name is not in the mapping.
        """
        return tag_name_to_id_map.get(tag_name)

    def get_tag_name_by_id(self, tag_id: int,
                           tag_id_to_name_map: Dict[int, str]) -> str:
        """Look up a tag name by ID, falling back to ``"Unknown Tag <id>"``."""
        return tag_id_to_name_map.get(tag_id, f"Unknown Tag {tag_id}")

    def show_people_list(self, cursor=None) -> List[Tuple]:
        """Return all people ordered by last name, then first name.

        Args:
            cursor: Optional existing cursor to run the query on. When
                omitted, a cursor on the shared connection is used.

        Returns:
            Rows of ``(id, first_name, last_name, middle_name, maiden_name,
            date_of_birth, created_date)``.
        """
        query = '''
            SELECT id, first_name, last_name, middle_name, maiden_name,
                   date_of_birth, created_date
            FROM people
            ORDER BY last_name, first_name
        '''
        if cursor is not None:
            cursor.execute(query)
            return cursor.fetchall()

        with self.get_db_connection() as conn:
            own_cursor = conn.cursor()
            own_cursor.execute(query)
            return own_cursor.fetchall()

    def add_photo(self, photo_path: str, filename: str,
                  date_taken: Optional[str] = None) -> Optional[int]:
        """Add a photo to the database.

        Returns:
            The new photo's ID, or ``None`` if a photo with the same path
            already exists (nothing is inserted in that case).
        """
        with self.get_db_connection() as conn:
            cursor = conn.cursor()

            cursor.execute('SELECT id FROM photos WHERE path = ?', (photo_path,))
            if cursor.fetchone() is not None:
                # Photo already exists; signal that it wasn't added.
                return None

            cursor.execute('''
                INSERT INTO photos (path, filename, date_taken)
                VALUES (?, ?, ?)
            ''', (photo_path, filename, date_taken))
            # lastrowid is the rowid of the row just inserted, so no
            # follow-up SELECT is needed.
            return cursor.lastrowid

    def mark_photo_processed(self, photo_id: int) -> None:
        """Mark a photo as processed."""
        with self.get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute('UPDATE photos SET processed = 1 WHERE id = ?',
                           (photo_id,))

    def add_face(self, photo_id: int, encoding: bytes, location: str,
                 confidence: float = 0.0, quality_score: float = 0.0,
                 person_id: Optional[int] = None) -> int:
        """Add a face to the database and return its ID."""
        with self.get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute('''
                INSERT INTO faces
                    (photo_id, person_id, encoding, location, confidence, quality_score)
                VALUES (?, ?, ?, ?, ?, ?)
            ''', (photo_id, person_id, encoding, location, confidence, quality_score))
            return cursor.lastrowid

    def update_face_person(self, face_id: int, person_id: Optional[int]) -> None:
        """Set (or clear, with ``None``) the person assigned to a face."""
        with self.get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute('UPDATE faces SET person_id = ? WHERE id = ?',
                           (person_id, face_id))

    def add_person(self, first_name: str, last_name: str,
                   middle_name: Optional[str] = None,
                   maiden_name: Optional[str] = None,
                   date_of_birth: Optional[str] = None) -> Optional[int]:
        """Add a person if not already present and return their ID.

        The optional fields may be ``None``. They are matched with SQLite's
        NULL-safe ``IS`` operator, because ``column = NULL`` is never true
        and NULLs are treated as distinct by the table's UNIQUE constraint;
        without this, a person with any missing field would be duplicated on
        every call and never found again.

        Returns:
            The ID of the existing or newly inserted person, or ``None`` if
            the insert was ignored and no matching row exists.
        """
        identity = (first_name, last_name, middle_name, maiden_name, date_of_birth)
        lookup = '''
            SELECT id FROM people
            WHERE first_name = ? AND last_name = ?
              AND middle_name IS ? AND maiden_name IS ? AND date_of_birth IS ?
        '''
        with self.get_db_connection() as conn:
            cursor = conn.cursor()

            cursor.execute(lookup, identity)
            row = cursor.fetchone()
            if row is None:
                cursor.execute('''
                    INSERT OR IGNORE INTO people
                        (first_name, last_name, middle_name, maiden_name, date_of_birth)
                    VALUES (?, ?, ?, ?, ?)
                ''', identity)
                cursor.execute(lookup, identity)
                row = cursor.fetchone()

            return row[0] if row else None

    def add_tag(self, tag_name: str) -> Optional[int]:
        """Add a tag if not already present and return its ID."""
        with self.get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute('INSERT OR IGNORE INTO tags (tag_name) VALUES (?)',
                           (tag_name,))

            # Re-select so the ID is returned whether or not the row was new.
            cursor.execute('SELECT id FROM tags WHERE tag_name = ?', (tag_name,))
            row = cursor.fetchone()
            return row[0] if row else None

    def link_photo_tag(self, photo_id: int, tag_id: int) -> None:
        """Link a photo to a tag; an existing link is left untouched."""
        with self.get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute('''
                INSERT OR IGNORE INTO phototaglinkage (photo_id, tag_id)
                VALUES (?, ?)
            ''', (photo_id, tag_id))

    def unlink_photo_tag(self, photo_id: int, tag_id: int) -> None:
        """Remove the link between a photo and a tag."""
        with self.get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute('''
                DELETE FROM phototaglinkage
                WHERE photo_id = ? AND tag_id = ?
            ''', (photo_id, tag_id))

    def get_photos_by_pattern(self, pattern: Optional[str] = None,
                              limit: int = 10) -> List[Tuple]:
        """Return the most recently added photos, optionally filtered.

        Args:
            pattern: Substring matched (via SQL ``LIKE``) against filename
                and path. When empty or ``None`` no filter is applied.
            limit: Maximum number of rows to return.

        Returns:
            Rows of ``(id, path, filename, date_taken, processed)``.
        """
        with self.get_db_connection() as conn:
            cursor = conn.cursor()
            if pattern:
                like = f'%{pattern}%'
                cursor.execute('''
                    SELECT id, path, filename, date_taken, processed
                    FROM photos
                    WHERE filename LIKE ? OR path LIKE ?
                    ORDER BY date_added DESC
                    LIMIT ?
                ''', (like, like, limit))
            else:
                cursor.execute('''
                    SELECT id, path, filename, date_taken, processed
                    FROM photos
                    ORDER BY date_added DESC
                    LIMIT ?
                ''', (limit,))
            return cursor.fetchall()

    def get_unprocessed_photos(self, limit: int = 50) -> List[Tuple]:
        """Return up to ``limit`` unprocessed photos, oldest first.

        Returns:
            Rows of ``(id, path, filename, date_taken)``.
        """
        with self.get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT id, path, filename, date_taken
                FROM photos
                WHERE processed = 0
                ORDER BY date_added ASC
                LIMIT ?
            ''', (limit,))
            return cursor.fetchall()

    def get_unidentified_faces(self, limit: int = 20) -> List[Tuple]:
        """Return faces with no person assigned, best quality first.

        Returns:
            Rows of ``(face_id, photo_id, location, confidence,
            quality_score, path, filename)``.
        """
        with self.get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT f.id, f.photo_id, f.location, f.confidence, f.quality_score,
                       p.path, p.filename
                FROM faces f
                JOIN photos p ON f.photo_id = p.id
                WHERE f.person_id IS NULL
                ORDER BY f.quality_score DESC, f.confidence DESC
                LIMIT ?
            ''', (limit,))
            return cursor.fetchall()

    def get_face_encodings(self, face_id: int) -> Optional[bytes]:
        """Return the stored encoding of a face, or ``None`` if not found."""
        with self.get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute('SELECT encoding FROM faces WHERE id = ?', (face_id,))
            row = cursor.fetchone()
            return row[0] if row else None

    def get_face_photo_info(self, face_id: int) -> Optional[Tuple]:
        """Return ``(photo_id, filename, location)`` for a face, or ``None``."""
        with self.get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT f.photo_id, p.filename, f.location
                FROM faces f
                JOIN photos p ON f.photo_id = p.id
                WHERE f.id = ?
            ''', (face_id,))
            # fetchone() already yields None when there is no matching row.
            return cursor.fetchone()

    def get_all_face_encodings(self) -> List[Tuple]:
        """Return ``(id, encoding, person_id, quality_score)`` for every face."""
        with self.get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute('SELECT id, encoding, person_id, quality_score FROM faces')
            return cursor.fetchall()

    def get_person_encodings(self, person_id: int,
                             min_quality: float = 0.3) -> List[Tuple]:
        """Return a person's encodings at or above ``min_quality``, best first.

        Returns:
            Rows of ``(encoding, quality_score, face_id)``.
        """
        with self.get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT pe.encoding, pe.quality_score, pe.face_id
                FROM person_encodings pe
                WHERE pe.person_id = ? AND pe.quality_score >= ?
                ORDER BY pe.quality_score DESC
            ''', (person_id, min_quality))
            return cursor.fetchall()

    def add_person_encoding(self, person_id: int, face_id: int,
                            encoding: bytes, quality_score: float) -> None:
        """Store one encoding for a person."""
        with self.get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute('''
                INSERT INTO person_encodings (person_id, face_id, encoding, quality_score)
                VALUES (?, ?, ?, ?)
            ''', (person_id, face_id, encoding, quality_score))

    def update_person_encodings(self, person_id: int,
                                min_quality: float = 0.3) -> None:
        """Rebuild a person's encodings from the faces assigned to them.

        All existing ``person_encodings`` rows for the person are deleted and
        replaced by the encodings of their faces whose quality score is at
        least ``min_quality``. Both steps run in the same transaction.
        """
        with self.get_db_connection() as conn:
            cursor = conn.cursor()

            # Remove old encodings
            cursor.execute('DELETE FROM person_encodings WHERE person_id = ?',
                           (person_id,))

            # Add current face encodings
            cursor.execute('''
                INSERT INTO person_encodings (person_id, face_id, encoding, quality_score)
                SELECT ?, id, encoding, quality_score
                FROM faces
                WHERE person_id = ? AND quality_score >= ?
            ''', (person_id, person_id, min_quality))

    def get_similar_faces(self, face_id: int, tolerance: float = 0.6,
                          include_same_photo: bool = False) -> List[Tuple]:
        """Return candidate faces to compare against the given face.

        No similarity computation happens here: every other face is returned
        and ``tolerance`` is accepted only for interface compatibility (it is
        not applied by this method).

        Args:
            face_id: ID of the target face.
            tolerance: Unused by this method.
            include_same_photo: When ``False`` (default), faces from the same
                photo as the target are excluded.

        Returns:
            Rows of ``(id, encoding, person_id, quality_score, confidence,
            path, filename, photo_id)``, or an empty list if the target face
            (or its photo) does not exist.
        """
        with self.get_db_connection() as conn:
            cursor = conn.cursor()

            # Locate the target face; the join also requires its photo to exist.
            cursor.execute('''
                SELECT f.photo_id
                FROM faces f
                JOIN photos p ON f.photo_id = p.id
                WHERE f.id = ?
            ''', (face_id,))
            target = cursor.fetchone()
            if not target:
                return []
            target_photo_id = target[0]

            query = '''
                SELECT f.id, f.encoding, f.person_id, f.quality_score, f.confidence,
                       p.path, p.filename, f.photo_id
                FROM faces f
                JOIN photos p ON f.photo_id = p.id
                WHERE f.id != ?
            '''
            params = [face_id]
            if not include_same_photo:
                query += ' AND f.photo_id != ?'
                params.append(target_photo_id)

            cursor.execute(query, params)
            return cursor.fetchall()

    def get_statistics(self) -> Dict:
        """Return row counts summarizing the database contents.

        Returns:
            A dict with the keys ``total_photos``, ``processed_photos``,
            ``total_faces``, ``identified_faces``, ``unidentified_faces``,
            ``total_people``, ``total_tags`` and ``total_photo_tags``.
        """
        count_queries = (
            # Photo statistics
            ('total_photos', 'SELECT COUNT(*) FROM photos'),
            ('processed_photos', 'SELECT COUNT(*) FROM photos WHERE processed = 1'),
            # Face statistics
            ('total_faces', 'SELECT COUNT(*) FROM faces'),
            ('identified_faces',
             'SELECT COUNT(*) FROM faces WHERE person_id IS NOT NULL'),
            ('unidentified_faces',
             'SELECT COUNT(*) FROM faces WHERE person_id IS NULL'),
            # People statistics
            ('total_people', 'SELECT COUNT(*) FROM people'),
            # Tag statistics
            ('total_tags', 'SELECT COUNT(*) FROM tags'),
            ('total_photo_tags', 'SELECT COUNT(*) FROM phototaglinkage'),
        )
        with self.get_db_connection() as conn:
            cursor = conn.cursor()
            stats = {}
            for key, query in count_queries:
                cursor.execute(query)
                stats[key] = cursor.fetchone()[0]
            return stats