punimtag/photo_tagger.py

2659 lines
122 KiB
Python

#!/usr/bin/env python3
"""
PunimTag CLI - Minimal Photo Face Tagger
Simple command-line tool for face recognition and photo tagging
"""
import os
import sqlite3
import argparse
import face_recognition
from pathlib import Path
from PIL import Image, ImageDraw, ImageFont
import pickle
import numpy as np
from typing import List, Dict, Tuple, Optional
import sys
import tempfile
import subprocess
import threading
import time
from functools import lru_cache
from contextlib import contextmanager
class PhotoTagger:
def __init__(self, db_path: str = "data/photos.db", verbose: int = 0, debug: bool = False):
"""Initialize the photo tagger with database"""
self.db_path = db_path
self.verbose = verbose
self.debug = debug
self._face_encoding_cache = {}
self._image_cache = {}
self._db_connection = None
self._db_lock = threading.Lock()
self.init_database()
@contextmanager
def get_db_connection(self):
"""Context manager for database connections with connection pooling"""
with self._db_lock:
if self._db_connection is None:
self._db_connection = sqlite3.connect(self.db_path)
self._db_connection.row_factory = sqlite3.Row
try:
yield self._db_connection
except Exception:
self._db_connection.rollback()
raise
else:
self._db_connection.commit()
def close_db_connection(self):
"""Close database connection"""
with self._db_lock:
if self._db_connection:
self._db_connection.close()
self._db_connection = None
@lru_cache(maxsize=1000)
def _get_cached_face_encoding(self, face_id: int, encoding_bytes: bytes) -> np.ndarray:
"""Cache face encodings to avoid repeated numpy conversions"""
return np.frombuffer(encoding_bytes, dtype=np.float64)
def _clear_caches(self):
"""Clear all caches to free memory"""
self._face_encoding_cache.clear()
self._image_cache.clear()
self._get_cached_face_encoding.cache_clear()
def cleanup(self):
"""Clean up resources and close connections"""
self._clear_caches()
self.close_db_connection()
def _cleanup_face_crops(self, current_face_crop_path=None):
"""Clean up face crop files and caches"""
# Clean up current face crop if provided
if current_face_crop_path and os.path.exists(current_face_crop_path):
try:
os.remove(current_face_crop_path)
except:
pass # Ignore cleanup errors
# Clean up all cached face crop files
for cache_key, cached_path in list(self._image_cache.items()):
if os.path.exists(cached_path):
try:
os.remove(cached_path)
except:
pass # Ignore cleanup errors
# Clear caches
self._clear_caches()
def _setup_window_size_saving(self, root, config_file="gui_config.json"):
"""Set up window size saving functionality"""
import json
import tkinter as tk
# Load saved window size
default_size = "600x500"
saved_size = default_size
if os.path.exists(config_file):
try:
with open(config_file, 'r') as f:
config = json.load(f)
saved_size = config.get('window_size', default_size)
except:
saved_size = default_size
# Calculate center position before showing window
try:
width = int(saved_size.split('x')[0])
height = int(saved_size.split('x')[1])
x = (root.winfo_screenwidth() // 2) - (width // 2)
y = (root.winfo_screenheight() // 2) - (height // 2)
root.geometry(f"{saved_size}+{x}+{y}")
except tk.TclError:
# Fallback to default geometry if positioning fails
root.geometry(saved_size)
# Track previous size to detect actual resizing
last_size = None
def save_window_size(event=None):
nonlocal last_size
if event and event.widget == root:
current_size = f"{root.winfo_width()}x{root.winfo_height()}"
# Only save if size actually changed
if current_size != last_size:
last_size = current_size
try:
config = {'window_size': current_size}
with open(config_file, 'w') as f:
json.dump(config, f)
except:
pass # Ignore save errors
# Bind resize event
root.bind('<Configure>', save_window_size)
return saved_size
def init_database(self):
"""Create database tables if they don't exist"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
# Photos table
cursor.execute('''
CREATE TABLE IF NOT EXISTS photos (
id INTEGER PRIMARY KEY AUTOINCREMENT,
path TEXT UNIQUE NOT NULL,
filename TEXT NOT NULL,
date_added DATETIME DEFAULT CURRENT_TIMESTAMP,
processed BOOLEAN DEFAULT 0
)
''')
# People table
cursor.execute('''
CREATE TABLE IF NOT EXISTS people (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE NOT NULL,
created_date DATETIME DEFAULT CURRENT_TIMESTAMP
)
''')
# Faces table
cursor.execute('''
CREATE TABLE IF NOT EXISTS faces (
id INTEGER PRIMARY KEY AUTOINCREMENT,
photo_id INTEGER NOT NULL,
person_id INTEGER,
encoding BLOB NOT NULL,
location TEXT NOT NULL,
confidence REAL DEFAULT 0.0,
quality_score REAL DEFAULT 0.0,
is_primary_encoding BOOLEAN DEFAULT 0,
FOREIGN KEY (photo_id) REFERENCES photos (id),
FOREIGN KEY (person_id) REFERENCES people (id)
)
''')
# Person encodings table for multiple encodings per person
cursor.execute('''
CREATE TABLE IF NOT EXISTS person_encodings (
id INTEGER PRIMARY KEY AUTOINCREMENT,
person_id INTEGER NOT NULL,
face_id INTEGER NOT NULL,
encoding BLOB NOT NULL,
quality_score REAL DEFAULT 0.0,
created_date DATETIME DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (person_id) REFERENCES people (id),
FOREIGN KEY (face_id) REFERENCES faces (id)
)
''')
# Tags table
cursor.execute('''
CREATE TABLE IF NOT EXISTS tags (
id INTEGER PRIMARY KEY AUTOINCREMENT,
photo_id INTEGER NOT NULL,
tag_name TEXT NOT NULL,
created_date DATETIME DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (photo_id) REFERENCES photos (id)
)
''')
# Add indexes for better performance
cursor.execute('CREATE INDEX IF NOT EXISTS idx_faces_person_id ON faces(person_id)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_faces_photo_id ON faces(photo_id)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_photos_processed ON photos(processed)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_faces_quality ON faces(quality_score)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_person_encodings_person_id ON person_encodings(person_id)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_person_encodings_quality ON person_encodings(quality_score)')
if self.verbose >= 1:
print(f"✅ Database initialized: {self.db_path}")
def scan_folder(self, folder_path: str, recursive: bool = True) -> int:
"""Scan folder for photos and add to database"""
# BREAKPOINT: Set breakpoint here for debugging
if not os.path.exists(folder_path):
print(f"❌ Folder not found: {folder_path}")
return 0
photo_extensions = {'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff'}
found_photos = []
# BREAKPOINT: Set breakpoint here for debugging
if recursive:
for root, dirs, files in os.walk(folder_path):
for file in files:
file_ext = Path(file).suffix.lower()
if file_ext in photo_extensions:
photo_path = os.path.join(root, file)
found_photos.append((photo_path, file))
else:
for file in os.listdir(folder_path):
file_ext = Path(file).suffix.lower()
if file_ext in photo_extensions:
photo_path = os.path.join(folder_path, file)
found_photos.append((photo_path, file))
if not found_photos:
print(f"📁 No photos found in {folder_path}")
return 0
# Add to database
# BREAKPOINT: Set breakpoint here for debugging
with self.get_db_connection() as conn:
cursor = conn.cursor()
added_count = 0
for photo_path, filename in found_photos:
try:
cursor.execute(
'INSERT OR IGNORE INTO photos (path, filename) VALUES (?, ?)',
(photo_path, filename)
)
if cursor.rowcount > 0:
added_count += 1
if self.verbose >= 2:
print(f" 📸 Added: {filename}")
elif self.verbose >= 3:
print(f" 📸 Already exists: {filename}")
except Exception as e:
print(f"⚠️ Error adding {filename}: {e}")
print(f"📁 Found {len(found_photos)} photos, added {added_count} new photos")
return added_count
def process_faces(self, limit: int = 50, model: str = "hog") -> int:
"""Process unprocessed photos for faces"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute(
'SELECT id, path, filename FROM photos WHERE processed = 0 LIMIT ?',
(limit,)
)
unprocessed = cursor.fetchall()
if not unprocessed:
print("✅ No unprocessed photos found")
return 0
print(f"🔍 Processing {len(unprocessed)} photos for faces...")
processed_count = 0
for photo_id, photo_path, filename in unprocessed:
if not os.path.exists(photo_path):
print(f"❌ File not found: {filename}")
cursor.execute('UPDATE photos SET processed = 1 WHERE id = ?', (photo_id,))
continue
try:
# Load image and find faces
if self.verbose >= 1:
print(f"📸 Processing: {filename}")
elif self.verbose == 0:
print(".", end="", flush=True)
if self.verbose >= 2:
print(f" 🔍 Loading image: {photo_path}")
image = face_recognition.load_image_file(photo_path)
face_locations = face_recognition.face_locations(image, model=model)
if face_locations:
face_encodings = face_recognition.face_encodings(image, face_locations)
if self.verbose >= 1:
print(f" 👤 Found {len(face_locations)} faces")
# Save faces to database with quality scores
for i, (encoding, location) in enumerate(zip(face_encodings, face_locations)):
# Calculate face quality score
quality_score = self._calculate_face_quality_score(image, location)
cursor.execute(
'INSERT INTO faces (photo_id, encoding, location, quality_score) VALUES (?, ?, ?, ?)',
(photo_id, encoding.tobytes(), str(location), quality_score)
)
if self.verbose >= 3:
print(f" Face {i+1}: {location} (quality: {quality_score:.2f})")
else:
if self.verbose >= 1:
print(f" 👤 No faces found")
elif self.verbose >= 2:
print(f" 👤 {filename}: No faces found")
# Mark as processed
cursor.execute('UPDATE photos SET processed = 1 WHERE id = ?', (photo_id,))
processed_count += 1
except Exception as e:
print(f"❌ Error processing {filename}: {e}")
cursor.execute('UPDATE photos SET processed = 1 WHERE id = ?', (photo_id,))
if self.verbose == 0:
print() # New line after dots
print(f"✅ Processed {processed_count} photos")
return processed_count
def identify_faces(self, batch_size: int = 20, show_faces: bool = False, tolerance: float = 0.6) -> int:
"""Interactive face identification with optimized performance"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT f.id, f.photo_id, p.path, p.filename, f.location
FROM faces f
JOIN photos p ON f.photo_id = p.id
WHERE f.person_id IS NULL
LIMIT ?
''', (batch_size,))
unidentified = cursor.fetchall()
if not unidentified:
print("🎉 All faces have been identified!")
return 0
print(f"\n👤 Found {len(unidentified)} unidentified faces")
print("Commands: [name] = identify, 's' = skip, 'q' = quit, 'list' = show people\n")
# Pre-fetch all needed data to avoid repeated database queries
print("📊 Pre-fetching data for optimal performance...")
identify_data_cache = {}
with self.get_db_connection() as conn:
cursor = conn.cursor()
# Pre-fetch all photo paths for unidentified faces
photo_ids = [face[1] for face in unidentified] # face[1] is photo_id
if photo_ids:
placeholders = ','.join('?' * len(photo_ids))
cursor.execute(f'SELECT id, path, filename FROM photos WHERE id IN ({placeholders})', photo_ids)
identify_data_cache['photo_paths'] = {row[0]: {'path': row[1], 'filename': row[2]} for row in cursor.fetchall()}
# Pre-fetch all people names for dropdown
cursor.execute('SELECT name FROM people ORDER BY name')
people = cursor.fetchall()
identify_data_cache['people_names'] = [name[0] for name in people]
print(f"✅ Pre-fetched {len(identify_data_cache.get('photo_paths', {}))} photo paths and {len(identify_data_cache.get('people_names', []))} people names")
identified_count = 0
# Use integrated GUI with image and input
import tkinter as tk
from tkinter import ttk, messagebox
from PIL import Image, ImageTk
import json
import os
# Create the main window once
root = tk.Tk()
root.title("Face Identification")
root.resizable(True, True)
# Track window state to prevent multiple destroy calls
window_destroyed = False
force_exit = False
# Track current face crop path for cleanup
current_face_crop_path = None
# Hide window initially to prevent flash at corner
root.withdraw()
def save_all_pending_identifications():
"""Save all pending identifications from face_person_names"""
nonlocal identified_count
saved_count = 0
for face_id, person_name in face_person_names.items():
if person_name.strip():
try:
with self.get_db_connection() as conn:
cursor = conn.cursor()
# Add person if doesn't exist
cursor.execute('INSERT OR IGNORE INTO people (name) VALUES (?)', (person_name,))
cursor.execute('SELECT id FROM people WHERE name = ?', (person_name,))
result = cursor.fetchone()
person_id = result[0] if result else None
# Update people cache if new person was added
if person_name not in identify_data_cache['people_names']:
identify_data_cache['people_names'].append(person_name)
identify_data_cache['people_names'].sort() # Keep sorted
# Assign face to person
cursor.execute(
'UPDATE faces SET person_id = ? WHERE id = ?',
(person_id, face_id)
)
# Update person encodings
self._update_person_encodings(person_id)
saved_count += 1
print(f"✅ Saved identification: {person_name}")
except Exception as e:
print(f"❌ Error saving identification for {person_name}: {e}")
if saved_count > 0:
identified_count += saved_count
print(f"💾 Saved {saved_count} pending identifications")
return saved_count
# Set up protocol handler for window close button (X)
def on_closing():
nonlocal command, waiting_for_input, window_destroyed, current_face_crop_path, force_exit
# First check for selected similar faces without person name
if not validate_navigation():
return # Cancel close
# Check if there are pending identifications
pending_identifications = {k: v for k, v in face_person_names.items() if v.strip()}
if pending_identifications:
# Ask user if they want to save pending identifications
result = messagebox.askyesnocancel(
"Save Pending Identifications?",
f"You have {len(pending_identifications)} pending identifications.\n\n"
"Do you want to save them before closing?\n\n"
"• Yes: Save all pending identifications and close\n"
"• No: Close without saving\n"
"• Cancel: Return to identification"
)
if result is True: # Yes - Save and close
save_all_pending_identifications()
command = 'q'
waiting_for_input = False
elif result is False: # No - Close without saving
command = 'q'
waiting_for_input = False
else: # Cancel - Don't close
return
# Clean up face crops and caches
self._cleanup_face_crops(current_face_crop_path)
self.close_db_connection()
if not window_destroyed:
window_destroyed = True
try:
root.destroy()
except tk.TclError:
pass # Window already destroyed
# Force process termination
force_exit = True
root.quit()
root.protocol("WM_DELETE_WINDOW", on_closing)
# Set up window size saving
saved_size = self._setup_window_size_saving(root)
# Create main frame
main_frame = ttk.Frame(root, padding="10")
main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
# Configure grid weights
root.columnconfigure(0, weight=1)
root.rowconfigure(0, weight=1)
main_frame.columnconfigure(0, weight=1) # Left panel
main_frame.columnconfigure(1, weight=1) # Right panel for similar faces
# Photo info
info_label = ttk.Label(main_frame, text="", font=("Arial", 10, "bold"))
info_label.grid(row=0, column=0, columnspan=2, pady=(0, 10), sticky=tk.W)
# Left panel for main face
left_panel = ttk.Frame(main_frame)
left_panel.grid(row=1, column=0, sticky=(tk.W, tk.E, tk.N, tk.S), padx=(0, 5))
left_panel.columnconfigure(0, weight=1)
# Right panel for similar faces
right_panel = ttk.LabelFrame(main_frame, text="Similar Faces", padding="5")
right_panel.grid(row=1, column=1, sticky=(tk.W, tk.E, tk.N, tk.S), padx=(5, 0))
right_panel.columnconfigure(0, weight=1)
right_panel.rowconfigure(0, weight=1) # Make right panel expandable vertically
# Image display (left panel)
image_frame = ttk.Frame(left_panel)
image_frame.grid(row=0, column=0, pady=(0, 10), sticky=(tk.W, tk.E, tk.N, tk.S))
# Create canvas for image display
canvas = tk.Canvas(image_frame, width=400, height=400, bg='white')
canvas.grid(row=0, column=0)
# Input section (left panel)
input_frame = ttk.LabelFrame(left_panel, text="Person Identification", padding="10")
input_frame.grid(row=1, column=0, pady=(10, 0), sticky=(tk.W, tk.E))
input_frame.columnconfigure(1, weight=1)
# Person name input with dropdown
ttk.Label(input_frame, text="Person name:").grid(row=0, column=0, sticky=tk.W, padx=(0, 10))
name_var = tk.StringVar()
name_combo = ttk.Combobox(input_frame, textvariable=name_var, width=27, state="normal")
name_combo.grid(row=0, column=1, sticky=(tk.W, tk.E), padx=(0, 10))
# Define update_similar_faces function first - reusing auto-match display logic
def update_similar_faces():
"""Update the similar faces panel when compare is enabled - reuses auto-match display logic"""
nonlocal similar_faces_data, similar_face_vars, similar_face_images, similar_face_crops, face_selection_states
# Note: Selection states are now saved automatically via callbacks (auto-match style)
# Clear existing similar faces
for widget in similar_scrollable_frame.winfo_children():
widget.destroy()
similar_face_vars.clear()
similar_face_images.clear()
# Clean up existing face crops
for crop_path in similar_face_crops:
try:
if os.path.exists(crop_path):
os.remove(crop_path)
except:
pass
similar_face_crops.clear()
if compare_var.get():
# Use the same filtering and sorting logic as auto-match
unidentified_similar_faces = self._get_filtered_similar_faces(face_id, tolerance, include_same_photo=False, face_status=face_status)
if unidentified_similar_faces:
# Get current face_id for selection state management
current_face_id = original_faces[i][0] # Get current face_id
# Reuse auto-match display logic for similar faces
self._display_similar_faces_in_panel(similar_scrollable_frame, unidentified_similar_faces,
similar_face_vars, similar_face_images, similar_face_crops,
current_face_id, face_selection_states, identify_data_cache)
# Note: Selection states are now restored automatically during checkbox creation (auto-match style)
else:
# No similar unidentified faces found
no_faces_label = ttk.Label(similar_scrollable_frame, text="No similar unidentified faces found",
foreground="gray", font=("Arial", 10))
no_faces_label.pack(pady=20)
else:
# Compare disabled - clear the panel
clear_label = ttk.Label(similar_scrollable_frame, text="Enable 'Compare with similar faces' to see matches",
foreground="gray", font=("Arial", 10))
clear_label.pack(pady=20)
# Update button states based on compare checkbox
update_select_clear_buttons_state()
# Compare checkbox
compare_var = tk.BooleanVar()
def on_compare_change():
"""Handle compare checkbox change"""
update_similar_faces()
update_select_clear_buttons_state()
compare_checkbox = ttk.Checkbutton(input_frame, text="Compare with similar faces", variable=compare_var,
command=on_compare_change)
compare_checkbox.grid(row=1, column=0, columnspan=2, sticky=tk.W, pady=(5, 0))
# Add callback to save person name when it changes
def on_name_change(*args):
if i < len(original_faces):
current_face_id = original_faces[i][0]
current_name = name_var.get().strip()
if current_name:
face_person_names[current_face_id] = current_name
elif current_face_id in face_person_names:
# Remove empty names from storage
del face_person_names[current_face_id]
name_var.trace('w', on_name_change)
# Buttons
button_frame = ttk.Frame(input_frame)
button_frame.grid(row=2, column=0, columnspan=2, pady=(10, 0))
# Instructions
instructions = ttk.Label(input_frame, text="Select from dropdown or type new name", foreground="gray")
instructions.grid(row=3, column=0, columnspan=2, pady=(10, 0))
# Right panel for similar faces
similar_faces_frame = ttk.Frame(right_panel)
similar_faces_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
similar_faces_frame.columnconfigure(0, weight=1)
similar_faces_frame.rowconfigure(0, weight=0) # Controls row takes minimal space
similar_faces_frame.rowconfigure(1, weight=1) # Canvas row expandable
# Control buttons for similar faces (Select All / Clear All)
similar_controls_frame = ttk.Frame(similar_faces_frame)
similar_controls_frame.grid(row=0, column=0, sticky=(tk.W, tk.E), padx=5, pady=(5, 0))
def select_all_similar_faces():
"""Select all similar faces checkboxes"""
for face_id, var in similar_face_vars:
var.set(True)
def clear_all_similar_faces():
"""Clear all similar faces checkboxes"""
for face_id, var in similar_face_vars:
var.set(False)
select_all_btn = ttk.Button(similar_controls_frame, text="☑️ Select All", command=select_all_similar_faces, state='disabled')
select_all_btn.pack(side=tk.LEFT, padx=(0, 5))
clear_all_btn = ttk.Button(similar_controls_frame, text="☐ Clear All", command=clear_all_similar_faces, state='disabled')
clear_all_btn.pack(side=tk.LEFT)
def update_select_clear_buttons_state():
"""Enable/disable Select All and Clear All buttons based on compare checkbox state"""
if compare_var.get():
select_all_btn.config(state='normal')
clear_all_btn.config(state='normal')
else:
select_all_btn.config(state='disabled')
clear_all_btn.config(state='disabled')
# Create canvas for similar faces with scrollbar
similar_canvas = tk.Canvas(similar_faces_frame, bg='white')
similar_scrollbar = ttk.Scrollbar(similar_faces_frame, orient="vertical", command=similar_canvas.yview)
similar_scrollable_frame = ttk.Frame(similar_canvas)
similar_scrollable_frame.bind(
"<Configure>",
lambda e: similar_canvas.configure(scrollregion=similar_canvas.bbox("all"))
)
similar_canvas.create_window((0, 0), window=similar_scrollable_frame, anchor="nw")
similar_canvas.configure(yscrollcommand=similar_scrollbar.set)
# Pack canvas and scrollbar
similar_canvas.grid(row=1, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
similar_scrollbar.grid(row=0, column=1, rowspan=2, sticky=(tk.N, tk.S))
# Variables for similar faces
similar_faces_data = []
similar_face_vars = []
similar_face_images = []
similar_face_crops = []
# Store face selection states per face ID to preserve selections during navigation (auto-match style)
face_selection_states = {} # {face_id: {unique_key: bool}}
# Store person names per face ID to preserve names during navigation
face_person_names = {} # {face_id: person_name}
def save_current_face_selection_states():
"""Save current checkbox states and person name for the current face (auto-match style backup)"""
if i < len(original_faces):
current_face_id = original_faces[i][0]
# Save checkbox states
if similar_face_vars:
if current_face_id not in face_selection_states:
face_selection_states[current_face_id] = {}
# Save current checkbox states using unique keys
for similar_face_id, var in similar_face_vars:
unique_key = f"{current_face_id}_{similar_face_id}"
face_selection_states[current_face_id][unique_key] = var.get()
# Save person name
current_name = name_var.get().strip()
if current_name:
face_person_names[current_face_id] = current_name
# Button commands
command = None
waiting_for_input = False
def on_identify():
nonlocal command, waiting_for_input
command = name_var.get().strip()
compare_enabled = compare_var.get()
if not command.strip():
print("⚠️ Please enter a person name before identifying")
return
if compare_enabled:
# Get selected similar faces
selected_face_ids = [face_id for face_id, var in similar_face_vars if var.get()]
if selected_face_ids:
# Create compare command with selected face IDs
command = f"compare:{command}:{','.join(map(str, selected_face_ids))}"
# If no similar faces selected, just identify the current face
else:
# Regular identification
pass
waiting_for_input = False
def validate_navigation():
"""Check if navigation is allowed (no selected similar faces without person name)"""
# Check if compare is enabled and similar faces are selected
if compare_var.get() and similar_face_vars:
selected_faces = [face_id for face_id, var in similar_face_vars if var.get()]
if selected_faces and not name_var.get().strip():
# Show warning dialog
result = messagebox.askyesno(
"Selected Faces Not Identified",
f"You have {len(selected_faces)} similar face(s) selected but no person name entered.\n\n"
"These faces will not be identified if you continue.\n\n"
"Do you want to continue anyway?",
icon='warning'
)
return result # True = continue, False = cancel
return True # No validation issues, allow navigation
def on_back():
nonlocal command, waiting_for_input
if not validate_navigation():
return # Cancel navigation
command = 'back'
waiting_for_input = False
def on_skip():
nonlocal command, waiting_for_input
if not validate_navigation():
return # Cancel navigation
command = 's'
waiting_for_input = False
def on_quit():
nonlocal command, waiting_for_input, window_destroyed, force_exit
# First check for selected similar faces without person name
if not validate_navigation():
return # Cancel quit
# Check if there are pending identifications
pending_identifications = {k: v for k, v in face_person_names.items() if v.strip()}
if pending_identifications:
# Ask user if they want to save pending identifications
result = messagebox.askyesnocancel(
"Save Pending Identifications?",
f"You have {len(pending_identifications)} pending identifications.\n\n"
"Do you want to save them before quitting?\n\n"
"• Yes: Save all pending identifications and quit\n"
"• No: Quit without saving\n"
"• Cancel: Return to identification"
)
if result is True: # Yes - Save and quit
save_all_pending_identifications()
command = 'q'
waiting_for_input = False
elif result is False: # No - Quit without saving
command = 'q'
waiting_for_input = False
else: # Cancel - Don't quit
return
else:
# No pending identifications, quit normally
command = 'q'
waiting_for_input = False
if not window_destroyed:
window_destroyed = True
try:
root.destroy()
except tk.TclError:
pass # Window already destroyed
# Force process termination
force_exit = True
root.quit()
def update_people_dropdown():
"""Update the dropdown with current people names"""
# Use cached people names instead of database query
if 'people_names' in identify_data_cache:
name_combo['values'] = identify_data_cache['people_names']
else:
# Fallback to database if cache not available
with self.get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute('SELECT name FROM people ORDER BY name')
people = cursor.fetchall()
people_names = [name[0] for name in people]
name_combo['values'] = people_names
def update_button_states():
"""Update button states based on current position and unidentified faces"""
# Check if there are previous unidentified faces
has_prev_unidentified = False
for j in range(i - 1, -1, -1):
if original_faces[j][0] not in face_status or face_status[original_faces[j][0]] != 'identified':
has_prev_unidentified = True
break
# Check if there are next unidentified faces
has_next_unidentified = False
for j in range(i + 1, len(original_faces)):
if original_faces[j][0] not in face_status or face_status[original_faces[j][0]] != 'identified':
has_next_unidentified = True
break
# Enable/disable Back button
if has_prev_unidentified:
back_btn.config(state='normal')
else:
back_btn.config(state='disabled')
# Enable/disable Next button
if has_next_unidentified:
next_btn.config(state='normal')
else:
next_btn.config(state='disabled')
# Create button references for state management
identify_btn = ttk.Button(button_frame, text="✅ Identify", command=on_identify, state='disabled')
back_btn = ttk.Button(button_frame, text="⬅️ Back", command=on_back)
next_btn = ttk.Button(button_frame, text="➡️ Next", command=on_skip)
quit_btn = ttk.Button(button_frame, text="❌ Quit", command=on_quit)
identify_btn.grid(row=0, column=0, padx=(0, 5))
back_btn.grid(row=0, column=1, padx=5)
next_btn.grid(row=0, column=2, padx=5)
quit_btn.grid(row=0, column=3, padx=(5, 0))
def update_identify_button_state():
"""Enable/disable identify button based on name input"""
if name_var.get().strip():
identify_btn.config(state='normal')
else:
identify_btn.config(state='disabled')
# Bind name input changes to update button state
name_var.trace('w', lambda *args: update_identify_button_state())
# Handle Enter key
def on_enter(event):
on_identify()
name_combo.bind('<Return>', on_enter)
# Show the window
try:
root.deiconify()
root.lift()
root.focus_force()
except tk.TclError:
# Window was destroyed before we could show it
conn.close()
return 0
# Initialize the people dropdown
update_people_dropdown()
# Process each face with back navigation support
# Keep track of original face list and current position
original_faces = list(unidentified) # Make a copy of the original list
i = 0
face_status = {} # Track which faces have been identified
def get_unidentified_faces():
"""Get list of faces that haven't been identified yet"""
return [face for face in original_faces if face[0] not in face_status or face_status[face[0]] != 'identified']
def get_current_face_position():
"""Get current face position among unidentified faces"""
unidentified_faces = get_unidentified_faces()
current_face_id = original_faces[i][0] if i < len(original_faces) else None
# Find position of current face in unidentified list
for pos, face in enumerate(unidentified_faces):
if face[0] == current_face_id:
return pos + 1, len(unidentified_faces)
return 1, len(unidentified_faces) # Fallback
def update_current_face_index():
"""Update the current face index to point to a valid unidentified face"""
nonlocal i
unidentified_faces = get_unidentified_faces()
if not unidentified_faces:
# All faces identified, we're done
return False
# Find the current face in the unidentified list
current_face_id = original_faces[i][0] if i < len(original_faces) else None
if current_face_id and current_face_id in face_status and face_status[current_face_id] == 'identified':
# Current face was just identified, find the next unidentified face
if i < len(original_faces) - 1:
# Try to find the next unidentified face
for j in range(i + 1, len(original_faces)):
if original_faces[j][0] not in face_status or face_status[original_faces[j][0]] != 'identified':
i = j
break
else:
# No more faces after current, go to previous
for j in range(i - 1, -1, -1):
if original_faces[j][0] not in face_status or face_status[original_faces[j][0]] != 'identified':
i = j
break
else:
# At the end, go to previous unidentified face
for j in range(i - 1, -1, -1):
if original_faces[j][0] not in face_status or face_status[original_faces[j][0]] != 'identified':
i = j
break
# Ensure index is within bounds
if i >= len(original_faces):
i = len(original_faces) - 1
if i < 0:
i = 0
return True
while not window_destroyed:
# Check if current face is identified and update index if needed
if not update_current_face_index():
# All faces have been identified
print("\n🎉 All faces have been identified!")
break
# Ensure we don't go beyond the bounds
if i >= len(original_faces):
# Stay on the last face instead of breaking
i = len(original_faces) - 1
face_id, photo_id, photo_path, filename, location = original_faces[i]
# Check if this face was already identified in this session
is_already_identified = face_id in face_status and face_status[face_id] == 'identified'
# Reset command and waiting state for each face
command = None
waiting_for_input = True
# Update the display
current_pos, total_unidentified = get_current_face_position()
print(f"\n--- Face {current_pos}/{total_unidentified} (unidentified) ---")
print(f"📁 Photo: {filename}")
print(f"📍 Face location: {location}")
# Update title
root.title(f"Face Identification - {current_pos}/{total_unidentified} (unidentified)")
# Update button states
update_button_states()
# Update similar faces panel
update_similar_faces()
# Update photo info
if is_already_identified:
# Get the person name for this face
with self.get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT p.name FROM people p
JOIN faces f ON p.id = f.person_id
WHERE f.id = ?
''', (face_id,))
result = cursor.fetchone()
person_name = result[0] if result else "Unknown"
info_label.config(text=f"📁 Photo: {filename} (Face {current_pos}/{total_unidentified}) - ✅ Already identified as: {person_name}")
print(f"✅ Already identified as: {person_name}")
else:
info_label.config(text=f"📁 Photo: {filename} (Face {current_pos}/{total_unidentified})")
# Extract face crop if enabled
face_crop_path = None
if show_faces:
face_crop_path = self._extract_face_crop(photo_path, location, face_id)
if face_crop_path:
print(f"🖼️ Face crop saved: {face_crop_path}")
current_face_crop_path = face_crop_path # Track for cleanup
else:
print("💡 Use --show-faces flag to display individual face crops")
current_face_crop_path = None
print(f"\n🖼️ Viewing face {current_pos}/{total_unidentified} from {filename}")
# Clear and update image
canvas.delete("all")
if show_faces and face_crop_path and os.path.exists(face_crop_path):
try:
# Load and display the face crop image
pil_image = Image.open(face_crop_path)
# Resize image to fit in window (max 400x400)
pil_image.thumbnail((400, 400), Image.Resampling.LANCZOS)
photo = ImageTk.PhotoImage(pil_image)
# Update canvas
canvas.create_image(200, 200, image=photo)
# Keep a reference to prevent garbage collection
canvas.image = photo
except Exception as e:
canvas.create_text(200, 200, text=f"❌ Could not load image: {e}", fill="red")
else:
canvas.create_text(200, 200, text="🖼️ No face crop available", fill="gray")
# Set person name input - restore saved name or use database/empty value
if face_id in face_person_names:
# Restore previously entered name for this face
name_var.set(face_person_names[face_id])
elif is_already_identified:
# Pre-populate with the current person name from database
with self.get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT p.name FROM people p
JOIN faces f ON p.id = f.person_id
WHERE f.id = ?
''', (face_id,))
result = cursor.fetchone()
current_name = result[0] if result else ""
name_var.set(current_name)
else:
name_var.set("")
# Keep compare checkbox state persistent across navigation
name_combo.focus_set()
name_combo.icursor(0)
# Force GUI update before waiting for input
root.update_idletasks()
# Wait for user input
while waiting_for_input:
try:
root.update()
# Small delay to prevent excessive CPU usage
time.sleep(0.01)
except tk.TclError:
# Window was destroyed, break out of loop
break
# Check if force exit was requested
if force_exit:
break
# Check if force exit was requested (exit immediately)
if force_exit:
print("Force exit requested...")
# Clean up face crops and caches
self._cleanup_face_crops(face_crop_path)
self.close_db_connection()
return identified_count
# Process the command
if command is None: # User clicked Cancel
command = 'q'
else:
command = command.strip()
if command.lower() == 'q':
print("Quitting...")
# Clean up face crops and caches
self._cleanup_face_crops(face_crop_path)
self.close_db_connection()
if not window_destroyed:
window_destroyed = True
try:
root.destroy()
except tk.TclError:
pass # Window already destroyed
return identified_count
elif command.lower() == 's':
print("➡️ Next")
# Save current checkbox states before navigating away (auto-match style backup)
save_current_face_selection_states()
# Clean up current face crop when moving forward
if face_crop_path and os.path.exists(face_crop_path):
try:
os.remove(face_crop_path)
except:
pass # Ignore cleanup errors
current_face_crop_path = None # Clear tracked path
# Find next unidentified face
next_found = False
for j in range(i + 1, len(original_faces)):
if original_faces[j][0] not in face_status or face_status[original_faces[j][0]] != 'identified':
i = j
next_found = True
break
if not next_found:
print("⚠️ No more unidentified faces - Next button disabled")
continue
update_button_states()
update_similar_faces()
continue
elif command.lower() == 'back':
print("⬅️ Going back to previous face")
# Save current checkbox states before navigating away (auto-match style backup)
save_current_face_selection_states()
# Find previous unidentified face
prev_found = False
for j in range(i - 1, -1, -1):
if original_faces[j][0] not in face_status or face_status[original_faces[j][0]] != 'identified':
i = j
prev_found = True
break
if not prev_found:
print("⚠️ No more unidentified faces - Back button disabled")
continue
update_button_states()
update_similar_faces()
continue
elif command.lower() == 'list':
self._show_people_list()
continue
elif command:
try:
# Check if this is a compare command
if command.startswith('compare:'):
# Parse compare command: compare:person_name:face_id1,face_id2,face_id3
parts = command.split(':', 2)
if len(parts) == 3:
person_name = parts[1]
selected_face_ids = [int(fid.strip()) for fid in parts[2].split(',') if fid.strip()]
with self.get_db_connection() as conn:
cursor = conn.cursor()
# Add person if doesn't exist
cursor.execute('INSERT OR IGNORE INTO people (name) VALUES (?)', (person_name,))
cursor.execute('SELECT id FROM people WHERE name = ?', (person_name,))
result = cursor.fetchone()
person_id = result[0] if result else None
# Update people cache if new person was added
if person_name not in identify_data_cache['people_names']:
identify_data_cache['people_names'].append(person_name)
identify_data_cache['people_names'].sort() # Keep sorted
# Identify all selected faces (including current face)
all_face_ids = [face_id] + selected_face_ids
for fid in all_face_ids:
cursor.execute(
'UPDATE faces SET person_id = ? WHERE id = ?',
(person_id, fid)
)
# Mark all faces as identified in our tracking
for fid in all_face_ids:
face_status[fid] = 'identified'
if is_already_identified:
print(f"✅ Re-identified current face and {len(selected_face_ids)} similar faces as: {person_name}")
else:
print(f"✅ Identified current face and {len(selected_face_ids)} similar faces as: {person_name}")
identified_count += 1 + len(selected_face_ids)
# Update the people dropdown to include the new person
update_people_dropdown()
# Update person encodings after database transaction is complete
self._update_person_encodings(person_id)
else:
print("❌ Invalid compare command format")
else:
# Regular identification
with self.get_db_connection() as conn:
cursor = conn.cursor()
# Add person if doesn't exist
cursor.execute('INSERT OR IGNORE INTO people (name) VALUES (?)', (command,))
cursor.execute('SELECT id FROM people WHERE name = ?', (command,))
result = cursor.fetchone()
person_id = result[0] if result else None
# Update people cache if new person was added
if command not in identify_data_cache['people_names']:
identify_data_cache['people_names'].append(command)
identify_data_cache['people_names'].sort() # Keep sorted
# Assign face to person
cursor.execute(
'UPDATE faces SET person_id = ? WHERE id = ?',
(person_id, face_id)
)
if is_already_identified:
print(f"✅ Re-identified as: {command}")
else:
print(f"✅ Identified as: {command}")
identified_count += 1
# Mark this face as identified in our tracking
face_status[face_id] = 'identified'
# Update the people dropdown to include the new person
update_people_dropdown()
# Update person encodings after database transaction is complete
self._update_person_encodings(person_id)
except Exception as e:
print(f"❌ Error: {e}")
else:
print("Please enter a name, 's' to skip, 'q' to quit, or use buttons")
# Increment index for normal flow (identification or error) - but not if we're at the last item
if i < len(original_faces) - 1:
i += 1
update_button_states()
update_similar_faces()
# Clean up current face crop when moving forward after identification
if face_crop_path and os.path.exists(face_crop_path):
try:
os.remove(face_crop_path)
except:
pass # Ignore cleanup errors
current_face_crop_path = None # Clear tracked path
# Only close the window if user explicitly quit (not when reaching end of faces)
if not window_destroyed:
# Keep the window open - user can still navigate and quit manually
print(f"\n✅ Identified {identified_count} faces")
print("💡 Application remains open - use Quit button to close")
# Don't destroy the window - let user quit manually
return identified_count
print(f"\n✅ Identified {identified_count} faces")
return identified_count
def _display_similar_faces_in_panel(self, parent_frame, similar_faces_data, face_vars, face_images, face_crops, current_face_id=None, face_selection_states=None, data_cache=None):
    """Render up to ten similar faces as selectable rows inside ``parent_frame``.

    Every row holds a checkbox, a confidence label, the source filename and
    an 80x80 canvas showing the face crop (or a placeholder).

    Args:
        parent_frame: Tk container that receives one ``ttk.Frame`` per face.
        similar_faces_data: Sequence of dicts providing ``face_id``,
            ``filename``, ``distance`` and ``location``; only the first ten
            entries are displayed.
        face_vars: List extended in place with ``(face_id, BooleanVar)`` pairs.
        face_images: List extended in place with the ``PhotoImage`` objects so
            they are not garbage collected while displayed.
        face_crops: List extended in place with the crop file paths created.
        current_face_id: Id of the face being identified; used as the key for
            saved checkbox states.
        face_selection_states: Optional dict of the form
            ``{face_id: {"<face_id>_<similar_face_id>": bool}}`` used to
            restore and persist checkbox states.
        data_cache: Optional dict; when it has a ``'photo_paths'`` entry, its
            values (dicts with ``'filename'`` and ``'path'``) are searched
            before falling back to the database.
    """
    import tkinter as tk
    from tkinter import ttk
    from PIL import Image, ImageTk
    import os

    # Checkbox states can only be restored or persisted when we know both the
    # face being identified and the dict that stores the states.
    track_states = current_face_id is not None and face_selection_states is not None

    def make_callback(var, face_id, similar_face_id):
        """Build a trace callback that stores the checkbox value on change."""
        def on_checkbox_change(*args):
            unique_key = f"{face_id}_{similar_face_id}"
            if face_id not in face_selection_states:
                face_selection_states[face_id] = {}
            face_selection_states[face_id][unique_key] = var.get()
        return on_checkbox_change

    def resolve_photo_path(filename):
        """Look up the photo path by filename: cache first, then database."""
        if data_cache and 'photo_paths' in data_cache:
            for photo_data in data_cache['photo_paths'].values():
                if photo_data['filename'] == filename:
                    return photo_data['path']
        with self.get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute('SELECT path FROM photos WHERE filename = ?', (filename,))
            result = cursor.fetchone()
            return result[0] if result else None

    for face_data in similar_faces_data[:10]:  # Limit to 10 faces
        similar_face_id = face_data['face_id']
        filename = face_data['filename']
        distance = face_data['distance']

        # Confidence is derived from the distance exactly as in auto-match
        confidence_pct = (1 - distance) * 100
        confidence_desc = self._get_confidence_description(confidence_pct)

        match_frame = ttk.Frame(parent_frame)
        match_frame.pack(fill=tk.X, padx=5, pady=5)

        match_var = tk.BooleanVar()
        face_vars.append((similar_face_id, match_var))

        if track_states:
            unique_key = f"{current_face_id}_{similar_face_id}"
            if current_face_id in face_selection_states and unique_key in face_selection_states[current_face_id]:
                match_var.set(face_selection_states[current_face_id][unique_key])
            # Bind only after restoring, so the restore itself does not fire
            # the callback, and only when there is a dict to write into.
            match_var.trace('w', make_callback(match_var, current_face_id, similar_face_id))

        # Checkbox and text labels share one frame on the left
        text_frame = ttk.Frame(match_frame)
        text_frame.pack(side=tk.LEFT, padx=(0, 10))

        checkbox = ttk.Checkbutton(text_frame, variable=match_var)
        checkbox.pack(side=tk.LEFT, padx=(0, 5))

        confidence_label = ttk.Label(text_frame, text=f"{confidence_pct:.1f}% {confidence_desc}", font=("Arial", 9, "bold"))
        confidence_label.pack(anchor=tk.W)
        filename_label = ttk.Label(text_frame, text=f"📁 {filename}", font=("Arial", 8), foreground="gray")
        filename_label.pack(anchor=tk.W)

        # One canvas per row, created up front so a failure while loading the
        # image cannot leave a second, empty canvas behind.
        match_canvas = tk.Canvas(match_frame, width=80, height=80, bg='white')
        match_canvas.pack(side=tk.LEFT, padx=(10, 0))

        try:
            photo_path = resolve_photo_path(filename)
            face_crop_path = self._extract_face_crop(photo_path, face_data['location'], similar_face_id)

            if face_crop_path and os.path.exists(face_crop_path):
                face_crops.append(face_crop_path)
                # Close the crop file as soon as the Tk image has been built
                with Image.open(face_crop_path) as pil_image:
                    pil_image.thumbnail((80, 80), Image.Resampling.LANCZOS)
                    photo = ImageTk.PhotoImage(pil_image)
                match_canvas.create_image(40, 40, image=photo)
                match_canvas.image = photo  # Keep reference
                face_images.append(photo)
            else:
                # No image available
                match_canvas.create_text(40, 40, text="🖼️", fill="gray")
        except Exception:
            # Best effort: a broken thumbnail must not abort the whole panel
            match_canvas.create_text(40, 40, text="", fill="red")
