punimtag/database.py
tanyar09 b9a0637035 Enhance database schema with photo-tag linkage type and remove redundant migrations
This commit adds a new column, linkage_type, to the phototaglinkage table to distinguish between single and bulk tag additions. Additionally, the previous migration attempts to add date_taken and date_added columns to the photos table have been removed, streamlining the database initialization process. These changes improve the database structure for better tag management.
2025-10-06 14:42:22 -04:00

482 lines
20 KiB
Python

#!/usr/bin/env python3
"""
Database operations and schema management for PunimTag
"""
import sqlite3
import threading
from contextlib import contextmanager
from typing import Dict, List, Tuple, Optional
from config import DEFAULT_DB_PATH, DB_TIMEOUT
class DatabaseManager:
    """Handles all database operations for the photo tagger.

    One SQLite connection is opened lazily and then reused for every
    operation; access to it is serialized through ``self._db_lock``.
    Rows come back as ``sqlite3.Row`` objects (index- and name-addressable),
    which is what the ``Tuple`` annotations below refer to.
    """

    def __init__(self, db_path: str = DEFAULT_DB_PATH, verbose: int = 0):
        """Store the configuration and make sure the schema exists.

        Args:
            db_path: Path of the SQLite database file.
            verbose: Verbosity level; at 1 or above a confirmation line is
                printed after the schema has been initialized.
        """
        self.db_path = db_path
        self.verbose = verbose
        self._db_connection: Optional[sqlite3.Connection] = None
        self._db_lock = threading.Lock()
        self.init_database()

    @contextmanager
    def get_db_connection(self):
        """Yield the shared connection, committing when the body succeeds.

        The connection is created on first use and kept open until
        ``close_db_connection`` is called. If the ``with`` body raises, the
        pending transaction is rolled back and the exception is re-raised;
        otherwise the transaction is committed.

        NOTE: ``self._db_lock`` is a plain ``threading.Lock`` (not reentrant)
        and is held for the whole ``with`` body, so calls to this method must
        not be nested on the same thread.
        """
        with self._db_lock:
            if self._db_connection is None:
                self._db_connection = sqlite3.connect(self.db_path, timeout=DB_TIMEOUT)
                self._db_connection.row_factory = sqlite3.Row
            connection = self._db_connection
            try:
                yield connection
            except Exception:
                connection.rollback()
                raise
            else:
                connection.commit()

    def close_db_connection(self) -> None:
        """Close the shared connection, if one is open."""
        with self._db_lock:
            if self._db_connection:
                self._db_connection.close()
                self._db_connection = None

    def init_database(self) -> None:
        """Create the tables and indexes if they don't exist yet."""
        index_statements = (
            'CREATE INDEX IF NOT EXISTS idx_faces_person_id ON faces(person_id)',
            'CREATE INDEX IF NOT EXISTS idx_faces_photo_id ON faces(photo_id)',
            'CREATE INDEX IF NOT EXISTS idx_photos_processed ON photos(processed)',
            'CREATE INDEX IF NOT EXISTS idx_faces_quality ON faces(quality_score)',
            'CREATE INDEX IF NOT EXISTS idx_person_encodings_person_id ON person_encodings(person_id)',
            'CREATE INDEX IF NOT EXISTS idx_person_encodings_quality ON person_encodings(quality_score)',
            'CREATE INDEX IF NOT EXISTS idx_photos_date_taken ON photos(date_taken)',
            'CREATE INDEX IF NOT EXISTS idx_photos_date_added ON photos(date_added)',
        )

        with self.get_db_connection() as conn:
            cursor = conn.cursor()

            # Photos table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS photos (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    path TEXT UNIQUE NOT NULL,
                    filename TEXT NOT NULL,
                    date_added DATETIME DEFAULT CURRENT_TIMESTAMP,
                    date_taken DATE,
                    processed BOOLEAN DEFAULT 0
                )
            ''')

            # People table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS people (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    first_name TEXT NOT NULL,
                    last_name TEXT NOT NULL,
                    middle_name TEXT,
                    maiden_name TEXT,
                    date_of_birth DATE,
                    created_date DATETIME DEFAULT CURRENT_TIMESTAMP,
                    UNIQUE(first_name, last_name, middle_name, maiden_name, date_of_birth)
                )
            ''')

            # Faces table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS faces (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    photo_id INTEGER NOT NULL,
                    person_id INTEGER,
                    encoding BLOB NOT NULL,
                    location TEXT NOT NULL,
                    confidence REAL DEFAULT 0.0,
                    quality_score REAL DEFAULT 0.0,
                    is_primary_encoding BOOLEAN DEFAULT 0,
                    FOREIGN KEY (photo_id) REFERENCES photos (id),
                    FOREIGN KEY (person_id) REFERENCES people (id)
                )
            ''')

            # Person encodings table for multiple encodings per person
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS person_encodings (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    person_id INTEGER NOT NULL,
                    face_id INTEGER NOT NULL,
                    encoding BLOB NOT NULL,
                    quality_score REAL DEFAULT 0.0,
                    created_date DATETIME DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (person_id) REFERENCES people (id),
                    FOREIGN KEY (face_id) REFERENCES faces (id)
                )
            ''')

            # Tags table - holds only tag information
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS tags (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    tag_name TEXT UNIQUE NOT NULL,
                    created_date DATETIME DEFAULT CURRENT_TIMESTAMP
                )
            ''')

            # Photo-Tag linkage table
            # linkage_type: INTEGER enum -> 0 = single (per-photo add),
            # 1 = bulk (folder-wide add)
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS phototaglinkage (
                    linkage_id INTEGER PRIMARY KEY AUTOINCREMENT,
                    photo_id INTEGER NOT NULL,
                    tag_id INTEGER NOT NULL,
                    linkage_type INTEGER NOT NULL DEFAULT 0 CHECK(linkage_type IN (0,1)),
                    created_date DATETIME DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (photo_id) REFERENCES photos (id),
                    FOREIGN KEY (tag_id) REFERENCES tags (id),
                    UNIQUE(photo_id, tag_id)
                )
            ''')

            # Add indexes for better performance
            for statement in index_statements:
                cursor.execute(statement)

            if self.verbose >= 1:
                print(f"✅ Database initialized: {self.db_path}")

    def load_tag_mappings(self) -> Tuple[Dict[int, str], Dict[str, int]]:
        """Load both tag lookup tables from the database.

        Returns:
            A pair ``(tag_id_to_name, tag_name_to_id)`` built from every row
            of the ``tags`` table, read in ``tag_name`` order.
        """
        with self.get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute('SELECT id, tag_name FROM tags ORDER BY tag_name')
            rows = cursor.fetchall()

        tag_id_to_name: Dict[int, str] = {}
        tag_name_to_id: Dict[str, int] = {}
        for tag_id, tag_name in rows:
            tag_id_to_name[tag_id] = tag_name
            tag_name_to_id[tag_name] = tag_id
        return tag_id_to_name, tag_name_to_id

    def get_existing_tag_ids_for_photo(self, photo_id: int) -> List[int]:
        """Return the tag IDs linked to a photo, oldest link first."""
        with self.get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT ptl.tag_id
                FROM phototaglinkage ptl
                WHERE ptl.photo_id = ?
                ORDER BY ptl.created_date
            ''', (photo_id,))
            return [row[0] for row in cursor.fetchall()]

    def get_tag_id_by_name(self, tag_name: str, tag_name_to_id_map: Dict[str, int]) -> Optional[int]:
        """Look up a tag ID in the supplied name-to-ID map.

        This is a pure dictionary lookup: the database is not consulted and
        no tag is created. Use ``add_tag`` to create a missing tag.

        Returns:
            The mapped ID, or ``None`` when ``tag_name`` is not in the map.
        """
        return tag_name_to_id_map.get(tag_name)

    def get_tag_name_by_id(self, tag_id: int, tag_id_to_name_map: Dict[int, str]) -> str:
        """Look up a tag name in the supplied ID-to-name map.

        Returns:
            The mapped name, or ``"Unknown Tag <id>"`` when the ID is absent.
        """
        return tag_id_to_name_map.get(tag_id, f"Unknown Tag {tag_id}")

    def show_people_list(self, cursor=None) -> List[Tuple]:
        """Return every person, ordered by last name then first name.

        Despite the name, nothing is printed; the rows are returned.

        Args:
            cursor: Optional cursor to run the query on. When omitted, the
                shared connection is used.

        Returns:
            Rows of ``(id, first_name, last_name, middle_name, maiden_name,
            date_of_birth, created_date)``.
        """
        query = '''
            SELECT id, first_name, last_name, middle_name, maiden_name, date_of_birth, created_date
            FROM people
            ORDER BY last_name, first_name
        '''
        if cursor is not None:
            cursor.execute(query)
            return cursor.fetchall()

        # Run the query while the connection lock is still held, like every
        # other method of this class.
        with self.get_db_connection() as conn:
            own_cursor = conn.cursor()
            own_cursor.execute(query)
            return own_cursor.fetchall()

    def add_photo(self, photo_path: str, filename: str, date_taken: Optional[str] = None) -> Optional[int]:
        """Insert a photo unless its path is already known.

        Returns:
            The new photo's ID, or ``None`` when a photo with the same path
            already exists (nothing is inserted in that case).
        """
        with self.get_db_connection() as conn:
            cursor = conn.cursor()

            # A known path means the photo was added before; signal that
            # with None instead of returning the existing ID.
            cursor.execute('SELECT id FROM photos WHERE path = ?', (photo_path,))
            if cursor.fetchone() is not None:
                return None

            cursor.execute('''
                INSERT INTO photos (path, filename, date_taken)
                VALUES (?, ?, ?)
            ''', (photo_path, filename, date_taken))
            return cursor.lastrowid

    def mark_photo_processed(self, photo_id: int) -> None:
        """Set the ``processed`` flag of a photo to 1."""
        with self.get_db_connection() as conn:
            conn.cursor().execute('UPDATE photos SET processed = 1 WHERE id = ?', (photo_id,))

    def add_face(self, photo_id: int, encoding: bytes, location: str, confidence: float = 0.0,
                 quality_score: float = 0.0, person_id: Optional[int] = None) -> int:
        """Insert a face row and return its ID."""
        with self.get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute('''
                INSERT INTO faces (photo_id, person_id, encoding, location, confidence, quality_score)
                VALUES (?, ?, ?, ?, ?, ?)
            ''', (photo_id, person_id, encoding, location, confidence, quality_score))
            return cursor.lastrowid

    def update_face_person(self, face_id: int, person_id: Optional[int]) -> None:
        """Assign a face to a person (or detach it by passing ``None``)."""
        with self.get_db_connection() as conn:
            conn.cursor().execute('UPDATE faces SET person_id = ? WHERE id = ?', (person_id, face_id))

    @staticmethod
    def _find_person_id(cursor, identity: Tuple) -> Optional[int]:
        """Return the ID of the person matching all five identity fields.

        The nullable columns are compared with ``IS`` rather than ``=`` so a
        ``None`` argument matches a stored NULL (``= NULL`` is never true).
        """
        cursor.execute('''
            SELECT id FROM people
            WHERE first_name = ? AND last_name = ?
              AND middle_name IS ? AND maiden_name IS ? AND date_of_birth IS ?
        ''', identity)
        row = cursor.fetchone()
        return row[0] if row else None

    def add_person(self, first_name: str, last_name: str, middle_name: str = None,
                   maiden_name: str = None, date_of_birth: str = None) -> Optional[int]:
        """Return the ID of the matching person, inserting them if needed.

        SQLite treats NULLs as distinct inside a UNIQUE constraint, so
        ``INSERT OR IGNORE`` alone does not deduplicate people whose optional
        fields are ``None``. The existing row is therefore looked up first
        with a NULL-safe comparison, and the insert only runs when no match
        is found.

        Returns:
            The person's ID, or ``None`` if the row could not be inserted
            (``INSERT OR IGNORE`` skips rows that violate a constraint).
        """
        identity = (first_name, last_name, middle_name, maiden_name, date_of_birth)
        with self.get_db_connection() as conn:
            cursor = conn.cursor()

            existing_id = self._find_person_id(cursor, identity)
            if existing_id is not None:
                return existing_id

            cursor.execute('''
                INSERT OR IGNORE INTO people (first_name, last_name, middle_name, maiden_name, date_of_birth)
                VALUES (?, ?, ?, ?, ?)
            ''', identity)
            return self._find_person_id(cursor, identity)

    def add_tag(self, tag_name: str) -> Optional[int]:
        """Return the ID of a tag, inserting it first if it is new."""
        with self.get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute('INSERT OR IGNORE INTO tags (tag_name) VALUES (?)', (tag_name,))
            # Read the ID back so an already-existing tag is handled too.
            cursor.execute('SELECT id FROM tags WHERE tag_name = ?', (tag_name,))
            row = cursor.fetchone()
            return row[0] if row else None

    def link_photo_tag(self, photo_id: int, tag_id: int) -> None:
        """Link a photo to a tag; an existing link is left untouched.

        ``linkage_type`` is not set here, so new rows get the column default.
        """
        with self.get_db_connection() as conn:
            conn.cursor().execute('''
                INSERT OR IGNORE INTO phototaglinkage (photo_id, tag_id)
                VALUES (?, ?)
            ''', (photo_id, tag_id))

    def unlink_photo_tag(self, photo_id: int, tag_id: int) -> None:
        """Remove the link between a photo and a tag, if present."""
        with self.get_db_connection() as conn:
            conn.cursor().execute('''
                DELETE FROM phototaglinkage
                WHERE photo_id = ? AND tag_id = ?
            ''', (photo_id, tag_id))

    def get_photos_by_pattern(self, pattern: Optional[str] = None, limit: int = 10) -> List[Tuple]:
        """Return the most recently added photos, optionally filtered.

        Args:
            pattern: Substring to look for in the filename or path. It is
                wrapped in ``%`` and used with ``LIKE``, so ``%`` and ``_``
                inside it act as wildcards. Falsy values disable filtering.
            limit: Maximum number of rows to return.

        Returns:
            Rows of ``(id, path, filename, date_taken, processed)``.
        """
        with self.get_db_connection() as conn:
            cursor = conn.cursor()
            if pattern:
                like = f'%{pattern}%'
                cursor.execute('''
                    SELECT id, path, filename, date_taken, processed
                    FROM photos
                    WHERE filename LIKE ? OR path LIKE ?
                    ORDER BY date_added DESC
                    LIMIT ?
                ''', (like, like, limit))
            else:
                cursor.execute('''
                    SELECT id, path, filename, date_taken, processed
                    FROM photos
                    ORDER BY date_added DESC
                    LIMIT ?
                ''', (limit,))
            return cursor.fetchall()

    def get_unprocessed_photos(self, limit: int = 50) -> List[Tuple]:
        """Return up to ``limit`` unprocessed photos, oldest first.

        Returns:
            Rows of ``(id, path, filename, date_taken)``.
        """
        with self.get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT id, path, filename, date_taken
                FROM photos
                WHERE processed = 0
                ORDER BY date_added ASC
                LIMIT ?
            ''', (limit,))
            return cursor.fetchall()

    def get_unidentified_faces(self, limit: int = 20) -> List[Tuple]:
        """Return faces with no person assigned, best quality first.

        Returns:
            Rows of ``(face_id, photo_id, location, confidence,
            quality_score, photo_path, photo_filename)``.
        """
        with self.get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT f.id, f.photo_id, f.location, f.confidence, f.quality_score,
                       p.path, p.filename
                FROM faces f
                JOIN photos p ON f.photo_id = p.id
                WHERE f.person_id IS NULL
                ORDER BY f.quality_score DESC, f.confidence DESC
                LIMIT ?
            ''', (limit,))
            return cursor.fetchall()

    def get_face_encodings(self, face_id: int) -> Optional[bytes]:
        """Return the stored encoding of one face, or ``None`` if unknown."""
        with self.get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute('SELECT encoding FROM faces WHERE id = ?', (face_id,))
            row = cursor.fetchone()
            return row[0] if row else None

    def get_face_photo_info(self, face_id: int) -> Optional[Tuple]:
        """Return ``(photo_id, filename, location)`` for a face.

        Returns ``None`` when the face (or its photo) does not exist.
        """
        with self.get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT f.photo_id, p.filename, f.location
                FROM faces f
                JOIN photos p ON f.photo_id = p.id
                WHERE f.id = ?
            ''', (face_id,))
            # fetchone() already yields None when there is no row.
            return cursor.fetchone()

    def get_all_face_encodings(self) -> List[Tuple]:
        """Return ``(id, encoding, person_id, quality_score)`` for every face."""
        with self.get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute('SELECT id, encoding, person_id, quality_score FROM faces')
            return cursor.fetchall()

    def get_person_encodings(self, person_id: int, min_quality: float = 0.3) -> List[Tuple]:
        """Return a person's encodings at or above ``min_quality``.

        Returns:
            Rows of ``(encoding, quality_score, face_id)``, best quality
            first.
        """
        with self.get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT pe.encoding, pe.quality_score, pe.face_id
                FROM person_encodings pe
                WHERE pe.person_id = ? AND pe.quality_score >= ?
                ORDER BY pe.quality_score DESC
            ''', (person_id, min_quality))
            return cursor.fetchall()

    def add_person_encoding(self, person_id: int, face_id: int, encoding: bytes, quality_score: float) -> None:
        """Insert one encoding row for a person."""
        with self.get_db_connection() as conn:
            conn.cursor().execute('''
                INSERT INTO person_encodings (person_id, face_id, encoding, quality_score)
                VALUES (?, ?, ?, ?)
            ''', (person_id, face_id, encoding, quality_score))

    def update_person_encodings(self, person_id: int, min_quality: float = 0.3) -> None:
        """Rebuild a person's encodings from their currently assigned faces.

        All existing ``person_encodings`` rows of the person are deleted and
        replaced by the encodings of the faces assigned to them whose
        ``quality_score`` is at least ``min_quality``. Both statements run in
        the same transaction.

        Args:
            person_id: Person whose encodings are rebuilt.
            min_quality: Lowest face quality score to copy over.
        """
        with self.get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute('DELETE FROM person_encodings WHERE person_id = ?', (person_id,))
            cursor.execute('''
                INSERT INTO person_encodings (person_id, face_id, encoding, quality_score)
                SELECT ?, id, encoding, quality_score
                FROM faces
                WHERE person_id = ? AND quality_score >= ?
            ''', (person_id, person_id, min_quality))

    def get_similar_faces(self, face_id: int, tolerance: float = 0.6,
                          include_same_photo: bool = False) -> List[Tuple]:
        """Return the candidate faces to compare against ``face_id``.

        No similarity is computed here: every other face is returned and the
        caller is expected to do the comparison.

        Args:
            face_id: The face to find candidates for.
            tolerance: Accepted for interface compatibility; not used by
                this method.
            include_same_photo: When false, faces from the same photo as the
                target face are left out.

        Returns:
            Rows of ``(id, encoding, person_id, quality_score, confidence,
            path, filename, photo_id)``; an empty list when the target face
            (or its photo) does not exist.
        """
        with self.get_db_connection() as conn:
            cursor = conn.cursor()

            # The target must exist together with its photo; only its
            # photo_id is needed afterwards.
            cursor.execute('''
                SELECT f.encoding, f.photo_id, p.path, p.filename
                FROM faces f
                JOIN photos p ON f.photo_id = p.id
                WHERE f.id = ?
            ''', (face_id,))
            target = cursor.fetchone()
            if not target:
                return []
            target_photo_id = target[1]

            candidates_sql = '''
                SELECT f.id, f.encoding, f.person_id, f.quality_score, f.confidence,
                       p.path, p.filename, f.photo_id
                FROM faces f
                JOIN photos p ON f.photo_id = p.id
                WHERE f.id != ?
            '''
            params = [face_id]
            if not include_same_photo:
                candidates_sql += ' AND f.photo_id != ?'
                params.append(target_photo_id)

            cursor.execute(candidates_sql, params)
            return cursor.fetchall()

    def get_statistics(self) -> Dict:
        """Return row counts for photos, faces, people and tags.

        Returns:
            A dict with the keys ``total_photos``, ``processed_photos``,
            ``total_faces``, ``identified_faces``, ``unidentified_faces``,
            ``total_people``, ``total_tags`` and ``total_photo_tags``.
        """
        count_queries = (
            # Photo statistics
            ('total_photos', 'SELECT COUNT(*) FROM photos'),
            ('processed_photos', 'SELECT COUNT(*) FROM photos WHERE processed = 1'),
            # Face statistics
            ('total_faces', 'SELECT COUNT(*) FROM faces'),
            ('identified_faces', 'SELECT COUNT(*) FROM faces WHERE person_id IS NOT NULL'),
            ('unidentified_faces', 'SELECT COUNT(*) FROM faces WHERE person_id IS NULL'),
            # People statistics
            ('total_people', 'SELECT COUNT(*) FROM people'),
            # Tag statistics
            ('total_tags', 'SELECT COUNT(*) FROM tags'),
            ('total_photo_tags', 'SELECT COUNT(*) FROM phototaglinkage'),
        )

        stats = {}
        with self.get_db_connection() as conn:
            cursor = conn.cursor()
            for key, query in count_queries:
                cursor.execute(query)
                stats[key] = cursor.fetchone()[0]
        return stats