This commit introduces a comprehensive auto-identification feature within the PhotoTagger application, allowing users to automatically match unidentified faces against identified ones using a graphical user interface. The implementation includes database queries to fetch identified faces, a user-friendly display of potential matches, and options for selecting and saving identified faces. The GUI is designed for optimal performance and usability, ensuring a seamless experience. Additionally, the README has been updated to reflect this new functionality and provide usage instructions.
1557 lines
74 KiB
Python
1557 lines
74 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
PunimTag CLI - Minimal Photo Face Tagger (Refactored)
|
||
Simple command-line tool for face recognition and photo tagging
|
||
"""
|
||
|
||
import os
|
||
import sys
|
||
import argparse
|
||
import threading
|
||
from typing import List, Dict, Tuple, Optional
|
||
|
||
# Import our new modules
|
||
from config import (
|
||
DEFAULT_DB_PATH, DEFAULT_FACE_DETECTION_MODEL, DEFAULT_FACE_TOLERANCE,
|
||
DEFAULT_BATCH_SIZE, DEFAULT_PROCESSING_LIMIT
|
||
)
|
||
from database import DatabaseManager
|
||
from face_processing import FaceProcessor
|
||
from photo_management import PhotoManager
|
||
from tag_management import TagManager
|
||
from search_stats import SearchStats
|
||
from gui_core import GUICore
|
||
from identify_gui import IdentifyGUI
|
||
|
||
|
||
class PhotoTagger:
|
||
"""Main PhotoTagger class - orchestrates all functionality"""
|
||
|
||
def __init__(self, db_path: str = DEFAULT_DB_PATH, verbose: int = 0, debug: bool = False):
    """Build the tagger and wire every manager around one shared database manager.

    Args:
        db_path: Database path handed to ``DatabaseManager``.
        verbose: Verbosity level forwarded to each manager that accepts one.
        debug: Debug flag; only stored on the instance here.
    """
    self.db_path, self.verbose, self.debug = db_path, verbose, debug

    # The database manager is created first because the other managers
    # receive it as their first constructor argument.
    database = DatabaseManager(db_path, verbose)
    self.db = database

    processor = FaceProcessor(database, verbose)
    self.face_processor = processor

    self.photo_manager = PhotoManager(database, verbose)
    self.tag_manager = TagManager(database, verbose)
    self.search_stats = SearchStats(database, verbose)
    self.gui_core = GUICore()
    self.identify_gui = IdentifyGUI(database, processor, verbose)

    # Attributes retained for legacy compatibility.
    self._db_connection = None
    self._db_lock = threading.Lock()
|
||
|
||
def cleanup(self):
    """Clean up face crops and close the database connection.

    The connection is closed in a ``finally`` clause so that a failure while
    cleaning up face crops cannot leave the database connection open. Any
    exception raised by the crop cleanup still propagates to the caller.
    """
    try:
        self.face_processor.cleanup_face_crops()
    finally:
        # Always release the connection, even when crop cleanup failed.
        self.db.close_db_connection()
|
||
|
||
# Database methods (delegated)
|
||
def get_db_connection(self):
    """Return whatever the database manager's ``get_db_connection()`` returns (legacy entry point)."""
    database = self.db
    return database.get_db_connection()
|
||
|
||
def close_db_connection(self):
    """Ask the database manager to close its connection (legacy entry point)."""
    database = self.db
    database.close_db_connection()
|
||
|
||
def init_database(self):
    """Forward database initialisation to the database manager (legacy entry point)."""
    database = self.db
    database.init_database()
|
||
|
||
# Photo management methods (delegated)
|
||
def scan_folder(self, folder_path: str, recursive: bool = True) -> int:
    """Delegate a folder scan to the photo manager.

    Args:
        folder_path: Folder to scan, passed through unchanged.
        recursive: Passed through unchanged; defaults to ``True``.

    Returns:
        The value returned by ``photo_manager.scan_folder``.
    """
    manager = self.photo_manager
    scanned = manager.scan_folder(folder_path, recursive)
    return scanned
|
||
|
||
def _extract_photo_date(self, photo_path: str) -> Optional[str]:
|
||
"""Extract date taken from photo EXIF data (legacy compatibility)"""
|
||
return self.photo_manager.extract_photo_date(photo_path)
|
||
|
||
# Face processing methods (delegated)
|
||
def process_faces(self, limit: int = DEFAULT_PROCESSING_LIMIT, model: str = DEFAULT_FACE_DETECTION_MODEL) -> int:
    """Delegate face processing to the face processor.

    Args:
        limit: Passed through unchanged; defaults to ``DEFAULT_PROCESSING_LIMIT``.
        model: Passed through unchanged; defaults to ``DEFAULT_FACE_DETECTION_MODEL``.

    Returns:
        The value returned by ``face_processor.process_faces``.
    """
    processor = self.face_processor
    processed = processor.process_faces(limit, model)
    return processed
|
||
|
||
def _extract_face_crop(self, photo_path: str, location: tuple, face_id: int) -> str:
|
||
"""Extract and save individual face crop for identification (legacy compatibility)"""
|
||
return self.face_processor._extract_face_crop(photo_path, location, face_id)
|
||
|
||
def _create_comparison_image(self, unid_crop_path: str, match_crop_path: str, person_name: str, confidence: float) -> str:
|
||
"""Create a side-by-side comparison image (legacy compatibility)"""
|
||
return self.face_processor._create_comparison_image(unid_crop_path, match_crop_path, person_name, confidence)
|
||
|
||
def _calculate_face_quality_score(self, image, face_location: tuple) -> float:
|
||
"""Calculate face quality score (legacy compatibility)"""
|
||
return self.face_processor._calculate_face_quality_score(image, face_location)
|
||
|
||
def _add_person_encoding(self, person_id: int, face_id: int, encoding, quality_score: float):
|
||
"""Add a face encoding to a person's encoding collection (legacy compatibility)"""
|
||
self.face_processor.add_person_encoding(person_id, face_id, encoding, quality_score)
|
||
|
||
def _get_person_encodings(self, person_id: int, min_quality: float = 0.3):
|
||
"""Get all high-quality encodings for a person (legacy compatibility)"""
|
||
return self.face_processor.get_person_encodings(person_id, min_quality)
|
||
|
||
def _update_person_encodings(self, person_id: int):
|
||
"""Update person encodings when a face is identified (legacy compatibility)"""
|
||
self.face_processor.update_person_encodings(person_id)
|
||
|
||
def _calculate_adaptive_tolerance(self, base_tolerance: float, face_quality: float, match_confidence: float = None) -> float:
|
||
"""Calculate adaptive tolerance (legacy compatibility)"""
|
||
return self.face_processor._calculate_adaptive_tolerance(base_tolerance, face_quality, match_confidence)
|
||
|
||
def _get_filtered_similar_faces(self, face_id: int, tolerance: float, include_same_photo: bool = False, face_status: dict = None):
|
||
"""Get similar faces with filtering (legacy compatibility)"""
|
||
return self.face_processor._get_filtered_similar_faces(face_id, tolerance, include_same_photo, face_status)
|
||
|
||
def _filter_unique_faces(self, faces: List[Dict]):
|
||
"""Filter faces to show only unique ones (legacy compatibility)"""
|
||
return self.face_processor._filter_unique_faces(faces)
|
||
|
||
def _filter_unique_faces_from_list(self, faces_list: List[tuple]):
|
||
"""Filter face list to show only unique ones (legacy compatibility)"""
|
||
return self.face_processor._filter_unique_faces_from_list(faces_list)
|
||
|
||
def find_similar_faces(self, face_id: int = None, tolerance: float = DEFAULT_FACE_TOLERANCE, include_same_photo: bool = False):
    """Delegate a similar-face search to the face processor.

    Args:
        face_id: Passed through unchanged; defaults to ``None``.
        tolerance: Passed through unchanged; defaults to ``DEFAULT_FACE_TOLERANCE``.
        include_same_photo: Passed through unchanged; defaults to ``False``.

    Returns:
        The value returned by ``face_processor.find_similar_faces``.
    """
    processor = self.face_processor
    similar = processor.find_similar_faces(face_id, tolerance, include_same_photo)
    return similar