def _extract_face_crop(self, photo_path: str, location: tuple, face_id: int) -> str:
"""Extract and save individual face crop for identification with caching"""
try:
# Check cache first
cache_key = f"{photo_path}_{location}_{face_id}"
if cache_key in self._image_cache:
cached_path = self._image_cache[cache_key]
# Verify the cached file still exists
if os.path.exists(cached_path):
return cached_path
else:
# Remove from cache if file doesn't exist
del self._image_cache[cache_key]
# Parse location tuple from string format
if isinstance(location, str):
location = eval(location)
top, right, bottom, left = location
# Load the image
image = Image.open(photo_path)
# Add padding around the face (20% of face size)
face_width = right - left
face_height = bottom - top
padding_x = int(face_width * 0.2)
padding_y = int(face_height * 0.2)
# Calculate crop bounds with padding
crop_left = max(0, left - padding_x)
crop_top = max(0, top - padding_y)
crop_right = min(image.width, right + padding_x)
crop_bottom = min(image.height, bottom + padding_y)
# Crop the face
face_crop = image.crop((crop_left, crop_top, crop_right, crop_bottom))
# Create temporary file for the face crop
temp_dir = tempfile.gettempdir()
face_filename = f"face_{face_id}_crop.jpg"
face_path = os.path.join(temp_dir, face_filename)
# Resize for better viewing (minimum 200px width)
if face_crop.width < 200:
ratio = 200 / face_crop.width
new_width = 200
new_height = int(face_crop.height * ratio)
face_crop = face_crop.resize((new_width, new_height), Image.Resampling.LANCZOS)
face_crop.save(face_path, "JPEG", quality=95)
# Cache the result
self._image_cache[cache_key] = face_path
return face_path
except Exception as e:
if self.verbose >= 1:
print(f"⚠️ Could not extract face crop: {e}")
return None
def _create_comparison_image(self, unid_crop_path: str, match_crop_path: str, person_name: str, confidence: float) -> str:
"""Create a side-by-side comparison image"""
try:
# Load both face crops
unid_img = Image.open(unid_crop_path)
match_img = Image.open(match_crop_path)
# Resize both to same height for better comparison
target_height = 300
unid_ratio = target_height / unid_img.height
match_ratio = target_height / match_img.height
unid_resized = unid_img.resize((int(unid_img.width * unid_ratio), target_height), Image.Resampling.LANCZOS)
match_resized = match_img.resize((int(match_img.width * match_ratio), target_height), Image.Resampling.LANCZOS)
# Create comparison image
total_width = unid_resized.width + match_resized.width + 20 # 20px gap
comparison = Image.new('RGB', (total_width, target_height + 60), 'white')
# Paste images
comparison.paste(unid_resized, (0, 30))
comparison.paste(match_resized, (unid_resized.width + 20, 30))
# Add labels
draw = ImageDraw.Draw(comparison)
try:
# Try to use a font
font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 16)
except:
font = ImageFont.load_default()
draw.text((10, 5), "UNKNOWN", fill='red', font=font)
draw.text((unid_resized.width + 30, 5), f"{person_name.upper()}", fill='green', font=font)
draw.text((10, target_height + 35), f"Confidence: {confidence:.1%}", fill='blue', font=font)
# Save comparison image
temp_dir = tempfile.gettempdir()
comparison_path = os.path.join(temp_dir, f"face_comparison_{person_name}.jpg")
comparison.save(comparison_path, "JPEG", quality=95)
return comparison_path
except Exception as e:
if self.verbose >= 1:
print(f"⚠️ Could not create comparison image: {e}")
return None
def _get_confidence_description(self, confidence_pct: float) -> str:
"""Get human-readable confidence description"""
if confidence_pct >= 80:
return "🟢 (Very High - Almost Certain)"
elif confidence_pct >= 70:
return "🟡 (High - Likely Match)"
elif confidence_pct >= 60:
return "🟠 (Medium - Possible Match)"
elif confidence_pct >= 50:
return "🔴 (Low - Questionable)"
else:
return "⚫ (Very Low - Unlikely)"
def _calculate_face_quality_score(self, image: np.ndarray, face_location: tuple) -> float:
"""Calculate face quality score based on multiple factors"""
try:
top, right, bottom, left = face_location
face_height = bottom - top
face_width = right - left
# Basic size check - faces too small get lower scores
min_face_size = 50
size_score = min(1.0, (face_height * face_width) / (min_face_size * min_face_size))
# Extract face region
face_region = image[top:bottom, left:right]
if face_region.size == 0:
return 0.0
# Convert to grayscale for analysis
if len(face_region.shape) == 3:
gray_face = np.mean(face_region, axis=2)
else:
gray_face = face_region
# Calculate sharpness (Laplacian variance)
laplacian_var = np.var(np.array([[0, -1, 0], [-1, 4, -1], [0, -1, 0]]).astype(np.float32))
if laplacian_var > 0:
sharpness = np.var(np.array([[0, -1, 0], [-1, 4, -1], [0, -1, 0]]).astype(np.float32))
else:
sharpness = 0.0
sharpness_score = min(1.0, sharpness / 1000.0) # Normalize sharpness
# Calculate brightness and contrast
mean_brightness = np.mean(gray_face)
brightness_score = 1.0 - abs(mean_brightness - 128) / 128.0 # Prefer middle brightness
contrast = np.std(gray_face)
contrast_score = min(1.0, contrast / 64.0) # Prefer good contrast
# Calculate aspect ratio (faces should be roughly square)
aspect_ratio = face_width / face_height if face_height > 0 else 1.0
aspect_score = 1.0 - abs(aspect_ratio - 1.0) # Prefer square faces
# Calculate position in image (centered faces are better)
image_height, image_width = image.shape[:2]
center_x = (left + right) / 2
center_y = (top + bottom) / 2
position_x_score = 1.0 - abs(center_x - image_width / 2) / (image_width / 2)
position_y_score = 1.0 - abs(center_y - image_height / 2) / (image_height / 2)
position_score = (position_x_score + position_y_score) / 2.0
# Weighted combination of all factors
quality_score = (
size_score * 0.25 +
sharpness_score * 0.25 +
brightness_score * 0.15 +
contrast_score * 0.15 +
aspect_score * 0.10 +
position_score * 0.10
)
return max(0.0, min(1.0, quality_score))
except Exception as e:
if self.verbose >= 2:
print(f"⚠️ Error calculating face quality: {e}")
return 0.5 # Default medium quality on error
def _add_person_encoding(self, person_id: int, face_id: int, encoding: np.ndarray, quality_score: float):
"""Add a face encoding to a person's encoding collection"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute(
'INSERT INTO person_encodings (person_id, face_id, encoding, quality_score) VALUES (?, ?, ?, ?)',
(person_id, face_id, encoding.tobytes(), quality_score)
)
def _get_person_encodings(self, person_id: int, min_quality: float = 0.3) -> List[Tuple[np.ndarray, float]]:
"""Get all high-quality encodings for a person"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute(
'SELECT encoding, quality_score FROM person_encodings WHERE person_id = ? AND quality_score >= ? ORDER BY quality_score DESC',
(person_id, min_quality)
)
results = cursor.fetchall()
return [(np.frombuffer(encoding, dtype=np.float64), quality_score) for encoding, quality_score in results]
def _update_person_encodings(self, person_id: int):
"""Update person encodings when a face is identified"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
# Get all faces for this person
cursor.execute(
'SELECT id, encoding, quality_score FROM faces WHERE person_id = ? ORDER BY quality_score DESC',
(person_id,)
)
faces = cursor.fetchall()
# Clear existing person encodings
cursor.execute('DELETE FROM person_encodings WHERE person_id = ?', (person_id,))
# Add all faces as person encodings
for face_id, encoding, quality_score in faces:
cursor.execute(
'INSERT INTO person_encodings (person_id, face_id, encoding, quality_score) VALUES (?, ?, ?, ?)',
(person_id, face_id, encoding, quality_score)
)
def _calculate_adaptive_tolerance(self, base_tolerance: float, face_quality: float, match_confidence: float = None) -> float:
"""Calculate adaptive tolerance based on face quality and match confidence"""
# Start with base tolerance
tolerance = base_tolerance
# Adjust based on face quality (higher quality = stricter tolerance)
# More conservative: range 0.9 to 1.1 instead of 0.8 to 1.2
quality_factor = 0.9 + (face_quality * 0.2) # Range: 0.9 to 1.1
tolerance *= quality_factor
# If we have match confidence, adjust further
if match_confidence is not None:
# Higher confidence matches can use stricter tolerance
# More conservative: range 0.95 to 1.05 instead of 0.9 to 1.1
confidence_factor = 0.95 + (match_confidence * 0.1) # Range: 0.95 to 1.05
tolerance *= confidence_factor
# Ensure tolerance stays within reasonable bounds
return max(0.3, min(0.8, tolerance)) # Reduced max from 0.9 to 0.8
def _get_filtered_similar_faces(self, face_id: int, tolerance: float, include_same_photo: bool = False, face_status: dict = None) -> List[Dict]:
"""Get similar faces with consistent filtering and sorting logic used by both auto-match and identify"""
# Find similar faces using the core function
similar_faces_data = self.find_similar_faces(face_id, tolerance=tolerance, include_same_photo=include_same_photo)
# Filter to only show unidentified faces with confidence filtering
filtered_faces = []
for face in similar_faces_data:
# For auto-match: only filter by database state (keep existing behavior)
# For identify: also filter by current session state
is_identified_in_db = face.get('person_id') is not None
is_identified_in_session = face_status and face.get('face_id') in face_status and face_status[face.get('face_id')] == 'identified'
# If face_status is provided (identify mode), use both filters
# If face_status is None (auto-match mode), only use database filter
if face_status is not None:
# Identify mode: filter out both database and session identified faces
if not is_identified_in_db and not is_identified_in_session:
# Calculate confidence percentage
confidence_pct = (1 - face['distance']) * 100
# Only include matches with reasonable confidence (at least 40%)
if confidence_pct >= 40:
filtered_faces.append(face)
else:
# Auto-match mode: only filter by database state (keep existing behavior)
if not is_identified_in_db:
# Calculate confidence percentage
confidence_pct = (1 - face['distance']) * 100
# Only include matches with reasonable confidence (at least 40%)
if confidence_pct >= 40:
filtered_faces.append(face)
# Sort by confidence (distance) - highest confidence first
filtered_faces.sort(key=lambda x: x['distance'])
return filtered_faces
def _show_people_list(self, cursor=None):
"""Show list of known people"""
if cursor is None:
with self.get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute('SELECT name FROM people ORDER BY name')
people = cursor.fetchall()
else:
cursor.execute('SELECT name FROM people ORDER BY name')
people = cursor.fetchall()
if people:
print("👥 Known people:", ", ".join([p[0] for p in people]))
else:
print("👥 No people identified yet")
def add_tags(self, photo_pattern: str = None, batch_size: int = 10) -> int:
    """Interactively add comma-separated tags to a batch of photos.

    For each selected photo the user is prompted for tags. An empty answer
    skips the photo; ``q`` (or end of input) stops the session. Tags entered
    before stopping are kept.

    Args:
        photo_pattern: Optional substring; only photos whose filename
            contains it are offered. All photos are eligible when omitted.
        batch_size: Maximum number of photos offered in this session.

    Returns:
        The number of photos that received at least one tag.
    """
    with self.get_db_connection() as conn:
        cursor = conn.cursor()

        if photo_pattern:
            cursor.execute(
                'SELECT id, filename FROM photos WHERE filename LIKE ? LIMIT ?',
                (f'%{photo_pattern}%', batch_size)
            )
        else:
            cursor.execute('SELECT id, filename FROM photos LIMIT ?', (batch_size,))
        photos = cursor.fetchall()

        if not photos:
            print("No photos found")
            return 0

        print(f"🏷️ Tagging {len(photos)} photos (enter comma-separated tags)")
        tagged_count = 0

        for photo_id, filename in photos:
            print(f"\n📸 {filename}")
            try:
                tags_input = input("🏷️ Tags: ").strip()
            except EOFError:
                # stdin was closed: stop like 'q' so the surrounding
                # transaction commits the tags entered so far instead of
                # rolling them back.
                break

            if tags_input.lower() == 'q':
                break

            tags = [tag.strip() for tag in tags_input.split(',') if tag.strip()]
            if not tags:
                # Nothing but blanks/separators: the photo stays untagged
                continue

            cursor.executemany(
                'INSERT INTO tags (photo_id, tag_name) VALUES (?, ?)',
                [(photo_id, tag) for tag in tags]
            )
            print(f" ✅ Added {len(tags)} tags")
            tagged_count += 1

    print(f"✅ Tagged {tagged_count} photos")
    return tagged_count
def stats(self) -> Dict:
    """Collect database statistics, print a summary and return them.

    The returned dict holds the counts ``total_photos``, ``processed_photos``,
    ``total_faces``, ``identified_faces``, ``total_people`` and
    ``unique_tags``, plus ``top_people``: up to 15 ``(name, face_count)``
    rows ordered by face count, highest first.
    """
    count_queries = (
        ('total_photos', 'SELECT COUNT(*) FROM photos'),
        ('processed_photos', 'SELECT COUNT(*) FROM photos WHERE processed = 1'),
        ('total_faces', 'SELECT COUNT(*) FROM faces'),
        ('identified_faces', 'SELECT COUNT(*) FROM faces WHERE person_id IS NOT NULL'),
        ('total_people', 'SELECT COUNT(*) FROM people'),
        ('unique_tags', 'SELECT COUNT(DISTINCT tag_name) FROM tags'),
    )

    with self.get_db_connection() as db:
        cur = db.cursor()
        summary = {}

        # Basic counts
        for key, sql in count_queries:
            cur.execute(sql)
            row = cur.fetchone()
            summary[key] = row[0] if row else 0

        # Top people
        cur.execute('''
            SELECT p.name, COUNT(f.id) as face_count
            FROM people p
            LEFT JOIN faces f ON p.id = f.person_id
            GROUP BY p.id
            ORDER BY face_count DESC
            LIMIT 15
        ''')
        summary['top_people'] = cur.fetchall()

    # Display stats
    print("\n📊 Database Statistics")
    print("=" * 40)
    print(f"Photos: {summary['processed_photos']}/{summary['total_photos']} processed")
    print(f"Faces: {summary['identified_faces']}/{summary['total_faces']} identified")
    print(f"People: {summary['total_people']} unique")
    print(f"Tags: {summary['unique_tags']} unique")

    if summary['top_people']:
        print("\n👥 Top People:")
        for name, count in summary['top_people']:
            print(f" {name}: {count} faces")

    remaining = summary['total_faces'] - summary['identified_faces']
    if remaining > 0:
        print(f"\n⚠️ {remaining} faces still need identification")

    return summary
def search_faces(self, person_name: str) -> List[str]:
    """Find photos showing a person whose name contains ``person_name``.

    Prints the matching filenames (or a "not found" notice) and returns the
    paths of the matching photos.
    """
    name_pattern = f'%{person_name}%'

    with self.get_db_connection() as db:
        cur = db.cursor()
        cur.execute('''
            SELECT DISTINCT p.filename, p.path
            FROM photos p
            JOIN faces f ON p.id = f.photo_id
            JOIN people pe ON f.person_id = pe.id
            WHERE pe.name LIKE ?
        ''', (name_pattern,))
        hits = cur.fetchall()

    if not hits:
        print(f"🔍 No photos found with '{person_name}'")
        return []

    print(f"\n🔍 Found {len(hits)} photos with '{person_name}':")
    paths = []
    for filename, path in hits:
        print(f" 📸 {filename}")
        paths.append(path)
    return paths
def find_similar_faces(self, face_id: int = None, tolerance: float = 0.6, include_same_photo: bool = False) -> List[Dict]:
    """Find similar faces for one face, or auto-match all unidentified faces.

    Only faces with a quality score of at least 0.2 take part as comparison
    candidates. The distance threshold for each pair is derived from
    ``tolerance`` and the average quality of the two faces via
    ``_calculate_adaptive_tolerance``.

    Single-face mode (``face_id`` given): the target face is compared with
    every other face. Each face within the threshold yields a dict with the
    keys ``face_id``, ``photo_id``, ``filename``, ``location``, ``distance``,
    ``person_id``, ``quality_score`` and ``adaptive_tolerance``.

    Auto-match mode (``face_id`` falsy): every unidentified face is compared
    with one reference encoding per identified person. At most one match, the
    closest, is returned per unidentified face, as a dict with the keys
    ``unidentified_id``, ``unidentified_photo_id``, ``unidentified_filename``,
    ``unidentified_location``, ``matched_id``, ``matched_photo_id``,
    ``matched_filename``, ``matched_location``, ``person_id``, ``distance``,
    ``quality_score`` and ``adaptive_tolerance``.

    Args:
        face_id: Id of the face to find matches for; ``None`` (or any falsy
            value) selects auto-match mode.
        tolerance: Base distance threshold before quality adjustment.
        include_same_photo: Accepted for API compatibility. Same-photo
            filtering is not implemented, so the flag currently has no effect.

    Returns:
        The list of match dicts; empty when ``face_id`` does not exist or
        nothing matches.
    """
    with self.get_db_connection() as conn:
        cursor = conn.cursor()
        single_face_mode = bool(face_id)

        if single_face_mode:
            # Load the target face
            cursor.execute('''
                SELECT id, photo_id, encoding, location, quality_score
                FROM faces
                WHERE id = ?
            ''', (face_id,))
            target_face = cursor.fetchone()
            if not target_face:
                print(f"❌ Face ID {face_id} not found")
                return []

            target_encoding = self._get_cached_face_encoding(face_id, target_face[2])
            # A NULL quality score would break the averaging below; fall
            # back to a medium quality instead.
            target_quality = target_face[4] if target_face[4] is not None else 0.5

            # All other faces with usable quality
            cursor.execute('''
                SELECT f.id, f.photo_id, f.encoding, f.location, p.filename, f.person_id, f.quality_score
                FROM faces f
                JOIN photos p ON f.photo_id = p.id
                WHERE f.id != ? AND f.quality_score >= 0.2
            ''', (face_id,))
        else:
            # Every face with usable quality, best quality first
            cursor.execute('''
                SELECT f.id, f.photo_id, f.encoding, f.location, p.filename, f.person_id, f.quality_score
                FROM faces f
                JOIN photos p ON f.photo_id = p.id
                WHERE f.quality_score >= 0.2
                ORDER BY f.quality_score DESC, f.id
            ''')

        all_faces = cursor.fetchall()
        matches = []

        if single_face_mode:
            # Compare target face with all other faces using adaptive tolerance
            for face_data in all_faces:
                other_id, other_photo_id, other_encoding, other_location, other_filename, other_person_id, other_quality = face_data
                other_enc = self._get_cached_face_encoding(other_id, other_encoding)

                avg_quality = (target_quality + other_quality) / 2
                adaptive_tolerance = self._calculate_adaptive_tolerance(tolerance, avg_quality)

                distance = face_recognition.face_distance([target_encoding], other_enc)[0]
                if distance <= adaptive_tolerance:
                    matches.append({
                        'face_id': other_id,
                        'photo_id': other_photo_id,
                        'filename': other_filename,
                        'location': other_location,
                        'distance': distance,
                        'person_id': other_person_id,
                        'quality_score': other_quality,
                        'adaptive_tolerance': adaptive_tolerance
                    })

            # Report which face was searched for
            cursor.execute('SELECT filename FROM photos WHERE id = ?', (target_face[1],))
            result = cursor.fetchone()
            target_filename = result[0] if result else "Unknown"
            print(f"\n🔍 Finding faces similar to face in: {target_filename}")
            print(f"📍 Target face location: {target_face[3]}")
        else:
            # Auto-match unidentified faces with identified ones
            identified_faces = [f for f in all_faces if f[5] is not None]  # person_id is not None
            unidentified_faces = [f for f in all_faces if f[5] is None]  # person_id is None

            print(f"\n🔍 Auto-matching {len(unidentified_faces)} unidentified faces with {len(identified_faces)} known faces...")

            # One reference encoding per person: the first face seen, which
            # is the best-quality one because of the ORDER BY above.
            person_encodings = {}
            for id_face in identified_faces:
                person_id = id_face[5]
                if person_id not in person_encodings:
                    id_enc = self._get_cached_face_encoding(id_face[0], id_face[2])
                    person_encodings[person_id] = [(id_enc, id_face[6])]

            for unid_face in unidentified_faces:
                unid_id, unid_photo_id, unid_encoding, unid_location, unid_filename, _, unid_quality = unid_face
                unid_enc = self._get_cached_face_encoding(unid_id, unid_encoding)

                best_match = None
                best_distance = float('inf')

                # Compare with all person encodings
                # TODO: Implement efficient same-photo checking
                # (include_same_photo is not honoured yet)
                for person_id, encodings in person_encodings.items():
                    for person_enc, person_quality in encodings:
                        avg_quality = (unid_quality + person_quality) / 2
                        adaptive_tolerance = self._calculate_adaptive_tolerance(tolerance, avg_quality)

                        distance = face_recognition.face_distance([unid_enc], person_enc)[0]
                        if distance > adaptive_tolerance or distance >= best_distance:
                            continue

                        # Representative face of this person for display
                        cursor.execute('''
                            SELECT f.id, f.photo_id, f.location, p.filename
                            FROM faces f
                            JOIN photos p ON f.photo_id = p.id
                            WHERE f.person_id = ? AND f.quality_score >= ?
                            ORDER BY f.quality_score DESC
                            LIMIT 1
                        ''', (person_id, 0.3))
                        best_face_info = cursor.fetchone()
                        if not best_face_info:
                            # No displayable reference face. Leave the current
                            # best untouched so best_distance and best_match
                            # always describe the same candidate.
                            continue

                        best_distance = distance
                        best_match = {
                            'unidentified_id': unid_id,
                            'unidentified_photo_id': unid_photo_id,
                            'unidentified_filename': unid_filename,
                            'unidentified_location': unid_location,
                            'matched_id': best_face_info[0],
                            'matched_photo_id': best_face_info[1],
                            'matched_filename': best_face_info[3],
                            'matched_location': best_face_info[2],
                            'person_id': person_id,
                            'distance': distance,
                            'quality_score': unid_quality,
                            'adaptive_tolerance': adaptive_tolerance
                        }

                if best_match:
                    matches.append(best_match)

        return matches
def auto_identify_matches(self, tolerance: float = 0.6, confirm: bool = True, show_faces: bool = False, include_same_photo: bool = False) -> int:
    """Review and assign faces that resemble already-identified people in a Tk GUI.

    For every person with at least one identified face whose
    ``quality_score`` is >= 0.3, the highest-quality face is used as the
    reference face. Candidates are looked up with
    ``self._get_filtered_similar_faces`` and shown one person at a time
    (people ordered by name). Ticking a checkbox and pressing the save
    button writes ``faces.person_id``; unticking a face that was saved
    earlier in the same session resets it to NULL.

    Args:
        tolerance: Matching tolerance forwarded to
            ``_get_filtered_similar_faces``; also stored as the match's
            ``adaptive_tolerance`` when a result does not carry one.
        confirm: Accepted for interface compatibility; this method does not
            read it.
        show_faces: When true, face crops are rendered for the reference
            face and for every candidate.
        include_same_photo: Forwarded to ``_get_filtered_similar_faces``.

    Returns:
        Net number of faces assigned during the session. 0 when there are
        no identified faces, no candidates, or the window cannot be shown.
    """
    # Candidate reference faces, best quality first within each person.
    with self.get_db_connection() as conn:
        cursor = conn.cursor()
        cursor.execute('''
            SELECT f.id, f.person_id, f.photo_id, f.location, p.filename, f.quality_score
            FROM faces f
            JOIN photos p ON f.photo_id = p.id
            WHERE f.person_id IS NOT NULL AND f.quality_score >= 0.3
            ORDER BY f.person_id, f.quality_score DESC
        ''')
        identified_faces = cursor.fetchall()

    if not identified_faces:
        print("🔍 No identified faces found for auto-matching")
        return 0

    # Rows arrive sorted by quality, so the first row seen per person is the best one.
    person_faces = {}
    for face in identified_faces:
        person_faces.setdefault(face[1], face)

    # Resolve every person name with a single query; the mapping is reused
    # by the GUI callbacks below instead of being fetched a second time.
    known_person_ids = list(person_faces)
    with self.get_db_connection() as conn:
        cursor = conn.cursor()
        placeholders = ','.join('?' * len(known_person_ids))
        cursor.execute(f'SELECT id, name FROM people WHERE id IN ({placeholders})', known_person_ids)
        names_by_id = {row[0]: row[1] for row in cursor.fetchall()}

    # Sort by person name for consistent, user-friendly ordering across runs.
    person_faces_list = [
        (person_id, face, names_by_id.get(person_id, "Unknown"))
        for person_id, face in person_faces.items()
    ]
    person_faces_list.sort(key=lambda entry: entry[2])

    print(f"\n🎯 Found {len(person_faces)} identified people to match against")
    print("📊 Confidence Guide: 🟢80%+ = Very High, 🟡70%+ = High, 🟠60%+ = Medium, 🔴50%+ = Low, ⚫<50% = Very Low")

    # Find similar faces for each person, using the same filtering as identify.
    matches_by_matched = {}
    for person_id, reference_face, _person_name in person_faces_list:
        reference_face_id = reference_face[0]
        similar_faces = self._get_filtered_similar_faces(reference_face_id, tolerance, include_same_photo, face_status=None)
        matches_by_matched[person_id] = [
            {
                'unidentified_id': similar_face['face_id'],
                'unidentified_photo_id': similar_face['photo_id'],
                'unidentified_filename': similar_face['filename'],
                'unidentified_location': similar_face['location'],
                'matched_id': reference_face_id,
                'matched_photo_id': reference_face[2],
                'matched_filename': reference_face[4],
                'matched_location': reference_face[3],
                'person_id': person_id,
                'distance': similar_face['distance'],
                'quality_score': similar_face['quality_score'],
                'adaptive_tolerance': similar_face.get('adaptive_tolerance', tolerance),
            }
            for similar_face in similar_faces
        ]

    total_matches = sum(len(person_matches) for person_matches in matches_by_matched.values())
    if not total_matches:
        print("🔍 No similar faces found for auto-identification")
        return 0
    print(f"\n🎯 Found {total_matches} potential matches")

    # Pre-fetch everything update_display needs so redraws never hit the database.
    print("📊 Pre-fetching data for optimal performance...")
    data_cache = {'person_names': names_by_id, 'photo_paths': {}}
    all_photo_ids = set()
    for person_matches in matches_by_matched.values():
        for match in person_matches:
            all_photo_ids.add(match['matched_photo_id'])
            all_photo_ids.add(match['unidentified_photo_id'])
    if all_photo_ids:
        photo_ids_list = list(all_photo_ids)
        with self.get_db_connection() as conn:
            cursor = conn.cursor()
            placeholders = ','.join('?' * len(photo_ids_list))
            cursor.execute(f'SELECT id, path FROM photos WHERE id IN ({placeholders})', photo_ids_list)
            data_cache['photo_paths'] = {row[0]: row[1] for row in cursor.fetchall()}
    print(f"✅ Pre-fetched {len(data_cache.get('person_names', {}))} person names and {len(data_cache.get('photo_paths', {}))} photo paths")

    identified_count = 0

    # GUI toolkits are imported lazily so the other CLI commands work without a display.
    import tkinter as tk
    from tkinter import ttk
    from PIL import Image, ImageTk

    fill = (tk.W, tk.E, tk.N, tk.S)

    root = tk.Tk()
    root.title("Auto-Match Face Identification")
    root.resizable(True, True)
    # Track window state to prevent multiple destroy calls.
    window_destroyed = False
    # Hide the window until it is fully laid out to avoid a flash at the corner.
    root.withdraw()

    def close_window(close_db: bool = False):
        """Remove temporary face crops and destroy the window exactly once."""
        nonlocal window_destroyed
        self._cleanup_face_crops()
        if close_db:
            self.close_db_connection()
        if window_destroyed:
            return
        window_destroyed = True
        try:
            root.destroy()
        except tk.TclError:
            pass  # Window already destroyed

    def on_closing():
        # Window-manager close button (X).
        close_window(close_db=True)

    root.protocol("WM_DELETE_WINDOW", on_closing)

    # Register window-size persistence, then force the larger auto-match default.
    self._setup_window_size_saving(root, "gui_config.json")
    root.geometry("1000x700")

    main_frame = ttk.Frame(root, padding="10")
    main_frame.grid(row=0, column=0, sticky=fill)
    root.columnconfigure(0, weight=1)
    root.rowconfigure(0, weight=1)
    main_frame.columnconfigure(0, weight=1)
    main_frame.columnconfigure(1, weight=1)
    main_frame.rowconfigure(0, weight=1)

    # Left side - matched face (person)
    left_frame = ttk.LabelFrame(main_frame, text="Matched Person", padding="10")
    left_frame.grid(row=0, column=0, sticky=fill, padx=(0, 5))
    # Right side - unidentified faces that match this person
    right_frame = ttk.LabelFrame(main_frame, text="Unidentified Faces to Match", padding="10")
    right_frame.grid(row=0, column=1, sticky=fill, padx=(5, 0))

    matched_info_label = ttk.Label(left_frame, text="", font=("Arial", 10, "bold"))
    matched_info_label.grid(row=0, column=0, pady=(0, 10), sticky=tk.W)
    matched_canvas = tk.Canvas(left_frame, width=300, height=300, bg='white')
    matched_canvas.grid(row=1, column=0, pady=(0, 10))

    # Scrollable list of candidate faces.
    matches_frame = ttk.Frame(right_frame)
    matches_frame.grid(row=0, column=0, sticky=fill)
    matches_canvas = tk.Canvas(matches_frame)
    matches_canvas.grid(row=0, column=0, sticky=fill)
    scrollbar = ttk.Scrollbar(right_frame, orient="vertical", command=matches_canvas.yview)
    scrollbar.grid(row=0, column=1, sticky=(tk.N, tk.S))
    matches_canvas.configure(yscrollcommand=scrollbar.set)
    right_frame.columnconfigure(0, weight=1)
    right_frame.rowconfigure(0, weight=1)
    matches_frame.columnconfigure(0, weight=1)
    matches_frame.rowconfigure(0, weight=1)

    # Control buttons (navigation only)
    control_frame = ttk.Frame(main_frame)
    control_frame.grid(row=1, column=0, columnspan=2, pady=(10, 0))

    # Navigation state shared by the callbacks below.
    current_matched_index = 0
    matched_ids = [person_id for person_id, _, _ in person_faces_list if matches_by_matched.get(person_id)]
    match_checkboxes = []
    match_vars = []
    identified_faces_per_person = {}  # person_id -> face ids saved in this session
    checkbox_states_per_person = {}  # person_id -> unsaved checkbox selections

    def on_confirm_matches():
        """Persist the current person's checkbox selections to the database."""
        nonlocal identified_count
        if current_matched_index >= len(matched_ids):
            return
        matched_id = matched_ids[current_matched_index]
        matches_for_this_person = matches_by_matched[matched_id]
        saved_faces = identified_faces_per_person.setdefault(matched_id, set())
        with self.get_db_connection() as conn:
            cursor = conn.cursor()
            # Process all matches (both checked and unchecked).
            for match, var in zip(matches_for_this_person, match_vars):
                face_id = match['unidentified_id']
                if var.get():
                    cursor.execute(
                        'UPDATE faces SET person_id = ? WHERE id = ?',
                        (match['person_id'], face_id)
                    )
                    person_name = data_cache['person_names'].get(match['person_id'], "Unknown")
                    # Count a face only the first time it is saved, so pressing
                    # Save again does not inflate the total.
                    if face_id not in saved_faces:
                        saved_faces.add(face_id)
                        identified_count += 1
                    print(f"✅ Identified as: {person_name}")
                elif face_id in saved_faces:
                    # Saved earlier in this session and now unticked: undo it.
                    cursor.execute(
                        'UPDATE faces SET person_id = NULL WHERE id = ?',
                        (face_id,)
                    )
                    saved_faces.discard(face_id)
                    identified_count -= 1
                    print(f"❌ Unidentified: {match['unidentified_filename']}")
        # Refresh encodings only after the transaction above has been committed.
        for person_id in {match['person_id'] for match in matches_for_this_person if match['person_id']}:
            self._update_person_encodings(person_id)
        # The selections are persisted now, so drop the unsaved-state snapshot.
        checkbox_states_per_person.pop(matched_id, None)

    def on_skip_current():
        nonlocal current_matched_index
        # Remember the selections before navigating away.
        save_current_checkbox_states()
        current_matched_index += 1
        if current_matched_index < len(matched_ids):
            update_display()
        else:
            finish_auto_match()

    def on_go_back():
        nonlocal current_matched_index
        if current_matched_index > 0:
            save_current_checkbox_states()
            current_matched_index -= 1
            update_display()

    def on_quit_auto_match():
        close_window()

    def finish_auto_match():
        print(f"\n✅ Auto-identified {identified_count} faces")
        close_window()

    back_btn = ttk.Button(control_frame, text="⏮️ Back", command=on_go_back)
    next_btn = ttk.Button(control_frame, text="⏭️ Next", command=on_skip_current)
    quit_btn = ttk.Button(control_frame, text="❌ Quit", command=on_quit_auto_match)
    back_btn.grid(row=0, column=0, padx=(0, 5))
    next_btn.grid(row=0, column=1, padx=5)
    quit_btn.grid(row=0, column=2, padx=(5, 0))

    save_btn = ttk.Button(left_frame, text="💾 Save Changes", command=on_confirm_matches)
    save_btn.grid(row=2, column=0, pady=(0, 10), sticky=(tk.W, tk.E))

    def update_button_states():
        """Enable Back/Next only when there is somewhere to go."""
        back_btn.config(state='normal' if current_matched_index > 0 else 'disabled')
        next_btn.config(state='normal' if current_matched_index < len(matched_ids) - 1 else 'disabled')

    def update_save_button_text():
        """Show the current person's name on the save button."""
        label = "💾 Save Changes"
        if current_matched_index < len(matched_ids):
            matches_for_current_person = matches_by_matched[matched_ids[current_matched_index]]
            if matches_for_current_person:
                person_id = matches_for_current_person[0]['person_id']
                person_name = data_cache['person_names'].get(person_id, "Unknown")
                label = f"💾 Save changes for {person_name}"
        save_btn.config(text=label)

    def save_current_checkbox_states():
        """Snapshot the visible checkbox values for the current person."""
        if current_matched_index >= len(matched_ids) or not match_vars:
            return
        current_matched_id = matched_ids[current_matched_index]
        matches_for_current_person = matches_by_matched[current_matched_id]
        # Skip the snapshot if the checkbox list does not belong to this person.
        if len(match_vars) != len(matches_for_current_person):
            return
        states = checkbox_states_per_person.setdefault(current_matched_id, {})
        for match, var in zip(matches_for_current_person, match_vars):
            unique_key = f"{current_matched_id}_{match['unidentified_id']}"
            states[unique_key] = var.get()
            if self.verbose >= 2:
                print(f"DEBUG: Saved state for person {current_matched_id}, face {match['unidentified_id']}: {var.get()}")

    def on_checkbox_change(var, person_id, face_id):
        """Record a checkbox toggle immediately."""
        unique_key = f"{person_id}_{face_id}"
        checkbox_states_per_person.setdefault(person_id, {})[unique_key] = var.get()
        if self.verbose >= 2:
            print(f"DEBUG: Checkbox changed for person {person_id}, face {face_id}: {var.get()}")

    def update_display():
        """Redraw both panes for the person at ``current_matched_index``."""
        nonlocal current_matched_index
        if current_matched_index >= len(matched_ids):
            finish_auto_match()
            return
        matched_id = matched_ids[current_matched_index]
        matches_for_this_person = matches_by_matched[matched_id]

        update_button_states()
        update_save_button_text()
        root.title(f"Auto-Match Face Identification - {current_matched_index + 1}/{len(matched_ids)}")

        if not matches_for_this_person:
            print(f"❌ Error: No matches found for current person {matched_id}")
            # Skip to next person if available
            if current_matched_index < len(matched_ids) - 1:
                current_matched_index += 1
                update_display()
            else:
                finish_auto_match()
            return

        first_match = matches_for_this_person[0]
        person_name = data_cache['person_names'].get(first_match['person_id'], "Unknown")
        matched_photo_path = data_cache['photo_paths'].get(first_match['matched_photo_id'], None)
        matched_info_label.config(text=f"👤 Person: {person_name}\n📁 Photo: {first_match['matched_filename']}\n📍 Face location: {first_match['matched_location']}")

        # Display matched person face
        matched_canvas.delete("all")
        if show_faces:
            matched_crop_path = None
            if matched_photo_path:  # no cached path means there is nothing to crop
                matched_crop_path = self._extract_face_crop(
                    matched_photo_path,
                    first_match['matched_location'],
                    f"matched_{first_match['person_id']}"
                )
            if matched_crop_path and os.path.exists(matched_crop_path):
                try:
                    pil_image = Image.open(matched_crop_path)
                    pil_image.thumbnail((300, 300), Image.Resampling.LANCZOS)
                    photo = ImageTk.PhotoImage(pil_image)
                    matched_canvas.create_image(150, 150, image=photo)
                    # Keep a reference so Tk does not garbage-collect the image.
                    matched_canvas.image = photo
                except Exception as e:
                    matched_canvas.create_text(150, 150, text=f"❌ Could not load image: {e}", fill="red")
            else:
                matched_canvas.create_text(150, 150, text="🖼️ No face crop available", fill="gray")

        # Clear the candidate list. delete("all") only removes canvas items,
        # so the previous inner frame's widgets must be destroyed explicitly.
        matches_canvas.delete("all")
        for stale_widget in matches_canvas.winfo_children():
            stale_widget.destroy()
        match_checkboxes.clear()
        match_vars.clear()

        matches_inner_frame = ttk.Frame(matches_canvas)
        matches_canvas.create_window((0, 0), window=matches_inner_frame, anchor="nw")
        photo_paths = data_cache['photo_paths']
        saved_states = checkbox_states_per_person.get(matched_id, {})
        saved_faces = identified_faces_per_person.get(matched_id, ())

        for row, match in enumerate(matches_for_this_person):
            face_id = match['unidentified_id']
            unidentified_photo_path = photo_paths.get(match['unidentified_photo_id'], '')
            confidence_pct = (1 - match['distance']) * 100
            confidence_desc = self._get_confidence_description(confidence_pct)

            match_frame = ttk.Frame(matches_inner_frame)
            match_frame.grid(row=row, column=0, sticky=(tk.W, tk.E), pady=5)

            match_var = tk.BooleanVar()
            unique_key = f"{matched_id}_{face_id}"
            if unique_key in saved_states:
                # Restore the unsaved selection from an earlier visit.
                saved_state = saved_states[unique_key]
                match_var.set(saved_state)
                if self.verbose >= 2:
                    print(f"DEBUG: Restored state for person {matched_id}, face {face_id}: {saved_state}")
            elif face_id in saved_faces:
                # Otherwise pre-select faces already saved for this person.
                match_var.set(True)
                if self.verbose >= 2:
                    print(f"DEBUG: Pre-selected identified face for person {matched_id}, face {face_id}")
            match_vars.append(match_var)

            # Bind the loop values as defaults: a plain closure would see only
            # the last row's variable and face id by the time the trace fires.
            match_var.trace_add(
                'write',
                lambda *_args, var=match_var, pid=matched_id, fid=face_id: on_checkbox_change(var, pid, fid),
            )

            text_frame = ttk.Frame(match_frame)
            text_frame.grid(row=0, column=0, sticky=tk.W, padx=(0, 10))
            checkbox = ttk.Checkbutton(text_frame, variable=match_var)
            checkbox.pack(side=tk.LEFT, padx=(0, 5))
            match_checkboxes.append(checkbox)
            confidence_label = ttk.Label(text_frame, text=f"{confidence_pct:.1f}% {confidence_desc}", font=("Arial", 9, "bold"))
            confidence_label.pack(anchor=tk.W)
            filename_label = ttk.Label(text_frame, text=f"📁 {match['unidentified_filename']}", font=("Arial", 8), foreground="gray")
            filename_label.pack(anchor=tk.W)

            # Unidentified face image
            if show_faces:
                match_canvas = tk.Canvas(match_frame, width=100, height=100, bg='white')
                match_canvas.grid(row=0, column=1, padx=(10, 0))
                unidentified_crop_path = None
                if unidentified_photo_path:
                    unidentified_crop_path = self._extract_face_crop(
                        unidentified_photo_path,
                        match['unidentified_location'],
                        f"unid_{face_id}"
                    )
                if unidentified_crop_path and os.path.exists(unidentified_crop_path):
                    try:
                        pil_image = Image.open(unidentified_crop_path)
                        pil_image.thumbnail((100, 100), Image.Resampling.LANCZOS)
                        photo = ImageTk.PhotoImage(pil_image)
                        match_canvas.create_image(50, 50, image=photo)
                        match_canvas.image = photo
                    except Exception:
                        match_canvas.create_text(50, 50, text="", fill="red")
                else:
                    match_canvas.create_text(50, 50, text="🖼️", fill="gray")

        # Update scroll region
        matches_canvas.update_idletasks()
        matches_canvas.configure(scrollregion=matches_canvas.bbox("all"))

    # Show the window
    try:
        root.deiconify()
        root.lift()
        root.focus_force()
    except tk.TclError:
        # Window was destroyed before we could show it
        return 0

    # Start with first matched person
    update_display()

    # Main event loop
    try:
        root.mainloop()
    except tk.TclError:
        pass  # Window was destroyed
    return identified_count
