punimtag/database.py
tanyar09 0883a47914 Add Dashboard GUI for core feature management
This commit introduces the DashboardGUI class, providing a user-friendly interface for launching core features of the PunimTag application. The dashboard includes placeholder buttons for scanning, processing, and identifying faces, along with options for folder input and batch processing. The PhotoTagger class is updated to integrate the new dashboard functionality, and the README is revised to include usage instructions for the new dashboard command. This enhancement aims to streamline user interactions and improve overall accessibility to key features.
2025-10-07 12:27:57 -04:00

482 lines
20 KiB
Python

#!/usr/bin/env python3
"""
Database operations and schema management for PunimTag
"""
import sqlite3
import threading
from contextlib import contextmanager
from typing import Dict, List, Tuple, Optional
from config import DEFAULT_DB_PATH, DB_TIMEOUT
class DatabaseManager:
"""Handles all database operations for the photo tagger"""
def __init__(self, db_path: str = DEFAULT_DB_PATH, verbose: int = 0):
"""Initialize database manager"""
self.db_path = db_path
self.verbose = verbose
self._db_connection = None
self._db_lock = threading.Lock()
self.init_database()
@contextmanager
def get_db_connection(self):
"""Context manager for database connections with connection pooling"""
with self._db_lock:
if self._db_connection is None:
self._db_connection = sqlite3.connect(self.db_path, timeout=DB_TIMEOUT, check_same_thread=False)
self._db_connection.row_factory = sqlite3.Row
try:
yield self._db_connection
except Exception:
self._db_connection.rollback()
raise
else:
self._db_connection.commit()
def close_db_connection(self):
"""Close database connection"""
with self._db_lock:
if self._db_connection:
self._db_connection.close()
self._db_connection = None
def init_database(self):
"""Create database tables if they don't exist"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
# Photos table
cursor.execute('''
CREATE TABLE IF NOT EXISTS photos (
id INTEGER PRIMARY KEY AUTOINCREMENT,
path TEXT UNIQUE NOT NULL,
filename TEXT NOT NULL,
date_added DATETIME DEFAULT CURRENT_TIMESTAMP,
date_taken DATE,
processed BOOLEAN DEFAULT 0
)
''')
# People table
cursor.execute('''
CREATE TABLE IF NOT EXISTS people (
id INTEGER PRIMARY KEY AUTOINCREMENT,
first_name TEXT NOT NULL,
last_name TEXT NOT NULL,
middle_name TEXT,
maiden_name TEXT,
date_of_birth DATE,
created_date DATETIME DEFAULT CURRENT_TIMESTAMP,
UNIQUE(first_name, last_name, middle_name, maiden_name, date_of_birth)
)
''')
# Faces table
cursor.execute('''
CREATE TABLE IF NOT EXISTS faces (
id INTEGER PRIMARY KEY AUTOINCREMENT,
photo_id INTEGER NOT NULL,
person_id INTEGER,
encoding BLOB NOT NULL,
location TEXT NOT NULL,
confidence REAL DEFAULT 0.0,
quality_score REAL DEFAULT 0.0,
is_primary_encoding BOOLEAN DEFAULT 0,
FOREIGN KEY (photo_id) REFERENCES photos (id),
FOREIGN KEY (person_id) REFERENCES people (id)
)
''')
# Person encodings table for multiple encodings per person
cursor.execute('''
CREATE TABLE IF NOT EXISTS person_encodings (
id INTEGER PRIMARY KEY AUTOINCREMENT,
person_id INTEGER NOT NULL,
face_id INTEGER NOT NULL,
encoding BLOB NOT NULL,
quality_score REAL DEFAULT 0.0,
created_date DATETIME DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (person_id) REFERENCES people (id),
FOREIGN KEY (face_id) REFERENCES faces (id)
)
''')
# Tags table - holds only tag information
cursor.execute('''
CREATE TABLE IF NOT EXISTS tags (
id INTEGER PRIMARY KEY AUTOINCREMENT,
tag_name TEXT UNIQUE NOT NULL,
created_date DATETIME DEFAULT CURRENT_TIMESTAMP
)
''')
# Photo-Tag linkage table
# linkage_type: INTEGER enum → 0 = single (per-photo add), 1 = bulk (folder-wide add)
cursor.execute('''
CREATE TABLE IF NOT EXISTS phototaglinkage (
linkage_id INTEGER PRIMARY KEY AUTOINCREMENT,
photo_id INTEGER NOT NULL,
tag_id INTEGER NOT NULL,
linkage_type INTEGER NOT NULL DEFAULT 0 CHECK(linkage_type IN (0,1)),
created_date DATETIME DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (photo_id) REFERENCES photos (id),
FOREIGN KEY (tag_id) REFERENCES tags (id),
UNIQUE(photo_id, tag_id)
)
''')
# Add indexes for better performance
cursor.execute('CREATE INDEX IF NOT EXISTS idx_faces_person_id ON faces(person_id)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_faces_photo_id ON faces(photo_id)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_photos_processed ON photos(processed)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_faces_quality ON faces(quality_score)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_person_encodings_person_id ON person_encodings(person_id)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_person_encodings_quality ON person_encodings(quality_score)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_photos_date_taken ON photos(date_taken)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_photos_date_added ON photos(date_added)')
if self.verbose >= 1:
print(f"✅ Database initialized: {self.db_path}")
def load_tag_mappings(self) -> Tuple[Dict[int, str], Dict[str, int]]:
"""Load tag name to ID and ID to name mappings from database"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute('SELECT id, tag_name FROM tags ORDER BY tag_name')
tag_id_to_name = {}
tag_name_to_id = {}
for row in cursor.fetchall():
tag_id, tag_name = row
tag_id_to_name[tag_id] = tag_name
tag_name_to_id[tag_name] = tag_id
return tag_id_to_name, tag_name_to_id
def get_existing_tag_ids_for_photo(self, photo_id: int) -> List[int]:
"""Get list of tag IDs for a photo from database"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT ptl.tag_id
FROM phototaglinkage ptl
WHERE ptl.photo_id = ?
ORDER BY ptl.created_date
''', (photo_id,))
return [row[0] for row in cursor.fetchall()]
def get_tag_id_by_name(self, tag_name: str, tag_name_to_id_map: Dict[str, int]) -> Optional[int]:
"""Get tag ID by name, creating the tag if it doesn't exist"""
if tag_name in tag_name_to_id_map:
return tag_name_to_id_map[tag_name]
return None
def get_tag_name_by_id(self, tag_id: int, tag_id_to_name_map: Dict[int, str]) -> str:
"""Get tag name by ID"""
return tag_id_to_name_map.get(tag_id, f"Unknown Tag {tag_id}")
def show_people_list(self, cursor=None) -> List[Tuple]:
"""Show list of people in database"""
if cursor is None:
with self.get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT id, first_name, last_name, middle_name, maiden_name, date_of_birth, created_date
FROM people
ORDER BY last_name, first_name
''')
return cursor.fetchall()
def add_photo(self, photo_path: str, filename: str, date_taken: Optional[str] = None) -> int:
"""Add a photo to the database and return its ID if new, None if already exists"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
# Check if photo already exists
cursor.execute('SELECT id FROM photos WHERE path = ?', (photo_path,))
existing = cursor.fetchone()
if existing:
# Photo already exists, return None to indicate it wasn't added
return None
# Photo doesn't exist, insert it
cursor.execute('''
INSERT INTO photos (path, filename, date_taken)
VALUES (?, ?, ?)
''', (photo_path, filename, date_taken))
# Get the new photo ID
cursor.execute('SELECT id FROM photos WHERE path = ?', (photo_path,))
result = cursor.fetchone()
return result[0] if result else None
def mark_photo_processed(self, photo_id: int):
"""Mark a photo as processed"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute('UPDATE photos SET processed = 1 WHERE id = ?', (photo_id,))
def add_face(self, photo_id: int, encoding: bytes, location: str, confidence: float = 0.0,
quality_score: float = 0.0, person_id: Optional[int] = None) -> int:
"""Add a face to the database and return its ID"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute('''
INSERT INTO faces (photo_id, person_id, encoding, location, confidence, quality_score)
VALUES (?, ?, ?, ?, ?, ?)
''', (photo_id, person_id, encoding, location, confidence, quality_score))
return cursor.lastrowid
def update_face_person(self, face_id: int, person_id: Optional[int]):
"""Update the person_id for a face"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute('UPDATE faces SET person_id = ? WHERE id = ?', (person_id, face_id))
def add_person(self, first_name: str, last_name: str, middle_name: str = None,
maiden_name: str = None, date_of_birth: str = None) -> int:
"""Add a person to the database and return their ID"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute('''
INSERT OR IGNORE INTO people (first_name, last_name, middle_name, maiden_name, date_of_birth)
VALUES (?, ?, ?, ?, ?)
''', (first_name, last_name, middle_name, maiden_name, date_of_birth))
# Get the person ID
cursor.execute('''
SELECT id FROM people
WHERE first_name = ? AND last_name = ? AND middle_name = ?
AND maiden_name = ? AND date_of_birth = ?
''', (first_name, last_name, middle_name, maiden_name, date_of_birth))
result = cursor.fetchone()
return result[0] if result else None
def add_tag(self, tag_name: str) -> int:
"""Add a tag to the database and return its ID"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute('INSERT OR IGNORE INTO tags (tag_name) VALUES (?)', (tag_name,))
# Get the tag ID
cursor.execute('SELECT id FROM tags WHERE tag_name = ?', (tag_name,))
result = cursor.fetchone()
return result[0] if result else None
def link_photo_tag(self, photo_id: int, tag_id: int):
"""Link a photo to a tag"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute('''
INSERT OR IGNORE INTO phototaglinkage (photo_id, tag_id)
VALUES (?, ?)
''', (photo_id, tag_id))
def unlink_photo_tag(self, photo_id: int, tag_id: int):
"""Unlink a photo from a tag"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute('''
DELETE FROM phototaglinkage
WHERE photo_id = ? AND tag_id = ?
''', (photo_id, tag_id))
def get_photos_by_pattern(self, pattern: str = None, limit: int = 10) -> List[Tuple]:
"""Get photos matching a pattern"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
if pattern:
cursor.execute('''
SELECT id, path, filename, date_taken, processed
FROM photos
WHERE filename LIKE ? OR path LIKE ?
ORDER BY date_added DESC
LIMIT ?
''', (f'%{pattern}%', f'%{pattern}%', limit))
else:
cursor.execute('''
SELECT id, path, filename, date_taken, processed
FROM photos
ORDER BY date_added DESC
LIMIT ?
''', (limit,))
return cursor.fetchall()
def get_unprocessed_photos(self, limit: int = 50) -> List[Tuple]:
"""Get unprocessed photos"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT id, path, filename, date_taken
FROM photos
WHERE processed = 0
ORDER BY date_added ASC
LIMIT ?
''', (limit,))
return cursor.fetchall()
def get_unidentified_faces(self, limit: int = 20) -> List[Tuple]:
"""Get unidentified faces"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT f.id, f.photo_id, f.location, f.confidence, f.quality_score,
p.path, p.filename
FROM faces f
JOIN photos p ON f.photo_id = p.id
WHERE f.person_id IS NULL
ORDER BY f.quality_score DESC, f.confidence DESC
LIMIT ?
''', (limit,))
return cursor.fetchall()
def get_face_encodings(self, face_id: int) -> Optional[bytes]:
"""Get face encoding for a specific face"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute('SELECT encoding FROM faces WHERE id = ?', (face_id,))
result = cursor.fetchone()
return result[0] if result else None
def get_face_photo_info(self, face_id: int) -> Optional[Tuple]:
"""Get photo information for a specific face"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT f.photo_id, p.filename, f.location
FROM faces f
JOIN photos p ON f.photo_id = p.id
WHERE f.id = ?
''', (face_id,))
result = cursor.fetchone()
return result if result else None
def get_all_face_encodings(self) -> List[Tuple]:
"""Get all face encodings with their IDs"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute('SELECT id, encoding, person_id, quality_score FROM faces')
return cursor.fetchall()
def get_person_encodings(self, person_id: int, min_quality: float = 0.3) -> List[Tuple]:
"""Get all encodings for a person above minimum quality"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT pe.encoding, pe.quality_score, pe.face_id
FROM person_encodings pe
WHERE pe.person_id = ? AND pe.quality_score >= ?
ORDER BY pe.quality_score DESC
''', (person_id, min_quality))
return cursor.fetchall()
def add_person_encoding(self, person_id: int, face_id: int, encoding: bytes, quality_score: float):
"""Add a person encoding"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute('''
INSERT INTO person_encodings (person_id, face_id, encoding, quality_score)
VALUES (?, ?, ?, ?)
''', (person_id, face_id, encoding, quality_score))
def update_person_encodings(self, person_id: int):
"""Update person encodings by removing old ones and adding current face encodings"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
# Remove old encodings
cursor.execute('DELETE FROM person_encodings WHERE person_id = ?', (person_id,))
# Add current face encodings
cursor.execute('''
INSERT INTO person_encodings (person_id, face_id, encoding, quality_score)
SELECT ?, id, encoding, quality_score
FROM faces
WHERE person_id = ? AND quality_score >= 0.3
''', (person_id, person_id))
def get_similar_faces(self, face_id: int, tolerance: float = 0.6,
include_same_photo: bool = False) -> List[Dict]:
"""Get faces similar to the given face ID"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
# Get the target face encoding and photo
cursor.execute('''
SELECT f.encoding, f.photo_id, p.path, p.filename
FROM faces f
JOIN photos p ON f.photo_id = p.id
WHERE f.id = ?
''', (face_id,))
target_result = cursor.fetchone()
if not target_result:
return []
target_encoding = target_result[0]
target_photo_id = target_result[1]
target_path = target_result[2]
target_filename = target_result[3]
# Get all other faces
if include_same_photo:
cursor.execute('''
SELECT f.id, f.encoding, f.person_id, f.quality_score, f.confidence,
p.path, p.filename, f.photo_id
FROM faces f
JOIN photos p ON f.photo_id = p.id
WHERE f.id != ?
''', (face_id,))
else:
cursor.execute('''
SELECT f.id, f.encoding, f.person_id, f.quality_score, f.confidence,
p.path, p.filename, f.photo_id
FROM faces f
JOIN photos p ON f.photo_id = p.id
WHERE f.id != ? AND f.photo_id != ?
''', (face_id, target_photo_id))
return cursor.fetchall()
def get_statistics(self) -> Dict:
"""Get database statistics"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
stats = {}
# Photo statistics
cursor.execute('SELECT COUNT(*) FROM photos')
stats['total_photos'] = cursor.fetchone()[0]
cursor.execute('SELECT COUNT(*) FROM photos WHERE processed = 1')
stats['processed_photos'] = cursor.fetchone()[0]
# Face statistics
cursor.execute('SELECT COUNT(*) FROM faces')
stats['total_faces'] = cursor.fetchone()[0]
cursor.execute('SELECT COUNT(*) FROM faces WHERE person_id IS NOT NULL')
stats['identified_faces'] = cursor.fetchone()[0]
cursor.execute('SELECT COUNT(*) FROM faces WHERE person_id IS NULL')
stats['unidentified_faces'] = cursor.fetchone()[0]
# People statistics
cursor.execute('SELECT COUNT(*) FROM people')
stats['total_people'] = cursor.fetchone()[0]
# Tag statistics
cursor.execute('SELECT COUNT(*) FROM tags')
stats['total_tags'] = cursor.fetchone()[0]
cursor.execute('SELECT COUNT(*) FROM phototaglinkage')
stats['total_photo_tags'] = cursor.fetchone()[0]
return stats