|
||
|
||
def auto_identify_matches(self, tolerance: float = DEFAULT_FACE_TOLERANCE, confirm: bool = True, show_faces: bool = False, include_same_photo: bool = False) -> int:
|
||
"""Automatically identify faces that match already identified faces using GUI"""
|
||
# Get all identified faces (one per person) to use as reference faces
|
||
with self.get_db_connection() as conn:
|
||
cursor = conn.cursor()
|
||
cursor.execute('''
|
||
SELECT f.id, f.person_id, f.photo_id, f.location, p.filename, f.quality_score
|
||
FROM faces f
|
||
JOIN photos p ON f.photo_id = p.id
|
||
WHERE f.person_id IS NOT NULL AND f.quality_score >= 0.3
|
||
ORDER BY f.person_id, f.quality_score DESC
|
||
''')
|
||
identified_faces = cursor.fetchall()
|
||
|
||
if not identified_faces:
|
||
print("🔍 No identified faces found for auto-matching")
|
||
return 0
|
||
|
||
# Group by person and get the best quality face per person
|
||
person_faces = {}
|
||
for face in identified_faces:
|
||
person_id = face[1]
|
||
if person_id not in person_faces:
|
||
person_faces[person_id] = face
|
||
|
||
# Convert to ordered list to ensure consistent ordering
|
||
# Order by person name for user-friendly consistent results across runs
|
||
person_faces_list = []
|
||
for person_id, face in person_faces.items():
|
||
# Get person name for ordering
|
||
with self.get_db_connection() as conn:
|
||
cursor = conn.cursor()
|
||
cursor.execute('SELECT first_name, last_name FROM people WHERE id = ?', (person_id,))
|
||
result = cursor.fetchone()
|
||
if result:
|
||
first_name, last_name = result
|
||
if last_name and first_name:
|
||
person_name = f"{last_name}, {first_name}"
|
||
elif last_name:
|
||
person_name = last_name
|
||
elif first_name:
|
||
person_name = first_name
|
||
else:
|
||
person_name = "Unknown"
|
||
else:
|
||
person_name = "Unknown"
|
||
person_faces_list.append((person_id, face, person_name))
|
||
|
||
# Sort by person name for consistent, user-friendly ordering
|
||
person_faces_list.sort(key=lambda x: x[2]) # Sort by person name (index 2)
|
||
|
||
print(f"\n🎯 Found {len(person_faces)} identified people to match against")
|
||
print("📊 Confidence Guide: 🟢80%+ = Very High, 🟡70%+ = High, 🟠60%+ = Medium, 🔴50%+ = Low, ⚫<50% = Very Low")
|
||
|
||
# Find similar faces for each identified person using face-to-face comparison
|
||
matches_by_matched = {}
|
||
for person_id, reference_face, person_name in person_faces_list:
|
||
reference_face_id = reference_face[0]
|
||
|
||
# Use the same filtering and sorting logic as identify
|
||
similar_faces = self._get_filtered_similar_faces(reference_face_id, tolerance, include_same_photo, face_status=None)
|
||
|
||
# Convert to auto-match format
|
||
person_matches = []
|
||
for similar_face in similar_faces:
|
||
# Convert to auto-match format
|
||
match = {
|
||
'unidentified_id': similar_face['face_id'],
|
||
'unidentified_photo_id': similar_face['photo_id'],
|
||
'unidentified_filename': similar_face['filename'],
|
||
'unidentified_location': similar_face['location'],
|
||
'matched_id': reference_face_id,
|
||
'matched_photo_id': reference_face[2],
|
||
'matched_filename': reference_face[4],
|
||
'matched_location': reference_face[3],
|
||
'person_id': person_id,
|
||
'distance': similar_face['distance'],
|
||
'quality_score': similar_face['quality_score'],
|
||
'adaptive_tolerance': similar_face.get('adaptive_tolerance', tolerance)
|
||
}
|
||
person_matches.append(match)
|
||
|
||
matches_by_matched[person_id] = person_matches
|
||
|
||
# Flatten all matches for counting
|
||
all_matches = []
|
||
for person_matches in matches_by_matched.values():
|
||
all_matches.extend(person_matches)
|
||
|
||
if not all_matches:
|
||
print("🔍 No similar faces found for auto-identification")
|
||
return 0
|
||
|
||
print(f"\n🎯 Found {len(all_matches)} potential matches")
|
||
|
||
# Pre-fetch all needed data to avoid repeated database queries in update_display
|
||
print("📊 Pre-fetching data for optimal performance...")
|
||
data_cache = {}
|
||
|
||
with self.get_db_connection() as conn:
|
||
cursor = conn.cursor()
|
||
|
||
# Pre-fetch all person names and details
|
||
person_ids = list(matches_by_matched.keys())
|
||
if person_ids:
|
||
placeholders = ','.join('?' * len(person_ids))
|
||
cursor.execute(f'SELECT id, first_name, last_name, middle_name, maiden_name, date_of_birth FROM people WHERE id IN ({placeholders})', person_ids)
|
||
data_cache['person_details'] = {}
|
||
for row in cursor.fetchall():
|
||
person_id = row[0]
|
||
first_name = row[1] or ''
|
||
last_name = row[2] or ''
|
||
middle_name = row[3] or ''
|
||
maiden_name = row[4] or ''
|
||
date_of_birth = row[5] or ''
|
||
|
||
# Create full name display
|
||
name_parts = []
|
||
if first_name:
|
||
name_parts.append(first_name)
|
||
if middle_name:
|
||
name_parts.append(middle_name)
|
||
if last_name:
|
||
name_parts.append(last_name)
|
||
if maiden_name:
|
||
name_parts.append(f"({maiden_name})")
|
||
|
||
full_name = ' '.join(name_parts)
|
||
data_cache['person_details'][person_id] = {
|
||
'full_name': full_name,
|
||
'first_name': first_name,
|
||
'last_name': last_name,
|
||
'middle_name': middle_name,
|
||
'maiden_name': maiden_name,
|
||
'date_of_birth': date_of_birth
|
||
}
|
||
|
||
# Pre-fetch all photo paths (both matched and unidentified)
|
||
all_photo_ids = set()
|
||
for person_matches in matches_by_matched.values():
|
||
for match in person_matches:
|
||
all_photo_ids.add(match['matched_photo_id'])
|
||
all_photo_ids.add(match['unidentified_photo_id'])
|
||
|
||
if all_photo_ids:
|
||
photo_ids_list = list(all_photo_ids)
|
||
placeholders = ','.join('?' * len(photo_ids_list))
|
||
cursor.execute(f'SELECT id, path FROM photos WHERE id IN ({placeholders})', photo_ids_list)
|
||
data_cache['photo_paths'] = {row[0]: row[1] for row in cursor.fetchall()}
|
||
|
||
print(f"✅ Pre-fetched {len(data_cache.get('person_details', {}))} person details and {len(data_cache.get('photo_paths', {}))} photo paths")
|
||
|
||
identified_count = 0
|
||
|
||
# Use integrated GUI for auto-matching
|
||
import tkinter as tk
|
||
from tkinter import ttk, messagebox
|
||
from PIL import Image, ImageTk
|
||
import json
|
||
import os
|
||
|
||
# Create the main window
|
||
root = tk.Tk()
|
||
root.title("Auto-Match Face Identification")
|
||
root.resizable(True, True)
|
||
|
||
# Track window state to prevent multiple destroy calls
|
||
window_destroyed = False
|
||
|
||
# Hide window initially to prevent flash at corner
|
||
root.withdraw()
|
||
|
||
# Set up protocol handler for window close button (X)
|
||
def on_closing():
|
||
nonlocal window_destroyed
|
||
# Clean up face crops and caches
|
||
self._cleanup_face_crops()
|
||
self.close_db_connection()
|
||
|
||
if not window_destroyed:
|
||
window_destroyed = True
|
||
try:
|
||
root.destroy()
|
||
except tk.TclError:
|
||
pass # Window already destroyed
|
||
|
||
root.protocol("WM_DELETE_WINDOW", on_closing)
|
||
|
||
# Set up window size saving with larger default size
|
||
saved_size = self._setup_window_size_saving(root, "gui_config.json")
|
||
# Override with larger size for auto-match window
|
||
root.geometry("1000x700")
|
||
|
||
# Create main frame
|
||
main_frame = ttk.Frame(root, padding="10")
|
||
main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
|
||
|
||
# Configure grid weights
|
||
root.columnconfigure(0, weight=1)
|
||
root.rowconfigure(0, weight=1)
|
||
main_frame.columnconfigure(0, weight=1)
|
||
main_frame.columnconfigure(1, weight=1)
|
||
|
||
# Left side - identified person
|
||
left_frame = ttk.LabelFrame(main_frame, text="Identified person", padding="10")
|
||
left_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S), padx=(0, 5))
|
||
|
||
# Right side - unidentified faces that match this person
|
||
right_frame = ttk.LabelFrame(main_frame, text="Unidentified Faces to Match", padding="10")
|
||
right_frame.grid(row=0, column=1, sticky=(tk.W, tk.E, tk.N, tk.S), padx=(5, 0))
|
||
|
||
# Configure row weights
|
||
main_frame.rowconfigure(0, weight=1)
|
||
|
||
# Check if there's only one person - if so, disable search functionality
|
||
# Use matched_ids instead of person_faces_list since we only show people with potential matches
|
||
matched_ids = [person_id for person_id, _, _ in person_faces_list if person_id in matches_by_matched and matches_by_matched[person_id]]
|
||
has_only_one_person = len(matched_ids) == 1
|
||
print(f"DEBUG: person_faces_list length: {len(person_faces_list)}, matched_ids length: {len(matched_ids)}, has_only_one_person: {has_only_one_person}")
|
||
|
||
# Search controls for filtering people by last name
|
||
last_name_search_var = tk.StringVar()
|
||
# Search field with label underneath (like modifyidentified edit section)
|
||
search_frame = ttk.Frame(left_frame)
|
||
search_frame.grid(row=0, column=0, sticky=(tk.W, tk.E), pady=(0, 10))
|
||
|
||
# Search input on the left
|
||
search_entry = ttk.Entry(search_frame, textvariable=last_name_search_var, width=20)
|
||
search_entry.grid(row=0, column=0, sticky=tk.W)
|
||
|
||
# Buttons on the right of the search input
|
||
buttons_row = ttk.Frame(search_frame)
|
||
buttons_row.grid(row=0, column=1, sticky=tk.W, padx=(6, 0))
|
||
|
||
search_btn = ttk.Button(buttons_row, text="Search", width=8)
|
||
search_btn.pack(side=tk.LEFT, padx=(0, 5))
|
||
clear_btn = ttk.Button(buttons_row, text="Clear", width=6)
|
||
clear_btn.pack(side=tk.LEFT)
|
||
|
||
# Helper label directly under the search input
|
||
if has_only_one_person:
|
||
print("DEBUG: Disabling search functionality - only one person found")
|
||
# Disable search functionality if there's only one person
|
||
search_entry.config(state='disabled')
|
||
search_btn.config(state='disabled')
|
||
clear_btn.config(state='disabled')
|
||
# Add a label to explain why search is disabled
|
||
disabled_label = ttk.Label(search_frame, text="(Search disabled - only one person found)",
|
||
font=("Arial", 8), foreground="gray")
|
||
disabled_label.grid(row=1, column=0, columnspan=2, sticky=tk.W, pady=(2, 0))
|
||
else:
|
||
print("DEBUG: Search functionality enabled - multiple people found")
|
||
# Normal helper label when search is enabled
|
||
last_name_label = ttk.Label(search_frame, text="Type Last Name", font=("Arial", 8), foreground="gray")
|
||
last_name_label.grid(row=1, column=0, sticky=tk.W, pady=(2, 0))
|
||
|
||
# Matched person info
|
||
matched_info_label = ttk.Label(left_frame, text="", font=("Arial", 10, "bold"))
|
||
matched_info_label.grid(row=2, column=0, pady=(0, 10), sticky=tk.W)
|
||
|
||
# Matched person image
|
||
style = ttk.Style()
|
||
canvas_bg_color = style.lookup('TFrame', 'background') or '#d9d9d9'
|
||
matched_canvas = tk.Canvas(left_frame, width=300, height=300, bg=canvas_bg_color, highlightthickness=0)
|
||
matched_canvas.grid(row=3, column=0, pady=(0, 10))
|
||
|
||
# Save button for this person (will be created after function definitions)
|
||
save_btn = None
|
||
|
||
# Matches scrollable frame
|
||
matches_frame = ttk.Frame(right_frame)
|
||
matches_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
|
||
|
||
# Control buttons for matches (Select All / Clear All)
|
||
matches_controls_frame = ttk.Frame(matches_frame)
|
||
matches_controls_frame.grid(row=0, column=0, sticky=(tk.W, tk.E), padx=5, pady=(5, 0))
|
||
|
||
def select_all_matches():
|
||
"""Select all match checkboxes"""
|
||
for var in match_vars:
|
||
var.set(True)
|
||
|
||
def clear_all_matches():
|
||
"""Clear all match checkboxes"""
|
||
for var in match_vars:
|
||
var.set(False)
|
||
|
||
select_all_matches_btn = ttk.Button(matches_controls_frame, text="☑️ Select All", command=select_all_matches)
|
||
select_all_matches_btn.pack(side=tk.LEFT, padx=(0, 5))
|
||
|
||
clear_all_matches_btn = ttk.Button(matches_controls_frame, text="☐ Clear All", command=clear_all_matches)
|
||
clear_all_matches_btn.pack(side=tk.LEFT)
|
||
|
||
def update_match_control_buttons_state():
|
||
"""Enable/disable Select All / Clear All based on matches presence"""
|
||
if match_vars:
|
||
select_all_matches_btn.config(state='normal')
|
||
clear_all_matches_btn.config(state='normal')
|
||
else:
|
||
select_all_matches_btn.config(state='disabled')
|
||
clear_all_matches_btn.config(state='disabled')
|
||
|
||
# Create scrollbar for matches
|
||
scrollbar = ttk.Scrollbar(right_frame, orient="vertical", command=None)
|
||
scrollbar.grid(row=0, column=1, rowspan=1, sticky=(tk.N, tk.S))
|
||
|
||
# Create canvas for matches with scrollbar
|
||
style = ttk.Style()
|
||
canvas_bg_color = style.lookup('TFrame', 'background') or '#d9d9d9'
|
||
matches_canvas = tk.Canvas(matches_frame, yscrollcommand=scrollbar.set, bg=canvas_bg_color, highlightthickness=0)
|
||
matches_canvas.grid(row=1, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
|
||
scrollbar.config(command=matches_canvas.yview)
|
||
|
||
# Configure grid weights
|
||
right_frame.columnconfigure(0, weight=1)
|
||
right_frame.rowconfigure(0, weight=1)
|
||
matches_frame.columnconfigure(0, weight=1)
|
||
matches_frame.rowconfigure(0, weight=0) # Controls row takes minimal space
|
||
matches_frame.rowconfigure(1, weight=1) # Canvas row expandable
|
||
|
||
# Control buttons (navigation only)
|
||
control_frame = ttk.Frame(main_frame)
|
||
control_frame.grid(row=1, column=0, columnspan=2, pady=(10, 0))
|
||
|
||
# Button commands
|
||
current_matched_index = 0
|
||
matched_ids = [person_id for person_id, _, _ in person_faces_list if person_id in matches_by_matched and matches_by_matched[person_id]]
|
||
filtered_matched_ids = None # filtered subset based on last name search
|
||
|
||
match_checkboxes = []
|
||
match_vars = []
|
||
identified_faces_per_person = {} # Track which faces were identified for each person
|
||
checkbox_states_per_person = {} # Track checkbox states for each person (unsaved selections)
|
||
original_checkbox_states_per_person = {} # Track original/default checkbox states for comparison
|
||
|
||
def on_confirm_matches():
|
||
nonlocal identified_count, current_matched_index, identified_faces_per_person
|
||
if current_matched_index < len(matched_ids):
|
||
matched_id = matched_ids[current_matched_index]
|
||
matches_for_this_person = matches_by_matched[matched_id]
|
||
|
||
# Initialize identified faces for this person if not exists
|
||
if matched_id not in identified_faces_per_person:
|
||
identified_faces_per_person[matched_id] = set()
|
||
|
||
with self.get_db_connection() as conn:
|
||
cursor = conn.cursor()
|
||
|
||
# Process all matches (both checked and unchecked)
|
||
for i, (match, var) in enumerate(zip(matches_for_this_person, match_vars)):
|
||
if var.get():
|
||
# Face is checked - assign to person
|
||
cursor.execute(
|
||
'UPDATE faces SET person_id = ? WHERE id = ?',
|
||
(match['person_id'], match['unidentified_id'])
|
||
)
|
||
|
||
# Use cached person name instead of database query
|
||
person_details = data_cache['person_details'].get(match['person_id'], {})
|
||
person_name = person_details.get('full_name', "Unknown")
|
||
|
||
# Track this face as identified for this person
|
||
identified_faces_per_person[matched_id].add(match['unidentified_id'])
|
||
|
||
print(f"✅ Identified as: {person_name}")
|
||
identified_count += 1
|
||
else:
|
||
# Face is unchecked - check if it was previously identified for this person
|
||
if match['unidentified_id'] in identified_faces_per_person[matched_id]:
|
||
# This face was previously identified for this person, now unchecking it
|
||
cursor.execute(
|
||
'UPDATE faces SET person_id = NULL WHERE id = ?',
|
||
(match['unidentified_id'],)
|
||
)
|
||
|
||
# Remove from identified faces for this person
|
||
identified_faces_per_person[matched_id].discard(match['unidentified_id'])
|
||
|
||
print(f"❌ Unidentified: {match['unidentified_filename']}")
|
||
|
||
# Update person encodings for all affected persons after database transaction is complete
|
||
for person_id in set(match['person_id'] for match in matches_for_this_person if match['person_id']):
|
||
self._update_person_encodings(person_id)
|
||
|
||
# After saving, set original states to the current UI states so there are no unsaved changes
|
||
current_snapshot = {}
|
||
for match, var in zip(matches_for_this_person, match_vars):
|
||
unique_key = f"{matched_id}_{match['unidentified_id']}"
|
||
current_snapshot[unique_key] = var.get()
|
||
checkbox_states_per_person[matched_id] = dict(current_snapshot)
|
||
original_checkbox_states_per_person[matched_id] = dict(current_snapshot)
|
||
|
||
def on_skip_current():
|
||
nonlocal current_matched_index
|
||
# Save current checkbox states before navigating away
|
||
save_current_checkbox_states()
|
||
current_matched_index += 1
|
||
active_ids = filtered_matched_ids if filtered_matched_ids is not None else matched_ids
|
||
if current_matched_index < len(active_ids):
|
||
update_display()
|
||
else:
|
||
finish_auto_match()
|
||
|
||
def on_go_back():
|
||
nonlocal current_matched_index
|
||
if current_matched_index > 0:
|
||
# Save current checkbox states before navigating away
|
||
save_current_checkbox_states()
|
||
current_matched_index -= 1
|
||
update_display()
|
||
|
||
def has_unsaved_changes():
|
||
"""Check if there are any unsaved changes by comparing current states with original states"""
|
||
for person_id, current_states in checkbox_states_per_person.items():
|
||
if person_id in original_checkbox_states_per_person:
|
||
original_states = original_checkbox_states_per_person[person_id]
|
||
# Check if any checkbox state differs from its original state
|
||
for key, current_value in current_states.items():
|
||
if key not in original_states or original_states[key] != current_value:
|
||
return True
|
||
else:
|
||
# If person has current states but no original states, there are changes
|
||
if any(current_states.values()):
|
||
return True
|
||
return False
|
||
|
||
def apply_last_name_filter():
|
||
"""Filter people by last name and update navigation"""
|
||
nonlocal filtered_matched_ids, current_matched_index
|
||
query = last_name_search_var.get().strip().lower()
|
||
if query:
|
||
# Filter person_faces_list by last name
|
||
filtered_people = []
|
||
for person_id, face, person_name in person_faces_list:
|
||
# Extract last name from person_name (format: "Last, First")
|
||
if ',' in person_name:
|
||
last_name = person_name.split(',')[0].strip().lower()
|
||
else:
|
||
last_name = person_name.strip().lower()
|
||
|
||
if query in last_name:
|
||
filtered_people.append((person_id, face, person_name))
|
||
|
||
# Get filtered matched_ids
|
||
filtered_matched_ids = [person_id for person_id, _, _ in filtered_people if person_id in matches_by_matched and matches_by_matched[person_id]]
|
||
else:
|
||
filtered_matched_ids = None
|
||
|
||
# Reset to first person in filtered list
|
||
current_matched_index = 0
|
||
if filtered_matched_ids:
|
||
update_display()
|
||
else:
|
||
# No matches - clear display
|
||
matched_info_label.config(text="No people match filter")
|
||
matched_canvas.delete("all")
|
||
matched_canvas.create_text(150, 150, text="No matches found", fill="gray")
|
||
matches_canvas.delete("all")
|
||
update_button_states()
|
||
|
||
def clear_last_name_filter():
|
||
"""Clear filter and show all people"""
|
||
nonlocal filtered_matched_ids, current_matched_index
|
||
last_name_search_var.set("")
|
||
filtered_matched_ids = None
|
||
current_matched_index = 0
|
||
update_display()
|
||
|
||
def on_quit_auto_match():
|
||
nonlocal window_destroyed
|
||
|
||
# Check for unsaved changes before quitting
|
||
if has_unsaved_changes():
|
||
# Show warning dialog with custom width
|
||
from tkinter import messagebox
|
||
|
||
# Create a custom dialog for better width control
|
||
dialog = tk.Toplevel(root)
|
||
dialog.title("Unsaved Changes")
|
||
dialog.geometry("500x250")
|
||
dialog.resizable(True, True)
|
||
dialog.transient(root)
|
||
dialog.grab_set()
|
||
|
||
# Center the dialog
|
||
dialog.geometry("+%d+%d" % (root.winfo_rootx() + 50, root.winfo_rooty() + 50))
|
||
|
||
# Main message
|
||
message_frame = ttk.Frame(dialog, padding="20")
|
||
message_frame.pack(fill=tk.BOTH, expand=True)
|
||
|
||
# Warning icon and text
|
||
icon_label = ttk.Label(message_frame, text="⚠️", font=("Arial", 16))
|
||
icon_label.pack(anchor=tk.W)
|
||
|
||
main_text = ttk.Label(message_frame,
|
||
text="You have unsaved changes that will be lost if you quit.",
|
||
font=("Arial", 10))
|
||
main_text.pack(anchor=tk.W, pady=(5, 10))
|
||
|
||
# Options
|
||
options_text = ttk.Label(message_frame,
|
||
text="• Yes: Save current changes and quit\n"
|
||
"• No: Quit without saving\n"
|
||
"• Cancel: Return to auto-match",
|
||
font=("Arial", 9))
|
||
options_text.pack(anchor=tk.W, pady=(0, 10))
|
||
|
||
|
||
# Buttons
|
||
button_frame = ttk.Frame(dialog)
|
||
button_frame.pack(fill=tk.X, padx=20, pady=(0, 20))
|
||
|
||
result = None
|
||
|
||
def on_yes():
|
||
nonlocal result
|
||
result = True
|
||
dialog.destroy()
|
||
|
||
def on_no():
|
||
nonlocal result
|
||
result = False
|
||
dialog.destroy()
|
||
|
||
def on_cancel():
|
||
nonlocal result
|
||
result = None
|
||
dialog.destroy()
|
||
|
||
yes_btn = ttk.Button(button_frame, text="Yes", command=on_yes)
|
||
no_btn = ttk.Button(button_frame, text="No", command=on_no)
|
||
cancel_btn = ttk.Button(button_frame, text="Cancel", command=on_cancel)
|
||
|
||
yes_btn.pack(side=tk.LEFT, padx=(0, 5))
|
||
no_btn.pack(side=tk.LEFT, padx=5)
|
||
cancel_btn.pack(side=tk.RIGHT, padx=(5, 0))
|
||
|
||
# Wait for dialog to close
|
||
dialog.wait_window()
|
||
|
||
if result is None: # Cancel - don't quit
|
||
return
|
||
elif result: # Yes - save changes first
|
||
# Save current checkbox states before quitting
|
||
save_current_checkbox_states()
|
||
# Note: We don't actually save to database here, just preserve the states
|
||
# The user would need to click Save button for each person to persist changes
|
||
print("⚠️ Warning: Changes are preserved but not saved to database.")
|
||
print(" Click 'Save Changes' button for each person to persist changes.")
|
||
|
||
if not window_destroyed:
|
||
window_destroyed = True
|
||
try:
|
||
root.destroy()
|
||
except tk.TclError:
|
||
pass # Window already destroyed
|
||
|
||
def finish_auto_match():
|
||
nonlocal window_destroyed
|
||
print(f"\n✅ Auto-identified {identified_count} faces")
|
||
if not window_destroyed:
|
||
window_destroyed = True
|
||
try:
|
||
root.destroy()
|
||
except tk.TclError:
|
||
pass # Window already destroyed
|
||
|
||
# Create button references for state management
|
||
back_btn = ttk.Button(control_frame, text="⏮️ Back", command=on_go_back)
|
||
next_btn = ttk.Button(control_frame, text="⏭️ Next", command=on_skip_current)
|
||
quit_btn = ttk.Button(control_frame, text="❌ Quit", command=on_quit_auto_match)
|
||
|
||
back_btn.grid(row=0, column=0, padx=(0, 5))
|
||
next_btn.grid(row=0, column=1, padx=5)
|
||
quit_btn.grid(row=0, column=2, padx=(5, 0))
|
||
|
||
# Create save button now that functions are defined
|
||
save_btn = ttk.Button(left_frame, text="💾 Save Changes", command=on_confirm_matches)
|
||
save_btn.grid(row=4, column=0, pady=(0, 10), sticky=(tk.W, tk.E))
|
||
|
||
def update_button_states():
|
||
"""Update button states based on current position"""
|
||
active_ids = filtered_matched_ids if filtered_matched_ids is not None else matched_ids
|
||
# Enable/disable Back button based on position
|
||
if current_matched_index > 0:
|
||
back_btn.config(state='normal')
|
||
else:
|
||
back_btn.config(state='disabled')
|
||
|
||
# Enable/disable Next button based on position
|
||
if current_matched_index < len(active_ids) - 1:
|
||
next_btn.config(state='normal')
|
||
else:
|
||
next_btn.config(state='disabled')
|
||
|
||
def update_save_button_text():
|
||
"""Update save button text with current person name"""
|
||
active_ids = filtered_matched_ids if filtered_matched_ids is not None else matched_ids
|
||
if current_matched_index < len(active_ids):
|
||
matched_id = active_ids[current_matched_index]
|
||
# Get person name from the first match for this person
|
||
matches_for_current_person = matches_by_matched[matched_id]
|
||
if matches_for_current_person:
|
||
person_id = matches_for_current_person[0]['person_id']
|
||
# Use cached person name instead of database query
|
||
person_details = data_cache['person_details'].get(person_id, {})
|
||
person_name = person_details.get('full_name', "Unknown")
|
||
save_btn.config(text=f"💾 Save changes for {person_name}")
|
||
else:
|
||
save_btn.config(text="💾 Save Changes")
|
||
else:
|
||
save_btn.config(text="💾 Save Changes")
|
||
|
||
def save_current_checkbox_states():
    """Remember the current checkbox values for the person on screen.

    Note: Do NOT modify original states here to avoid false positives
    when a user toggles and reverts a checkbox.

    The person is resolved through the active id list (the filtered list when
    a last-name filter is applied, otherwise the full list), because that is
    the list ``current_matched_index`` points into. Nothing is stored when
    there are no checkboxes or when their number does not equal the number of
    matches for that person.
    """
    active_ids = filtered_matched_ids if filtered_matched_ids is not None else matched_ids
    if current_matched_index >= len(active_ids) or not match_vars:
        return

    current_matched_id = active_ids[current_matched_index]
    matches_for_current_person = matches_by_matched[current_matched_id]

    # Skip if the checkboxes do not line up one-to-one with this person's matches.
    if len(match_vars) != len(matches_for_current_person):
        return

    saved_states = checkbox_states_per_person.setdefault(current_matched_id, {})

    # Save current checkbox states for this person
    for match, var in zip(matches_for_current_person, match_vars):
        unique_key = f"{current_matched_id}_{match['unidentified_id']}"
        current_value = var.get()
        saved_states[unique_key] = current_value

        if self.verbose >= 2:
            print(f"DEBUG: Saved state for person {current_matched_id}, face {match['unidentified_id']}: {current_value}")
|
||
|
||
def update_display():
|
||
nonlocal current_matched_index
|
||
active_ids = filtered_matched_ids if filtered_matched_ids is not None else matched_ids
|
||
if current_matched_index >= len(active_ids):
|
||
finish_auto_match()
|
||
return
|
||
|
||
matched_id = active_ids[current_matched_index]
|
||
matches_for_this_person = matches_by_matched[matched_id]
|
||
|
||
# Update button states
|
||
update_button_states()
|
||
|
||
# Update save button text with person name
|
||
update_save_button_text()
|
||
|
||
# Update title
|
||
active_ids = filtered_matched_ids if filtered_matched_ids is not None else matched_ids
|
||
root.title(f"Auto-Match Face Identification - {current_matched_index + 1}/{len(active_ids)}")
|
||
|
||
# Get the first match to get matched person info
|
||
if not matches_for_this_person:
|
||
print(f"❌ Error: No matches found for current person {matched_id}")
|
||
# No items on the right panel – disable Select All / Clear All
|
||
match_checkboxes.clear()
|
||
match_vars.clear()
|
||
update_match_control_buttons_state()
|
||
# Skip to next person if available
|
||
if current_matched_index < len(matched_ids) - 1:
|
||
current_matched_index += 1
|
||
update_display()
|
||
else:
|
||
finish_auto_match()
|
||
return
|
||
|
||
first_match = matches_for_this_person[0]
|
||
|
||
# Use cached data instead of database queries
|
||
person_details = data_cache['person_details'].get(first_match['person_id'], {})
|
||
person_name = person_details.get('full_name', "Unknown")
|
||
date_of_birth = person_details.get('date_of_birth', '')
|
||
matched_photo_path = data_cache['photo_paths'].get(first_match['matched_photo_id'], None)
|
||
|
||
# Create detailed person info display
|
||
person_info_lines = [f"👤 Person: {person_name}"]
|
||
if date_of_birth:
|
||
person_info_lines.append(f"📅 Born: {date_of_birth}")
|
||
person_info_lines.extend([
|
||
f"📁 Photo: {first_match['matched_filename']}",
|
||
f"📍 Face location: {first_match['matched_location']}"
|
||
])
|
||
|
||
# Update matched person info
|
||
matched_info_label.config(text="\n".join(person_info_lines))
|
||
|
||
# Display matched person face
|
||
matched_canvas.delete("all")
|
||
if show_faces:
|
||
matched_crop_path = self._extract_face_crop(
|
||
matched_photo_path,
|
||
first_match['matched_location'],
|
||
f"matched_{first_match['person_id']}"
|
||
)
|
||
|
||
if matched_crop_path and os.path.exists(matched_crop_path):
|
||
try:
|
||
pil_image = Image.open(matched_crop_path)
|
||
pil_image.thumbnail((300, 300), Image.Resampling.LANCZOS)
|
||
photo = ImageTk.PhotoImage(pil_image)
|
||
matched_canvas.create_image(150, 150, image=photo)
|
||
matched_canvas.image = photo
|
||
|
||
# Add photo icon to the matched person face - exactly in corner
|
||
# Use actual image dimensions instead of assuming 300x300
|
||
actual_width, actual_height = pil_image.size
|
||
self._create_photo_icon(matched_canvas, matched_photo_path, icon_size=20,
|
||
face_x=150, face_y=150,
|
||
face_width=actual_width, face_height=actual_height,
|
||
canvas_width=300, canvas_height=300)
|
||
except Exception as e:
|
||
matched_canvas.create_text(150, 150, text=f"❌ Could not load image: {e}", fill="red")
|
||
else:
|
||
matched_canvas.create_text(150, 150, text="🖼️ No face crop available", fill="gray")
|
||
|
||
# Clear and populate unidentified faces
|
||
matches_canvas.delete("all")
|
||
match_checkboxes.clear()
|
||
match_vars.clear()
|
||
update_match_control_buttons_state()
|
||
|
||
# Create frame for unidentified faces inside canvas
|
||
matches_inner_frame = ttk.Frame(matches_canvas)
|
||
matches_canvas.create_window((0, 0), window=matches_inner_frame, anchor="nw")
|
||
|
||
# Use cached photo paths instead of database queries
|
||
photo_paths = data_cache['photo_paths']
|
||
|
||
# Create all checkboxes
|
||
for i, match in enumerate(matches_for_this_person):
|
||
# Get unidentified face info from cached data
|
||
unidentified_photo_path = photo_paths.get(match['unidentified_photo_id'], '')
|
||
|
||
# Calculate confidence
|
||
confidence_pct = (1 - match['distance']) * 100
|
||
confidence_desc = self._get_confidence_description(confidence_pct)
|
||
|
||
# Create match frame
|
||
match_frame = ttk.Frame(matches_inner_frame)
|
||
match_frame.grid(row=i, column=0, sticky=(tk.W, tk.E), pady=5)
|
||
|
||
# Checkbox for this match
|
||
match_var = tk.BooleanVar()
|
||
|
||
# Restore previous checkbox state if available
|
||
unique_key = f"{matched_id}_{match['unidentified_id']}"
|
||
if matched_id in checkbox_states_per_person and unique_key in checkbox_states_per_person[matched_id]:
|
||
saved_state = checkbox_states_per_person[matched_id][unique_key]
|
||
match_var.set(saved_state)
|
||
if self.verbose >= 2:
|
||
print(f"DEBUG: Restored state for person {matched_id}, face {match['unidentified_id']}: {saved_state}")
|
||
# Otherwise, pre-select if this face was previously identified for this person
|
||
elif matched_id in identified_faces_per_person and match['unidentified_id'] in identified_faces_per_person[matched_id]:
|
||
match_var.set(True)
|
||
if self.verbose >= 2:
|
||
print(f"DEBUG: Pre-selected identified face for person {matched_id}, face {match['unidentified_id']}")
|
||
|
||
match_vars.append(match_var)
|
||
|
||
# Capture original state at render time (once per person per face)
|
||
if matched_id not in original_checkbox_states_per_person:
|
||
original_checkbox_states_per_person[matched_id] = {}
|
||
if unique_key not in original_checkbox_states_per_person[matched_id]:
|
||
original_checkbox_states_per_person[matched_id][unique_key] = match_var.get()
|
||
|
||
# Add callback to save state immediately when checkbox changes
|
||
def on_checkbox_change(var, person_id, face_id):
|
||
unique_key = f"{person_id}_{face_id}"
|
||
if person_id not in checkbox_states_per_person:
|
||
checkbox_states_per_person[person_id] = {}
|
||
|
||
current_value = var.get()
|
||
checkbox_states_per_person[person_id][unique_key] = current_value
|
||
|
||
if self.verbose >= 2:
|
||
print(f"DEBUG: Checkbox changed for person {person_id}, face {face_id}: {current_value}")
|
||
|
||
# Bind the callback to the variable
|
||
match_var.trace('w', lambda *args: on_checkbox_change(match_var, matched_id, match['unidentified_id']))
|
||
|
||
# Configure match frame for grid layout
|
||
match_frame.columnconfigure(0, weight=0) # Checkbox column - fixed width
|
||
match_frame.columnconfigure(1, weight=1) # Text column - expandable
|
||
match_frame.columnconfigure(2, weight=0) # Image column - fixed width
|
||
|
||
# Checkbox without text
|
||
checkbox = ttk.Checkbutton(match_frame, variable=match_var)
|
||
checkbox.grid(row=0, column=0, rowspan=2, sticky=(tk.W, tk.N), padx=(0, 5))
|
||
match_checkboxes.append(checkbox)
|
||
|
||
# Create labels for confidence and filename
|
||
confidence_label = ttk.Label(match_frame, text=f"{confidence_pct:.1f}% {confidence_desc}", font=("Arial", 9, "bold"))
|
||
confidence_label.grid(row=0, column=1, sticky=tk.W, padx=(0, 10))
|
||
|
||
filename_label = ttk.Label(match_frame, text=f"📁 {match['unidentified_filename']}", font=("Arial", 8), foreground="gray")
|
||
filename_label.grid(row=1, column=1, sticky=tk.W, padx=(0, 10))
|
||
|
||
# Unidentified face image
|
||
if show_faces:
|
||
style = ttk.Style()
|
||
canvas_bg_color = style.lookup('TFrame', 'background') or '#d9d9d9'
|
||
match_canvas = tk.Canvas(match_frame, width=100, height=100, bg=canvas_bg_color, highlightthickness=0)
|
||
match_canvas.grid(row=0, column=2, rowspan=2, sticky=(tk.W, tk.N), padx=(10, 0))
|
||
|
||
unidentified_crop_path = self._extract_face_crop(
|
||
unidentified_photo_path,
|
||
match['unidentified_location'],
|
||
f"unid_{match['unidentified_id']}"
|
||
)
|
||
|
||
if unidentified_crop_path and os.path.exists(unidentified_crop_path):
|
||
try:
|
||
pil_image = Image.open(unidentified_crop_path)
|
||
pil_image.thumbnail((100, 100), Image.Resampling.LANCZOS)
|
||
photo = ImageTk.PhotoImage(pil_image)
|
||
match_canvas.create_image(50, 50, image=photo)
|
||
match_canvas.image = photo
|
||
|
||
# Add photo icon to the unidentified face
|
||
self._create_photo_icon(match_canvas, unidentified_photo_path, icon_size=15,
|
||
face_x=50, face_y=50,
|
||
face_width=100, face_height=100,
|
||
canvas_width=100, canvas_height=100)
|
||
except Exception as e:
|
||
match_canvas.create_text(50, 50, text="❌", fill="red")
|
||
else:
|
||
match_canvas.create_text(50, 50, text="🖼️", fill="gray")
|
||
|
||
# Update Select All / Clear All button states after populating
|
||
update_match_control_buttons_state()
|
||
|
||
# Update scroll region
|
||
matches_canvas.update_idletasks()
|
||
matches_canvas.configure(scrollregion=matches_canvas.bbox("all"))
|
||
|
||
# Show the window
|
||
try:
|
||
root.deiconify()
|
||
root.lift()
|
||
root.focus_force()
|
||
except tk.TclError:
|
||
# Window was destroyed before we could show it
|
||
return 0
|
||
|
||
# Wire up search controls now that helper functions exist
|
||
try:
|
||
search_btn.config(command=lambda: apply_last_name_filter())
|
||
clear_btn.config(command=lambda: clear_last_name_filter())
|
||
search_entry.bind('<Return>', lambda e: apply_last_name_filter())
|
||
except Exception:
|
||
pass
|
||
|
||
# Start with first matched person
|
||
update_display()
|
||
|
||
# Main event loop
|
||
try:
|
||
root.mainloop()
|
||
except tk.TclError:
|
||
pass # Window was destroyed
|
||
|
||
return identified_count
|
||
|
||
# Tag management methods (delegated)
|
||
def add_tags(self, photo_pattern: str = None, batch_size: int = DEFAULT_BATCH_SIZE) -> int:
    """Add custom tags to photos by delegating to the tag manager.

    Both arguments are forwarded unchanged to
    ``self.tag_manager.add_tags_to_photos`` and its result is returned.
    """
    manager = self.tag_manager
    return manager.add_tags_to_photos(photo_pattern, batch_size)
|
||
|
||
def _deduplicate_tags(self, tag_list):
|
||
"""Remove duplicate tags from a list (legacy compatibility)"""
|
||
return self.tag_manager.deduplicate_tags(tag_list)
|
||
|
||
def _parse_tags_string(self, tags_string):
|
||
"""Parse a comma-separated tags string (legacy compatibility)"""
|
||
return self.tag_manager.parse_tags_string(tags_string)
|
||
|
||
def _get_tag_id_by_name(self, tag_name, tag_name_to_id_map):
|
||
"""Get tag ID by name (legacy compatibility)"""
|
||
return self.db.get_tag_id_by_name(tag_name, tag_name_to_id_map)
|
||
|
||
def _get_tag_name_by_id(self, tag_id, tag_id_to_name_map):
|
||
"""Get tag name by ID (legacy compatibility)"""
|
||
return self.db.get_tag_name_by_id(tag_id, tag_id_to_name_map)
|
||
|
||
def _load_tag_mappings(self):
|
||
"""Load tag name to ID and ID to name mappings (legacy compatibility)"""
|
||
return self.db.load_tag_mappings()
|
||
|
||
def _get_existing_tag_ids_for_photo(self, photo_id):
|
||
"""Get list of tag IDs for a photo (legacy compatibility)"""
|
||
return self.db.get_existing_tag_ids_for_photo(photo_id)
|
||
|
||
def _show_people_list(self, cursor=None):
|
||
"""Show list of people in database (legacy compatibility)"""
|
||
return self.db.show_people_list(cursor)
|
||
|
||
# Search and statistics methods (delegated)
|
||
def search_faces(self, person_name: str):
    """Search for photos containing a specific person.

    Delegates to ``self.search_stats.search_faces`` and returns its result.
    """
    searcher = self.search_stats
    return searcher.search_faces(person_name)
|
||
|
||
def stats(self):
    """Show database statistics.

    Delegates to ``self.search_stats.print_statistics`` and returns its result.
    """
    reporter = self.search_stats
    return reporter.print_statistics()
|
||
|
||
# GUI methods (legacy compatibility - these would need to be implemented)
|
||
def identify_faces(self, batch_size: int = DEFAULT_BATCH_SIZE, show_faces: bool = False, tolerance: float = DEFAULT_FACE_TOLERANCE,
                   date_from: str = None, date_to: str = None, date_processed_from: str = None, date_processed_to: str = None) -> int:
    """Run interactive face identification in the GUI.

    All arguments are passed positionally, in the same order, to
    ``self.identify_gui.identify_faces``; its return value is returned.
    """
    forwarded = (
        batch_size, show_faces, tolerance,
        date_from, date_to, date_processed_from, date_processed_to,
    )
    return self.identify_gui.identify_faces(*forwarded)
|
||
|
||
def tag_management(self) -> int:
    """Tag management GUI placeholder.

    Prints a notice that the GUI is not available in the refactored
    version and returns 0.
    """
    notice = "⚠️ Tag management GUI not yet implemented in refactored version"
    print(notice)
    return 0
|
||
|
||
def modifyidentified(self) -> int:
    """Modify-identified-faces GUI placeholder.

    Prints a notice that the GUI is not available in the refactored
    version and returns 0.
    """
    notice = "⚠️ Face modification GUI not yet implemented in refactored version"
    print(notice)
    return 0
|
||
|
||
def _setup_window_size_saving(self, root, config_file="gui_config.json"):
|
||
"""Set up window size saving functionality (legacy compatibility)"""
|
||
return self.gui_core.setup_window_size_saving(root, config_file)
|
||
|
||
def _display_similar_faces_in_panel(self, parent_frame, similar_faces_data, face_vars, face_images, face_crops, current_face_id=None, face_selection_states=None, data_cache=None):
|
||
"""Display similar faces in panel (legacy compatibility)"""
|
||
print("⚠️ Similar faces panel not yet implemented in refactored version")
|
||
return None
|
||
|
||
def _create_photo_icon(self, canvas, photo_path, icon_size=20, icon_x=None, icon_y=None, callback=None):
|
||
"""Create a small photo icon on a canvas (legacy compatibility)"""
|
||
return self.gui_core.create_photo_icon(canvas, photo_path, icon_size, icon_x, icon_y, callback)
|
||
|
||
def _get_confidence_description(self, confidence_pct: float) -> str:
|
||
"""Get human-readable confidence description (legacy compatibility)"""
|
||
return self.face_processor._get_confidence_description(confidence_pct)
|
||
|
||
# Cache management (legacy compatibility)
|
||
def _clear_caches(self):
|
||
"""Clear all caches to free memory (legacy compatibility)"""
|
||
self.face_processor._clear_caches()
|
||
|
||
def _cleanup_face_crops(self, current_face_crop_path=None):
|
||
"""Clean up face crop files and caches (legacy compatibility)"""
|
||
self.face_processor.cleanup_face_crops(current_face_crop_path)
|
||
|
||
@property
|
||
def _face_encoding_cache(self):
|
||
"""Face encoding cache (legacy compatibility)"""
|
||
return self.face_processor._face_encoding_cache
|
||
|
||
@property
|
||
def _image_cache(self):
|
||
"""Image cache (legacy compatibility)"""
|
||
return self.face_processor._image_cache
|
||
|
||
def _get_filtered_similar_faces(self, face_id: int, tolerance: float, include_same_photo: bool = False, face_status: dict = None) -> List[Dict]:
|
||
"""Get similar faces with consistent filtering and sorting logic used by both auto-match and identify"""
|
||
# Find similar faces using the core function
|
||
similar_faces_data = self.find_similar_faces(face_id, tolerance=tolerance, include_same_photo=include_same_photo)
|
||
|
||
# Filter to only show unidentified faces with confidence filtering
|
||
filtered_faces = []
|
||
for face in similar_faces_data:
|
||
# For auto-match: only filter by database state (keep existing behavior)
|
||
# For identify: also filter by current session state
|
||
is_identified_in_db = face.get('person_id') is not None
|
||
is_identified_in_session = face_status and face.get('face_id') in face_status and face_status[face.get('face_id')] == 'identified'
|
||
|
||
# If face_status is provided (identify mode), use both filters
|
||
# If face_status is None (auto-match mode), only use database filter
|
||
if face_status is not None:
|
||
# Identify mode: filter out both database and session identified faces
|
||
if not is_identified_in_db and not is_identified_in_session:
|
||
# Calculate confidence percentage
|
||
confidence_pct = (1 - face['distance']) * 100
|
||
|
||
# Only include matches with reasonable confidence (at least 40%)
|
||
if confidence_pct >= 40:
|
||
filtered_faces.append(face)
|
||
else:
|
||
# Auto-match mode: only filter by database state (keep existing behavior)
|
||
if not is_identified_in_db:
|
||
# Calculate confidence percentage
|
||
confidence_pct = (1 - face['distance']) * 100
|
||
|
||
# Only include matches with reasonable confidence (at least 40%)
|
||
if confidence_pct >= 40:
|
||
filtered_faces.append(face)
|
||
|
||
# Sort by confidence (distance) - highest confidence first
|
||
filtered_faces.sort(key=lambda x: x['distance'])
|
||
|
||
return filtered_faces
|
||
|
||
def _get_confidence_description(self, confidence_pct: float) -> str:
|
||
"""Get human-readable confidence description"""
|
||
if confidence_pct >= 80:
|
||
return "🟢 (Very High - Almost Certain)"
|
||
elif confidence_pct >= 70:
|
||
return "🟡 (High - Likely Match)"
|
||
elif confidence_pct >= 60:
|
||
return "🟠 (Medium - Possible Match)"
|
||
elif confidence_pct >= 50:
|
||
return "🔴 (Low - Questionable)"
|
||
else:
|
||
return "⚫ (Very Low - Unlikely)"
|
||
|
||
def _extract_face_crop(self, photo_path: str, location: tuple, face_id: int) -> str:
|
||
"""Extract and save individual face crop for identification with caching"""
|
||
import tempfile
|
||
from PIL import Image
|
||
|
||
try:
|
||
# Check cache first
|
||
cache_key = f"{photo_path}_{location}_{face_id}"
|
||
if cache_key in self._image_cache:
|
||
cached_path = self._image_cache[cache_key]
|
||
# Verify the cached file still exists
|
||
if os.path.exists(cached_path):
|
||
return cached_path
|
||
else:
|
||
# Remove from cache if file doesn't exist
|
||
del self._image_cache[cache_key]
|
||
|
||
# Parse location tuple from string format
|
||
if isinstance(location, str):
|
||
location = eval(location)
|
||
|
||
top, right, bottom, left = location
|
||
|
||
# Load the image
|
||
image = Image.open(photo_path)
|
||
|
||
# Add padding around the face (20% of face size)
|
||
face_width = right - left
|
||
face_height = bottom - top
|
||
padding_x = int(face_width * 0.2)
|
||
padding_y = int(face_height * 0.2)
|
||
|
||
# Calculate crop bounds with padding
|
||
crop_left = max(0, left - padding_x)
|
||
crop_top = max(0, top - padding_y)
|
||
crop_right = min(image.width, right + padding_x)
|
||
crop_bottom = min(image.height, bottom + padding_y)
|
||
|
||
# Crop the face
|
||
face_crop = image.crop((crop_left, crop_top, crop_right, crop_bottom))
|
||
|
||
# Create temporary file for the face crop
|
||
temp_dir = tempfile.gettempdir()
|
||
face_filename = f"face_{face_id}_crop.jpg"
|
||
face_path = os.path.join(temp_dir, face_filename)
|
||
|
||
# Resize for better viewing (minimum 200px width)
|
||
if face_crop.width < 200:
|
||
ratio = 200 / face_crop.width
|
||
new_width = 200
|
||
new_height = int(face_crop.height * ratio)
|
||
face_crop = face_crop.resize((new_width, new_height), Image.Resampling.LANCZOS)
|
||
|
||
face_crop.save(face_path, "JPEG", quality=95)
|
||
|
||
# Cache the result
|
||
self._image_cache[cache_key] = face_path
|
||
return face_path
|
||
|
||
except Exception as e:
|
||
if self.verbose >= 1:
|
||
print(f"⚠️ Could not extract face crop: {e}")
|
||
return None
|
||
|
||
def _create_photo_icon(self, canvas, photo_path, icon_size=20, icon_x=None, icon_y=None,
|
||
canvas_width=None, canvas_height=None, face_x=None, face_y=None,
|
||
face_width=None, face_height=None):
|
||
"""Create a reusable photo icon with tooltip on a canvas"""
|
||
import tkinter as tk
|
||
import subprocess
|
||
import platform
|
||
import os
|
||
|
||
def open_source_photo(event):
|
||
"""Open the source photo in a properly sized window"""
|
||
try:
|
||
system = platform.system()
|
||
if system == "Windows":
|
||
# Try to open with a specific image viewer that supports window sizing
|
||
try:
|
||
subprocess.run(["mspaint", photo_path], check=False)
|
||
except:
|
||
os.startfile(photo_path)
|
||
elif system == "Darwin": # macOS
|
||
# Use Preview with specific window size
|
||
subprocess.run(["open", "-a", "Preview", photo_path])
|
||
else: # Linux and others
|
||
# Try common image viewers with window sizing options
|
||
viewers_to_try = [
|
||
["eog", "--new-window", photo_path], # Eye of GNOME
|
||
["gwenview", photo_path], # KDE image viewer
|
||
["feh", "--geometry", "800x600", photo_path], # feh with specific size
|
||
["gimp", photo_path], # GIMP
|
||
["xdg-open", photo_path] # Fallback to default
|
||
]
|
||
|
||
opened = False
|
||
for viewer_cmd in viewers_to_try:
|
||
try:
|
||
result = subprocess.run(viewer_cmd, check=False, capture_output=True)
|
||
if result.returncode == 0:
|
||
opened = True
|
||
break
|
||
except:
|
||
continue
|
||
|
||
if not opened:
|
||
# Final fallback
|
||
subprocess.run(["xdg-open", photo_path])
|
||
except Exception as e:
|
||
print(f"❌ Could not open photo: {e}")
|
||
|
||
# Create tooltip for the icon
|
||
tooltip = None
|
||
|
||
def show_tooltip(event):
|
||
nonlocal tooltip
|
||
if tooltip:
|
||
tooltip.destroy()
|
||
tooltip = tk.Toplevel()
|
||
tooltip.wm_overrideredirect(True)
|
||
tooltip.wm_geometry(f"+{event.x_root+10}+{event.y_root+10}")
|
||
label = tk.Label(tooltip, text="Show original photo",
|
||
background="lightyellow", relief="solid", borderwidth=1,
|
||
font=("Arial", 9))
|
||
label.pack()
|
||
|
||
def hide_tooltip(event):
|
||
nonlocal tooltip
|
||
if tooltip:
|
||
tooltip.destroy()
|
||
tooltip = None
|
||
|
||
# Calculate icon position
|
||
if icon_x is None or icon_y is None:
|
||
if face_x is not None and face_y is not None and face_width is not None and face_height is not None:
|
||
# Position relative to face image - exactly in the corner
|
||
face_right = face_x + face_width // 2
|
||
face_top = face_y - face_height // 2
|
||
icon_x = face_right - icon_size
|
||
icon_y = face_top
|
||
else:
|
||
# Position relative to canvas - exactly in the corner
|
||
if canvas_width is None:
|
||
canvas_width = canvas.winfo_width()
|
||
if canvas_height is None:
|
||
canvas_height = canvas.winfo_height()
|
||
icon_x = canvas_width - icon_size
|
||
icon_y = 0
|
||
|
||
# Ensure icon stays within canvas bounds
|
||
if canvas_width is None:
|
||
canvas_width = canvas.winfo_width()
|
||
if canvas_height is None:
|
||
canvas_height = canvas.winfo_height()
|
||
icon_x = min(icon_x, canvas_width - icon_size)
|
||
icon_y = max(icon_y, 0)
|
||
|
||
# Draw the photo icon
|
||
canvas.create_rectangle(icon_x, icon_y, icon_x + icon_size, icon_y + icon_size,
|
||
fill="white", outline="black", width=1, tags="photo_icon")
|
||
canvas.create_text(icon_x + icon_size//2, icon_y + icon_size//2,
|
||
text="📷", font=("Arial", 10), tags="photo_icon")
|
||
|
||
# Bind events to the icon
|
||
canvas.tag_bind("photo_icon", "<Button-1>", open_source_photo)
|
||
canvas.tag_bind("photo_icon", "<Enter>", show_tooltip)
|
||
canvas.tag_bind("photo_icon", "<Leave>", hide_tooltip)
|
||
|
||
def _setup_window_size_saving(self, root, config_file="gui_config.json"):
|
||
"""Set up window size saving functionality"""
|
||
import json
|
||
import tkinter as tk
|
||
|
||
# Load saved window size
|
||
default_size = "600x500"
|
||
saved_size = default_size
|
||
|
||
if os.path.exists(config_file):
|
||
try:
|
||
with open(config_file, 'r') as f:
|
||
config = json.load(f)
|
||
saved_size = config.get('window_size', default_size)
|
||
except:
|
||
saved_size = default_size
|
||
|
||
# Calculate center position before showing window
|
||
try:
|
||
width = int(saved_size.split('x')[0])
|
||
height = int(saved_size.split('x')[1])
|
||
x = (root.winfo_screenwidth() // 2) - (width // 2)
|
||
y = (root.winfo_screenheight() // 2) - (height // 2)
|
||
root.geometry(f"{saved_size}+{x}+{y}")
|
||
except tk.TclError:
|
||
# Fallback to default geometry if positioning fails
|
||
root.geometry(saved_size)
|
||
|
||
# Track previous size to detect actual resizing
|
||
last_size = None
|
||
|
||
def save_window_size(event=None):
|
||
nonlocal last_size
|
||
if event and event.widget == root:
|
||
current_size = f"{root.winfo_width()}x{root.winfo_height()}"
|
||
# Only save if size actually changed
|
||
if current_size != last_size:
|
||
last_size = current_size
|
||
try:
|
||
config = {'window_size': current_size}
|
||
with open(config_file, 'w') as f:
|
||
json.dump(config, f)
|
||
except:
|
||
pass # Ignore save errors
|
||
|
||
# Bind resize event
|
||
root.bind('<Configure>', save_window_size)
|
||
return saved_size
|
||
|
||
def _cleanup_face_crops(self, current_face_crop_path=None):
|
||
"""Clean up face crop files and caches"""
|
||
# Clean up current face crop if provided
|
||
if current_face_crop_path and os.path.exists(current_face_crop_path):
|
||
try:
|
||
os.remove(current_face_crop_path)
|
||
except:
|
||
pass # Ignore cleanup errors
|
||
|
||
# Clean up all cached face crop files
|
||
for cache_key, cached_path in list(self._image_cache.items()):
|
||
if os.path.exists(cached_path):
|
||
try:
|
||
os.remove(cached_path)
|
||
except:
|
||
pass # Ignore cleanup errors
|
||
|
||
# Clear caches
|
||
self._clear_caches()
|
||
|
||
def _update_person_encodings(self, person_id: int):
|
||
"""Update person encodings when a face is identified"""
|
||
with self.get_db_connection() as conn:
|
||
cursor = conn.cursor()
|
||
|
||
# Get all faces for this person
|
||
cursor.execute(
|
||
'SELECT id, encoding, quality_score FROM faces WHERE person_id = ? ORDER BY quality_score DESC',
|
||
(person_id,)
|
||
)
|
||
faces = cursor.fetchall()
|
||
|
||
# Clear existing person encodings
|
||
cursor.execute('DELETE FROM person_encodings WHERE person_id = ?', (person_id,))
|
||
|
||
# Add all faces as person encodings
|
||
for face_id, encoding, quality_score in faces:
|
||
cursor.execute(
|
||
'INSERT INTO person_encodings (person_id, face_id, encoding, quality_score) VALUES (?, ?, ?, ?)',
|
||
(person_id, face_id, encoding, quality_score)
|
||
)
|
||
|
||
def _clear_caches(self):
|
||
"""Clear all caches"""
|
||
if hasattr(self.face_processor, '_image_cache'):
|
||
self.face_processor._image_cache.clear()
|
||
|
||
|
||
def _build_arg_parser():
    """Create the argparse parser that defines the PunimTag command line."""
    parser = argparse.ArgumentParser(
        description="PunimTag CLI - Simple photo face tagger (Refactored)",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  photo_tagger_refactored.py scan /path/to/photos # Scan folder for photos
  photo_tagger_refactored.py process --limit 20 # Process 20 photos for faces
  photo_tagger_refactored.py identify --batch 10 # Identify 10 faces interactively
  photo_tagger_refactored.py auto-match # Auto-identify matching faces
  photo_tagger_refactored.py modifyidentified # Show and Modify identified faces
  photo_tagger_refactored.py match 15 # Find faces similar to face ID 15
  photo_tagger_refactored.py tag --pattern "vacation" # Tag photos matching pattern
  photo_tagger_refactored.py search "John" # Find photos with John
  photo_tagger_refactored.py tag-manager # Open tag management GUI
  photo_tagger_refactored.py stats # Show statistics
        """
    )

    parser.add_argument('command',
                        choices=['scan', 'process', 'identify', 'tag', 'search', 'stats', 'match', 'auto-match', 'modifyidentified', 'tag-manager'],
                        help='Command to execute')

    parser.add_argument('target', nargs='?',
                        help='Target folder (scan), person name (search), or pattern (tag)')

    parser.add_argument('--db', default=DEFAULT_DB_PATH,
                        help=f'Database file path (default: {DEFAULT_DB_PATH})')

    parser.add_argument('--limit', type=int, default=DEFAULT_PROCESSING_LIMIT,
                        help=f'Batch size limit for processing (default: {DEFAULT_PROCESSING_LIMIT})')

    parser.add_argument('--batch', type=int, default=DEFAULT_BATCH_SIZE,
                        help=f'Batch size for identification (default: {DEFAULT_BATCH_SIZE})')

    parser.add_argument('--pattern',
                        help='Pattern for filtering photos when tagging')

    parser.add_argument('--model', choices=['hog', 'cnn'], default=DEFAULT_FACE_DETECTION_MODEL,
                        help=f'Face detection model: hog (faster) or cnn (more accurate) (default: {DEFAULT_FACE_DETECTION_MODEL})')

    parser.add_argument('--recursive', action='store_true',
                        help='Scan folders recursively')

    parser.add_argument('--show-faces', action='store_true',
                        help='Show individual face crops during identification')

    parser.add_argument('--tolerance', type=float, default=DEFAULT_FACE_TOLERANCE,
                        help=f'Face matching tolerance (0.0-1.0, lower = stricter, default: {DEFAULT_FACE_TOLERANCE})')

    parser.add_argument('--auto', action='store_true',
                        help='Auto-identify high-confidence matches without confirmation')

    parser.add_argument('--include-twins', action='store_true',
                        help='Include same-photo matching (for twins or multiple instances)')

    parser.add_argument('--date-from',
                        help='Filter by photo taken date (from) in YYYY-MM-DD format')

    parser.add_argument('--date-to',
                        help='Filter by photo taken date (to) in YYYY-MM-DD format')

    parser.add_argument('--date-processed-from',
                        help='Filter by photo processed date (from) in YYYY-MM-DD format')

    parser.add_argument('--date-processed-to',
                        help='Filter by photo processed date (to) in YYYY-MM-DD format')

    parser.add_argument('-v', '--verbose', action='count', default=0,
                        help='Increase verbosity (-v, -vv, -vvv for more detail)')

    parser.add_argument('--debug', action='store_true',
                        help='Enable line-by-line debugging with pdb')

    return parser


def main():
    """Run the command-line interface.

    Parses ``sys.argv``, creates a ``PhotoTagger`` and dispatches to the
    method that implements the chosen command. ``tagger.cleanup()`` is
    always called before returning.

    Returns:
        0 on success. 1 when a required target is missing or invalid, when
        the user interrupts with Ctrl+C, or when the command raises.
    """
    args = _build_arg_parser().parse_args()

    tagger = PhotoTagger(args.db, args.verbose, args.debug)

    try:
        if args.command == 'scan':
            if not args.target:
                print("❌ Please specify a folder to scan")
                return 1
            tagger.scan_folder(args.target, args.recursive)

        elif args.command == 'process':
            tagger.process_faces(args.limit, args.model)

        elif args.command == 'identify':
            tagger.identify_faces(args.batch, args.show_faces, args.tolerance,
                                  args.date_from, args.date_to,
                                  args.date_processed_from, args.date_processed_to)

        elif args.command == 'tag':
            tagger.add_tags(args.pattern or args.target, args.batch)

        elif args.command == 'search':
            if not args.target:
                print("❌ Please specify a person name to search for")
                return 1
            tagger.search_faces(args.target)

        elif args.command == 'stats':
            tagger.stats()

        elif args.command == 'match':
            if not (args.target and args.target.isdigit()):
                print("❌ Please specify a face ID number to find matches for")
                # Report the usage error in the exit status, as the scan
                # and search commands do.
                return 1
            face_id = int(args.target)
            matches = tagger.find_similar_faces(face_id, args.tolerance)
            if matches:
                print(f"\n🎯 Found {len(matches)} similar faces:")
                for match in matches:
                    person_name = "Unknown" if match.get('person_id') is None else f"Person ID {match.get('person_id')}"
                    print(f" 📸 {match.get('filename', 'Unknown')} - {person_name} (confidence: {(1-match.get('distance', 1)):.1%})")
            else:
                print("🔍 No similar faces found")

        elif args.command == 'auto-match':
            # --auto means "no confirmation", hence the negation.
            tagger.auto_identify_matches(args.tolerance, not args.auto, args.show_faces, args.include_twins)

        elif args.command == 'modifyidentified':
            tagger.modifyidentified()

        elif args.command == 'tag-manager':
            tagger.tag_management()

        return 0

    except KeyboardInterrupt:
        print("\n\n⚠️ Interrupted by user")
        return 1
    except Exception as e:
        print(f"❌ Error: {e}")
        if args.debug:
            import traceback
            traceback.print_exc()
        return 1
    finally:
        # Always cleanup resources
        tagger.cleanup()
|
||
|
||
|
||
# Run the CLI only when this file is executed as a script; the value returned
# by main() becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())
|