def main():
    """Parse command-line arguments and run the requested PunimTag command.

    Returns:
        Process exit status: 0 on success; 1 when a required argument is
        missing or invalid, when the run is interrupted with Ctrl-C, or when
        the command raises an exception.
    """
    parser = argparse.ArgumentParser(
        description="PunimTag CLI - Simple photo face tagger",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
photo_tagger.py scan /path/to/photos # Scan folder for photos
photo_tagger.py process --limit 20 # Process 20 photos for faces
photo_tagger.py identify --batch 10 # Identify 10 faces interactively
photo_tagger.py auto-match # Auto-identify matching faces
photo_tagger.py match 15 # Find faces similar to face ID 15
photo_tagger.py tag --pattern "vacation" # Tag photos matching pattern
photo_tagger.py search "John" # Find photos with John
photo_tagger.py stats # Show statistics
""",
    )
    parser.add_argument('command',
                        choices=['scan', 'process', 'identify', 'tag', 'search', 'stats', 'match', 'auto-match'],
                        help='Command to execute')
    parser.add_argument('target', nargs='?',
                        help='Target folder (scan), person name (search), or pattern (tag)')
    parser.add_argument('--db', default='data/photos.db',
                        help='Database file path (default: data/photos.db)')
    parser.add_argument('--limit', type=int, default=50,
                        help='Batch size limit for processing (default: 50)')
    parser.add_argument('--batch', type=int, default=20,
                        help='Batch size for identification (default: 20)')
    parser.add_argument('--pattern',
                        help='Pattern for filtering photos when tagging')
    parser.add_argument('--model', choices=['hog', 'cnn'], default='hog',
                        help='Face detection model: hog (faster) or cnn (more accurate)')
    parser.add_argument('--recursive', action='store_true',
                        help='Scan folders recursively')
    parser.add_argument('--show-faces', action='store_true',
                        help='Show individual face crops during identification')
    parser.add_argument('--tolerance', type=float, default=0.5,
                        help='Face matching tolerance (0.0-1.0, lower = stricter, default: 0.5)')
    parser.add_argument('--auto', action='store_true',
                        help='Auto-identify high-confidence matches without confirmation')
    parser.add_argument('--include-twins', action='store_true',
                        help='Include same-photo matching (for twins or multiple instances)')
    parser.add_argument('-v', '--verbose', action='count', default=0,
                        help='Increase verbosity (-v, -vv, -vvv for more detail)')
    parser.add_argument('--debug', action='store_true',
                        help='Enable line-by-line debugging with pdb')
    args = parser.parse_args()

    # Initialize tagger
    tagger = PhotoTagger(args.db, args.verbose, args.debug)
    try:
        command = args.command
        if command == 'scan':
            if not args.target:
                print("❌ Please specify a folder to scan")
                return 1
            tagger.scan_folder(args.target, args.recursive)
        elif command == 'process':
            tagger.process_faces(args.limit, args.model)
        elif command == 'identify':
            # store_true options always exist on the namespace, so no getattr fallback is needed.
            tagger.identify_faces(args.batch, args.show_faces, args.tolerance)
        elif command == 'tag':
            tagger.add_tags(args.pattern or args.target, args.batch)
        elif command == 'search':
            if not args.target:
                print("❌ Please specify a person name to search for")
                return 1
            tagger.search_faces(args.target)
        elif command == 'stats':
            tagger.stats()
        elif command == 'match':
            if not (args.target and args.target.isdigit()):
                print("❌ Please specify a face ID number to find matches for")
                # Report failure like scan/search do, instead of exiting 0 on bad input.
                return 1
            matches = tagger.find_similar_faces(int(args.target), args.tolerance)
            if matches:
                print(f"\n🎯 Found {len(matches)} similar faces:")
                for match in matches:
                    person_name = "Unknown" if match['person_id'] is None else f"Person ID {match['person_id']}"
                    print(f" 📸 {match['filename']} - {person_name} (confidence: {(1-match['distance']):.1%})")
            else:
                print("🔍 No similar faces found")
        elif command == 'auto-match':
            # --auto disables the confirmation flag passed as the second argument.
            tagger.auto_identify_matches(args.tolerance, not args.auto, args.show_faces, args.include_twins)
        return 0
    except KeyboardInterrupt:
        print("\n\n⚠️ Interrupted by user")
        return 1
    except Exception as e:
        # Top-level boundary: report the error and turn it into a non-zero exit status.
        print(f"❌ Error: {e}")
        return 1
    finally:
        # Always cleanup resources
        tagger.cleanup()
# Script entry point: propagate main()'s return value as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())