This commit updates the DatabaseManager to support case-insensitive tag lookups and additions, ensuring consistent tag handling. The SearchGUI and TagManagerGUI have been modified to reflect these changes, allowing for improved user experience when managing tags. Additionally, the search logic in SearchStats and TagManagement has been adjusted for case-insensitive tag ID retrieval, enhancing overall functionality and reliability in tag management across the application.
493 lines
21 KiB
Python
493 lines
21 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Database operations and schema management for PunimTag
|
|
"""
|
|
|
|
import sqlite3
|
|
import threading
|
|
from contextlib import contextmanager
|
|
from typing import Dict, List, Tuple, Optional
|
|
from config import DEFAULT_DB_PATH, DB_TIMEOUT
|
|
|
|
|
|
class DatabaseManager:
|
|
"""Handles all database operations for the photo tagger"""
|
|
|
|
def __init__(self, db_path: str = DEFAULT_DB_PATH, verbose: int = 0):
    """Record the configuration, prepare connection state and build the schema.

    Args:
        db_path: Path of the SQLite database file to open.
        verbose: Verbosity level; at 1 or above ``init_database`` prints a
            confirmation line.
    """
    # The connection is opened on first use by get_db_connection(); this
    # lock serializes every access to it.
    self._db_lock = threading.Lock()
    self._db_connection = None

    self.verbose = verbose
    self.db_path = db_path

    # Create any tables and indexes that do not exist yet.
    self.init_database()
|
|
|
|
@contextmanager
def get_db_connection(self):
    """Yield the shared SQLite connection while holding the instance lock.

    The connection is opened on first use and reused by later calls.  When
    the ``with`` body finishes normally the transaction is committed; when
    it raises an ``Exception`` the transaction is rolled back and the
    exception is re-raised.
    """
    with self._db_lock:
        if self._db_connection is None:
            fresh = sqlite3.connect(
                self.db_path,
                timeout=DB_TIMEOUT,
                check_same_thread=False,
            )
            # Rows support access by column name as well as by index.
            fresh.row_factory = sqlite3.Row
            self._db_connection = fresh

        active = self._db_connection
        try:
            yield active
        except Exception:
            active.rollback()
            raise
        # Reached only when the with-body completed without raising.
        active.commit()
|
|
|
|
def close_db_connection(self):
    """Close the shared connection, if one is open, and forget it."""
    with self._db_lock:
        conn = self._db_connection
        if not conn:
            # Nothing was ever opened, or it is already closed.
            return
        conn.close()
        self._db_connection = None
|
|
|
|
def init_database(self):
    """Create the application's tables and indexes when they are missing.

    Every statement uses ``IF NOT EXISTS``, so calling this on an already
    initialized database changes nothing.  Prints a confirmation line when
    ``self.verbose`` is at least 1.
    """
    table_statements = (
        # Photos table
        '''
            CREATE TABLE IF NOT EXISTS photos (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                path TEXT UNIQUE NOT NULL,
                filename TEXT NOT NULL,
                date_added DATETIME DEFAULT CURRENT_TIMESTAMP,
                date_taken DATE,
                processed BOOLEAN DEFAULT 0
            )
        ''',
        # People table
        '''
            CREATE TABLE IF NOT EXISTS people (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                first_name TEXT NOT NULL,
                last_name TEXT NOT NULL,
                middle_name TEXT,
                maiden_name TEXT,
                date_of_birth DATE,
                created_date DATETIME DEFAULT CURRENT_TIMESTAMP,
                UNIQUE(first_name, last_name, middle_name, maiden_name, date_of_birth)
            )
        ''',
        # Faces table
        '''
            CREATE TABLE IF NOT EXISTS faces (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                photo_id INTEGER NOT NULL,
                person_id INTEGER,
                encoding BLOB NOT NULL,
                location TEXT NOT NULL,
                confidence REAL DEFAULT 0.0,
                quality_score REAL DEFAULT 0.0,
                is_primary_encoding BOOLEAN DEFAULT 0,
                FOREIGN KEY (photo_id) REFERENCES photos (id),
                FOREIGN KEY (person_id) REFERENCES people (id)
            )
        ''',
        # Person encodings table for multiple encodings per person
        '''
            CREATE TABLE IF NOT EXISTS person_encodings (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                person_id INTEGER NOT NULL,
                face_id INTEGER NOT NULL,
                encoding BLOB NOT NULL,
                quality_score REAL DEFAULT 0.0,
                created_date DATETIME DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (person_id) REFERENCES people (id),
                FOREIGN KEY (face_id) REFERENCES faces (id)
            )
        ''',
        # Tags table - holds only tag information
        '''
            CREATE TABLE IF NOT EXISTS tags (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                tag_name TEXT UNIQUE NOT NULL,
                created_date DATETIME DEFAULT CURRENT_TIMESTAMP
            )
        ''',
        # Photo-Tag linkage table
        # linkage_type: INTEGER enum -> 0 = single (per-photo add), 1 = bulk (folder-wide add)
        '''
            CREATE TABLE IF NOT EXISTS phototaglinkage (
                linkage_id INTEGER PRIMARY KEY AUTOINCREMENT,
                photo_id INTEGER NOT NULL,
                tag_id INTEGER NOT NULL,
                linkage_type INTEGER NOT NULL DEFAULT 0 CHECK(linkage_type IN (0,1)),
                created_date DATETIME DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (photo_id) REFERENCES photos (id),
                FOREIGN KEY (tag_id) REFERENCES tags (id),
                UNIQUE(photo_id, tag_id)
            )
        ''',
    )

    # Indexes for better performance
    index_statements = (
        'CREATE INDEX IF NOT EXISTS idx_faces_person_id ON faces(person_id)',
        'CREATE INDEX IF NOT EXISTS idx_faces_photo_id ON faces(photo_id)',
        'CREATE INDEX IF NOT EXISTS idx_photos_processed ON photos(processed)',
        'CREATE INDEX IF NOT EXISTS idx_faces_quality ON faces(quality_score)',
        'CREATE INDEX IF NOT EXISTS idx_person_encodings_person_id ON person_encodings(person_id)',
        'CREATE INDEX IF NOT EXISTS idx_person_encodings_quality ON person_encodings(quality_score)',
        'CREATE INDEX IF NOT EXISTS idx_photos_date_taken ON photos(date_taken)',
        'CREATE INDEX IF NOT EXISTS idx_photos_date_added ON photos(date_added)',
    )

    with self.get_db_connection() as conn:
        cursor = conn.cursor()
        # Tables first, then the indexes that refer to them.
        for statement in table_statements + index_statements:
            cursor.execute(statement)

        if self.verbose >= 1:
            print(f"✅ Database initialized: {self.db_path}")
|
|
|
|
def load_tag_mappings(self) -> Tuple[Dict[int, str], Dict[str, int]]:
    """Build the ID-to-name and name-to-ID tag lookup tables.

    Returns:
        A pair ``(tag_id_to_name, tag_name_to_id)``.  The first keeps each
        tag name as stored; the second is keyed by the lowercased name so
        that lookups can be case-insensitive.
    """
    with self.get_db_connection() as conn:
        rows = conn.cursor().execute(
            'SELECT id, tag_name FROM tags ORDER BY LOWER(tag_name)'
        ).fetchall()

        tag_id_to_name = {tag_id: tag_name for tag_id, tag_name in rows}
        # Lowercased keys; names that differ only by case collapse to the
        # row that sorts last.
        tag_name_to_id = {tag_name.lower(): tag_id for tag_id, tag_name in rows}
        return tag_id_to_name, tag_name_to_id
|
|
|
|
def get_existing_tag_ids_for_photo(self, photo_id: int) -> List[int]:
    """Return the IDs of the tags linked to a photo, ordered by link creation date."""
    query = '''
        SELECT ptl.tag_id
        FROM phototaglinkage ptl
        WHERE ptl.photo_id = ?
        ORDER BY ptl.created_date
    '''
    with self.get_db_connection() as conn:
        tag_ids = []
        for row in conn.cursor().execute(query, (photo_id,)):
            tag_ids.append(row[0])
        return tag_ids
|
|
|
|
def get_tag_id_by_name(self, tag_name: str, tag_name_to_id_map: Dict[str, int]) -> Optional[int]:
    """Look up a tag ID in a name-to-ID mapping; the database is not touched.

    An exact key match wins.  Otherwise the name is stripped and lowercased
    and looked up again, which matches the lowercase keys produced by
    ``load_tag_mappings``.  No tag is created when the name is unknown.

    Args:
        tag_name: Tag name in any letter case.
        tag_name_to_id_map: Mapping from tag name to tag ID.

    Returns:
        The tag ID, or ``None`` when the mapping has no entry for the name.
    """
    if tag_name in tag_name_to_id_map:
        return tag_name_to_id_map[tag_name]
    if isinstance(tag_name, str):
        # Case-insensitive fallback for mappings keyed by lowercase names.
        return tag_name_to_id_map.get(tag_name.strip().lower())
    return None
|
|
|
|
def get_tag_name_by_id(self, tag_id: int, tag_id_to_name_map: Dict[int, str]) -> str:
    """Return the name stored for ``tag_id``, or an "Unknown Tag <id>" placeholder."""
    if tag_id in tag_id_to_name_map:
        return tag_id_to_name_map[tag_id]
    return f"Unknown Tag {tag_id}"
|
|
|
|
def show_people_list(self, cursor=None) -> List[Tuple]:
    """Return every person row, ordered by last name and then first name.

    Args:
        cursor: Optional cursor to run the query on.  When omitted, the
            query runs on the manager's own connection, inside the
            ``get_db_connection()`` context.

    Returns:
        Rows of ``(id, first_name, last_name, middle_name, maiden_name,
        date_of_birth, created_date)``.
    """
    query = '''
        SELECT id, first_name, last_name, middle_name, maiden_name, date_of_birth, created_date
        FROM people
        ORDER BY last_name, first_name
    '''
    if cursor is not None:
        # The caller owns this cursor and its transaction.
        cursor.execute(query)
        return cursor.fetchall()

    # Run the query while the connection context is still active instead of
    # on a cursor that has outlived it.
    with self.get_db_connection() as conn:
        own_cursor = conn.cursor()
        own_cursor.execute(query)
        return own_cursor.fetchall()
|
|
|
|
def add_photo(self, photo_path: str, filename: str, date_taken: Optional[str] = None) -> Optional[int]:
    """Insert a photo unless its path is already stored.

    Args:
        photo_path: Path of the photo; unique in the ``photos`` table.
        filename: File name of the photo.
        date_taken: Optional date the photo was taken.

    Returns:
        The ID of the newly inserted row, or ``None`` when a photo with
        this path already exists (nothing is inserted in that case).
    """
    with self.get_db_connection() as conn:
        cursor = conn.cursor()

        cursor.execute('SELECT id FROM photos WHERE path = ?', (photo_path,))
        if cursor.fetchone() is not None:
            # Already known: signal "not added" to the caller.
            return None

        cursor.execute('''
            INSERT INTO photos (path, filename, date_taken)
            VALUES (?, ?, ?)
        ''', (photo_path, filename, date_taken))

        # The id column is the INTEGER PRIMARY KEY, so it is the rowid of
        # the row just inserted; no second query is needed.
        return cursor.lastrowid
|
|
|
|
def mark_photo_processed(self, photo_id: int):
    """Set the ``processed`` flag of the given photo to 1."""
    statement = 'UPDATE photos SET processed = 1 WHERE id = ?'
    with self.get_db_connection() as conn:
        conn.cursor().execute(statement, (photo_id,))
|
|
|
|
def add_face(self, photo_id: int, encoding: bytes, location: str, confidence: float = 0.0,
             quality_score: float = 0.0, person_id: Optional[int] = None) -> int:
    """Insert a face row for a photo and return the new face ID.

    Args:
        photo_id: ID of the photo the face was found in.
        encoding: Face encoding, stored in the BLOB ``encoding`` column.
        location: Face location, stored as text.
        confidence: Value for the ``confidence`` column.
        quality_score: Value for the ``quality_score`` column.
        person_id: ID of the person the face belongs to, or ``None``.
    """
    insert_sql = '''
        INSERT INTO faces (photo_id, person_id, encoding, location, confidence, quality_score)
        VALUES (?, ?, ?, ?, ?, ?)
    '''
    values = (photo_id, person_id, encoding, location, confidence, quality_score)
    with self.get_db_connection() as conn:
        inserted = conn.cursor()
        inserted.execute(insert_sql, values)
        return inserted.lastrowid
|
|
|
|
def update_face_person(self, face_id: int, person_id: Optional[int]):
    """Assign a face to a person; pass ``None`` to store NULL as the person."""
    statement = 'UPDATE faces SET person_id = ? WHERE id = ?'
    with self.get_db_connection() as conn:
        conn.cursor().execute(statement, (person_id, face_id))
|
|
|
|
def add_person(self, first_name: str, last_name: str, middle_name: Optional[str] = None,
               maiden_name: Optional[str] = None, date_of_birth: Optional[str] = None) -> Optional[int]:
    """Return the ID of the person with these details, inserting them if needed.

    Omitted optional fields are stored as NULL.  The lookup uses SQLite's
    ``IS`` operator because ``column = NULL`` is never true; with ``=`` a
    person with any NULL field could not be found again, and since a UNIQUE
    constraint treats NULLs as distinct, every call would add a duplicate.

    Args:
        first_name: Given name (required by the schema).
        last_name: Family name (required by the schema).
        middle_name: Optional middle name.
        maiden_name: Optional maiden name.
        date_of_birth: Optional date of birth.

    Returns:
        The ID of the existing or newly inserted person, or ``None`` when
        the insert was ignored (for example a required name was ``None``).
    """
    identity = (first_name, last_name, middle_name, maiden_name, date_of_birth)
    lookup_sql = '''
        SELECT id FROM people
        WHERE first_name IS ? AND last_name IS ? AND middle_name IS ?
        AND maiden_name IS ? AND date_of_birth IS ?
        ORDER BY id
        LIMIT 1
    '''

    with self.get_db_connection() as conn:
        cursor = conn.cursor()

        # Reuse an existing row, NULL fields included.
        cursor.execute(lookup_sql, identity)
        existing = cursor.fetchone()
        if existing:
            return existing[0]

        cursor.execute('''
            INSERT OR IGNORE INTO people (first_name, last_name, middle_name, maiden_name, date_of_birth)
            VALUES (?, ?, ?, ?, ?)
        ''', identity)
        if cursor.rowcount > 0:
            return cursor.lastrowid

        # The insert was ignored; report whatever row matches, if any.
        cursor.execute(lookup_sql, identity)
        result = cursor.fetchone()
        return result[0] if result else None
|
|
|
|
def add_tag(self, tag_name: str) -> int:
    """Return the ID of a tag, creating it when no case-insensitive match exists.

    Surrounding whitespace is ignored.  A new tag is stored with the
    caller's original letter case; an existing tag keeps its stored name.

    Args:
        tag_name: Tag name in any letter case.

    Returns:
        The ID of the matching or newly created tag.
    """
    display_name = tag_name.strip()
    lookup_key = display_name.lower()

    with self.get_db_connection() as conn:
        cursor = conn.cursor()

        # Compare in Python rather than with SQL LOWER(): SQLite's built-in
        # LOWER() only folds ASCII letters, so it disagrees with str.lower()
        # for names such as "Été" and would miss existing tags.
        cursor.execute('SELECT id, tag_name FROM tags ORDER BY id')
        for existing_id, existing_name in cursor.fetchall():
            if existing_name.lower() == lookup_key:
                return existing_id

        # Insert new tag with original case
        cursor.execute('INSERT INTO tags (tag_name) VALUES (?)', (display_name,))
        return cursor.lastrowid
|
|
|
|
def link_photo_tag(self, photo_id: int, tag_id: int):
    """Attach a tag to a photo; a pair that is already linked is left as is."""
    insert_sql = '''
        INSERT OR IGNORE INTO phototaglinkage (photo_id, tag_id)
        VALUES (?, ?)
    '''
    with self.get_db_connection() as conn:
        conn.cursor().execute(insert_sql, (photo_id, tag_id))
|
|
|
|
def unlink_photo_tag(self, photo_id: int, tag_id: int):
    """Remove the link between a photo and a tag, if there is one."""
    delete_sql = '''
        DELETE FROM phototaglinkage
        WHERE photo_id = ? AND tag_id = ?
    '''
    with self.get_db_connection() as conn:
        conn.cursor().execute(delete_sql, (photo_id, tag_id))
|
|
|
|
def get_photos_by_pattern(self, pattern: str = None, limit: int = 10) -> List[Tuple]:
    """List the most recently added photos, optionally filtered by a substring.

    Args:
        pattern: Text matched with ``LIKE '%pattern%'`` against both the
            file name and the path.  Empty or ``None`` means no filter.
        limit: Maximum number of rows to return.

    Returns:
        Rows of ``(id, path, filename, date_taken, processed)``, newest
        ``date_added`` first.
    """
    if pattern:
        like_term = f'%{pattern}%'
        query = '''
            SELECT id, path, filename, date_taken, processed
            FROM photos
            WHERE filename LIKE ? OR path LIKE ?
            ORDER BY date_added DESC
            LIMIT ?
        '''
        params = (like_term, like_term, limit)
    else:
        query = '''
            SELECT id, path, filename, date_taken, processed
            FROM photos
            ORDER BY date_added DESC
            LIMIT ?
        '''
        params = (limit,)

    with self.get_db_connection() as conn:
        return conn.cursor().execute(query, params).fetchall()
|
|
|
|
def get_unprocessed_photos(self, limit: int = 50) -> List[Tuple]:
    """Return up to ``limit`` photos whose ``processed`` flag is 0, oldest added first.

    Returns:
        Rows of ``(id, path, filename, date_taken)``.
    """
    query = '''
        SELECT id, path, filename, date_taken
        FROM photos
        WHERE processed = 0
        ORDER BY date_added ASC
        LIMIT ?
    '''
    with self.get_db_connection() as conn:
        return conn.cursor().execute(query, (limit,)).fetchall()
|
|
|
|
def get_unidentified_faces(self, limit: int = 20) -> List[Tuple]:
    """Return up to ``limit`` faces that have no person assigned.

    Faces are ordered by quality score and then confidence, both highest
    first.

    Returns:
        Rows of ``(face_id, photo_id, location, confidence, quality_score,
        photo_path, photo_filename)``.
    """
    query = '''
        SELECT f.id, f.photo_id, f.location, f.confidence, f.quality_score,
               p.path, p.filename
        FROM faces f
        JOIN photos p ON f.photo_id = p.id
        WHERE f.person_id IS NULL
        ORDER BY f.quality_score DESC, f.confidence DESC
        LIMIT ?
    '''
    with self.get_db_connection() as conn:
        return conn.cursor().execute(query, (limit,)).fetchall()
|
|
|
|
def get_face_encodings(self, face_id: int) -> Optional[bytes]:
    """Return the stored encoding of one face, or ``None`` if the face ID is unknown."""
    with self.get_db_connection() as conn:
        row = conn.cursor().execute(
            'SELECT encoding FROM faces WHERE id = ?', (face_id,)
        ).fetchone()
        if not row:
            return None
        return row[0]
|
|
|
|
def get_face_photo_info(self, face_id: int) -> Optional[Tuple]:
    """Return ``(photo_id, filename, location)`` for a face, or ``None`` if not found."""
    query = '''
        SELECT f.photo_id, p.filename, f.location
        FROM faces f
        JOIN photos p ON f.photo_id = p.id
        WHERE f.id = ?
    '''
    with self.get_db_connection() as conn:
        row = conn.cursor().execute(query, (face_id,)).fetchone()
        if not row:
            return None
        return row
|
|
|
|
def get_all_face_encodings(self) -> List[Tuple]:
    """Return ``(id, encoding, person_id, quality_score)`` for every stored face."""
    query = 'SELECT id, encoding, person_id, quality_score FROM faces'
    with self.get_db_connection() as conn:
        return conn.cursor().execute(query).fetchall()
|
|
|
|
def get_person_encodings(self, person_id: int, min_quality: float = 0.3) -> List[Tuple]:
    """Return a person's stored encodings whose quality is at least ``min_quality``.

    Returns:
        Rows of ``(encoding, quality_score, face_id)``, best quality first.
    """
    query = '''
        SELECT pe.encoding, pe.quality_score, pe.face_id
        FROM person_encodings pe
        WHERE pe.person_id = ? AND pe.quality_score >= ?
        ORDER BY pe.quality_score DESC
    '''
    with self.get_db_connection() as conn:
        return conn.cursor().execute(query, (person_id, min_quality)).fetchall()
|
|
|
|
def add_person_encoding(self, person_id: int, face_id: int, encoding: bytes, quality_score: float):
    """Store one face encoding in ``person_encodings`` for the given person."""
    insert_sql = '''
        INSERT INTO person_encodings (person_id, face_id, encoding, quality_score)
        VALUES (?, ?, ?, ?)
    '''
    values = (person_id, face_id, encoding, quality_score)
    with self.get_db_connection() as conn:
        conn.cursor().execute(insert_sql, values)
|
|
|
|
def update_person_encodings(self, person_id: int, min_quality: float = 0.3):
    """Rebuild a person's stored encodings from the faces assigned to them.

    All existing ``person_encodings`` rows for the person are deleted, then
    one row is inserted for each of their faces whose quality score is at
    least ``min_quality``.  Both steps run in the same transaction.

    Args:
        person_id: ID of the person whose encodings are rebuilt.
        min_quality: Lowest face quality score to keep.  The default of 0.3
            is the same default ``get_person_encodings`` filters with.
    """
    with self.get_db_connection() as conn:
        cursor = conn.cursor()

        # Remove old encodings
        cursor.execute('DELETE FROM person_encodings WHERE person_id = ?', (person_id,))

        # Add current face encodings
        cursor.execute('''
            INSERT INTO person_encodings (person_id, face_id, encoding, quality_score)
            SELECT ?, id, encoding, quality_score
            FROM faces
            WHERE person_id = ? AND quality_score >= ?
        ''', (person_id, person_id, min_quality))
|
|
|
|
def get_similar_faces(self, face_id: int, tolerance: float = 0.6,
                      include_same_photo: bool = False) -> List[Tuple]:
    """Return the candidate faces to compare against the given face.

    No similarity is computed here: every other face is returned, with its
    encoding, so that the caller can do the comparison.

    Args:
        face_id: ID of the reference face.
        tolerance: Not used by this method; kept so existing callers that
            pass it continue to work.
        include_same_photo: When false (the default), faces from the same
            photo as the reference face are left out.

    Returns:
        Rows of ``(face_id, encoding, person_id, quality_score, confidence,
        photo_path, photo_filename, photo_id)``.  The list is empty when
        the reference face, or the photo it points to, does not exist.
    """
    with self.get_db_connection() as conn:
        cursor = conn.cursor()

        # Only the photo ID of the reference face is needed; the join makes
        # a face whose photo row is missing count as "not found".
        cursor.execute('''
            SELECT f.photo_id
            FROM faces f
            JOIN photos p ON f.photo_id = p.id
            WHERE f.id = ?
        ''', (face_id,))
        target_result = cursor.fetchone()

        if not target_result:
            return []

        candidates_sql = '''
            SELECT f.id, f.encoding, f.person_id, f.quality_score, f.confidence,
                   p.path, p.filename, f.photo_id
            FROM faces f
            JOIN photos p ON f.photo_id = p.id
            WHERE f.id != ?
        '''
        params = [face_id]
        if not include_same_photo:
            candidates_sql += ' AND f.photo_id != ?'
            params.append(target_result[0])

        cursor.execute(candidates_sql, params)
        return cursor.fetchall()
|
|
|
|
def get_statistics(self) -> Dict:
    """Count the rows behind each headline figure of the database.

    Returns:
        A dict with the integer counts ``total_photos``,
        ``processed_photos``, ``total_faces``, ``identified_faces``,
        ``unidentified_faces``, ``total_people``, ``total_tags`` and
        ``total_photo_tags``.
    """
    count_queries = (
        # Photo statistics
        ('total_photos', 'SELECT COUNT(*) FROM photos'),
        ('processed_photos', 'SELECT COUNT(*) FROM photos WHERE processed = 1'),
        # Face statistics
        ('total_faces', 'SELECT COUNT(*) FROM faces'),
        ('identified_faces', 'SELECT COUNT(*) FROM faces WHERE person_id IS NOT NULL'),
        ('unidentified_faces', 'SELECT COUNT(*) FROM faces WHERE person_id IS NULL'),
        # People statistics
        ('total_people', 'SELECT COUNT(*) FROM people'),
        # Tag statistics
        ('total_tags', 'SELECT COUNT(*) FROM tags'),
        ('total_photo_tags', 'SELECT COUNT(*) FROM phototaglinkage'),
    )

    with self.get_db_connection() as conn:
        cursor = conn.cursor()
        stats = {}
        for key, query in count_queries:
            cursor.execute(query)
            stats[key] = cursor.fetchone()[0]
        return stats
|