This commit introduces the IdentifyPanel class into the Dashboard GUI, allowing for a fully integrated face identification interface. The Dashboard now requires a database manager and face processor to create the Identify panel, which includes features for face browsing, identification, and management. Additionally, the DatabaseManager has been updated to support case-insensitive person additions, improving data consistency. The PhotoTagger class has also been modified to accommodate these changes, ensuring seamless interaction between components.
463 lines
22 KiB
Python
463 lines
22 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
PunimTag CLI - Minimal Photo Face Tagger (Refactored)
|
|
Simple command-line tool for face recognition and photo tagging
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import argparse
|
|
import threading
|
|
from typing import List, Dict, Tuple, Optional
|
|
|
|
# Import our new modules
|
|
from config import (
|
|
DEFAULT_DB_PATH, DEFAULT_FACE_DETECTION_MODEL, DEFAULT_FACE_TOLERANCE,
|
|
DEFAULT_BATCH_SIZE, DEFAULT_PROCESSING_LIMIT
|
|
)
|
|
from database import DatabaseManager
|
|
from face_processing import FaceProcessor
|
|
from photo_management import PhotoManager
|
|
from tag_management import TagManager
|
|
from search_stats import SearchStats
|
|
from gui_core import GUICore
|
|
from identify_gui import IdentifyGUI
|
|
from auto_match_gui import AutoMatchGUI
|
|
from modify_identified_gui import ModifyIdentifiedGUI
|
|
from tag_manager_gui import TagManagerGUI
|
|
from search_gui import SearchGUI
|
|
from dashboard_gui import DashboardGUI
|
|
|
|
|
|
class PhotoTagger:
|
|
"""Main PhotoTagger class - orchestrates all functionality"""
|
|
|
|
def __init__(self, db_path: str = DEFAULT_DB_PATH, verbose: int = 0, debug: bool = False):
|
|
"""Initialize the photo tagger with database and all managers"""
|
|
self.db_path = db_path
|
|
self.verbose = verbose
|
|
self.debug = debug
|
|
|
|
# Initialize all managers
|
|
self.db = DatabaseManager(db_path, verbose)
|
|
self.face_processor = FaceProcessor(self.db, verbose)
|
|
self.photo_manager = PhotoManager(self.db, verbose)
|
|
self.tag_manager = TagManager(self.db, verbose)
|
|
self.search_stats = SearchStats(self.db, verbose)
|
|
self.gui_core = GUICore()
|
|
self.identify_gui = IdentifyGUI(self.db, self.face_processor, verbose)
|
|
self.auto_match_gui = AutoMatchGUI(self.db, self.face_processor, verbose)
|
|
self.modify_identified_gui = ModifyIdentifiedGUI(self.db, self.face_processor, verbose)
|
|
self.tag_manager_gui = TagManagerGUI(self.db, self.gui_core, self.tag_manager, self.face_processor, verbose)
|
|
self.search_gui = SearchGUI(self.db, self.search_stats, self.gui_core, self.tag_manager, verbose)
|
|
self.dashboard_gui = DashboardGUI(self.gui_core, self.db, self.face_processor, on_scan=self._dashboard_scan, on_process=self._dashboard_process, on_identify=self._dashboard_identify)
|
|
|
|
# Legacy compatibility - expose some methods directly
|
|
self._db_connection = None
|
|
self._db_lock = threading.Lock()
|
|
|
|
def cleanup(self):
|
|
"""Clean up resources and close connections"""
|
|
self.face_processor.cleanup_face_crops()
|
|
self.db.close_db_connection()
|
|
|
|
# Database methods (delegated)
|
|
def get_db_connection(self):
|
|
"""Get database connection (legacy compatibility)"""
|
|
return self.db.get_db_connection()
|
|
|
|
def close_db_connection(self):
|
|
"""Close database connection (legacy compatibility)"""
|
|
self.db.close_db_connection()
|
|
|
|
def init_database(self):
|
|
"""Initialize database (legacy compatibility)"""
|
|
self.db.init_database()
|
|
|
|
# Photo management methods (delegated)
|
|
def scan_folder(self, folder_path: str, recursive: bool = True) -> int:
|
|
"""Scan folder for photos and add to database"""
|
|
return self.photo_manager.scan_folder(folder_path, recursive)
|
|
|
|
def _extract_photo_date(self, photo_path: str) -> Optional[str]:
|
|
"""Extract date taken from photo EXIF data (legacy compatibility)"""
|
|
return self.photo_manager.extract_photo_date(photo_path)
|
|
|
|
# Face processing methods (delegated)
|
|
def process_faces(self, limit: int = DEFAULT_PROCESSING_LIMIT, model: str = DEFAULT_FACE_DETECTION_MODEL) -> int:
|
|
"""Process unprocessed photos for faces"""
|
|
return self.face_processor.process_faces(limit, model)
|
|
|
|
def _extract_face_crop(self, photo_path: str, location: tuple, face_id: int) -> str:
|
|
"""Extract and save individual face crop for identification (legacy compatibility)"""
|
|
return self.face_processor._extract_face_crop(photo_path, location, face_id)
|
|
|
|
def _create_comparison_image(self, unid_crop_path: str, match_crop_path: str, person_name: str, confidence: float) -> str:
|
|
"""Create a side-by-side comparison image (legacy compatibility)"""
|
|
return self.face_processor._create_comparison_image(unid_crop_path, match_crop_path, person_name, confidence)
|
|
|
|
def _calculate_face_quality_score(self, image, face_location: tuple) -> float:
|
|
"""Calculate face quality score (legacy compatibility)"""
|
|
return self.face_processor._calculate_face_quality_score(image, face_location)
|
|
|
|
def _add_person_encoding(self, person_id: int, face_id: int, encoding, quality_score: float):
|
|
"""Add a face encoding to a person's encoding collection (legacy compatibility)"""
|
|
self.face_processor.add_person_encoding(person_id, face_id, encoding, quality_score)
|
|
|
|
def _get_person_encodings(self, person_id: int, min_quality: float = 0.3):
|
|
"""Get all high-quality encodings for a person (legacy compatibility)"""
|
|
return self.face_processor.get_person_encodings(person_id, min_quality)
|
|
|
|
def _update_person_encodings(self, person_id: int):
|
|
"""Update person encodings when a face is identified (legacy compatibility)"""
|
|
self.face_processor.update_person_encodings(person_id)
|
|
|
|
def _calculate_adaptive_tolerance(self, base_tolerance: float, face_quality: float, match_confidence: float = None) -> float:
|
|
"""Calculate adaptive tolerance (legacy compatibility)"""
|
|
return self.face_processor._calculate_adaptive_tolerance(base_tolerance, face_quality, match_confidence)
|
|
|
|
def _get_filtered_similar_faces(self, face_id: int, tolerance: float, include_same_photo: bool = False, face_status: dict = None):
|
|
"""Get similar faces with filtering (legacy compatibility)"""
|
|
return self.face_processor._get_filtered_similar_faces(face_id, tolerance, include_same_photo, face_status)
|
|
|
|
def _filter_unique_faces(self, faces: List[Dict]):
|
|
"""Filter faces to show only unique ones (legacy compatibility)"""
|
|
return self.face_processor._filter_unique_faces(faces)
|
|
|
|
def _filter_unique_faces_from_list(self, faces_list: List[tuple]):
|
|
"""Filter face list to show only unique ones (legacy compatibility)"""
|
|
return self.face_processor._filter_unique_faces_from_list(faces_list)
|
|
|
|
def find_similar_faces(self, face_id: int = None, tolerance: float = DEFAULT_FACE_TOLERANCE, include_same_photo: bool = False):
|
|
"""Find similar faces across all photos"""
|
|
return self.face_processor.find_similar_faces(face_id, tolerance, include_same_photo)
|
|
|
|
def auto_identify_matches(self, tolerance: float = DEFAULT_FACE_TOLERANCE, confirm: bool = True, show_faces: bool = False, include_same_photo: bool = False) -> int:
|
|
"""Automatically identify faces that match already identified faces using GUI"""
|
|
return self.auto_match_gui.auto_identify_matches(tolerance, confirm, show_faces, include_same_photo)
|
|
|
|
# Tag management methods (delegated)
|
|
def add_tags(self, photo_pattern: str = None, batch_size: int = DEFAULT_BATCH_SIZE) -> int:
|
|
"""Add custom tags to photos"""
|
|
return self.tag_manager.add_tags_to_photos(photo_pattern, batch_size)
|
|
|
|
def _deduplicate_tags(self, tag_list):
|
|
"""Remove duplicate tags from a list (legacy compatibility)"""
|
|
return self.tag_manager.deduplicate_tags(tag_list)
|
|
|
|
def _parse_tags_string(self, tags_string):
|
|
"""Parse a comma-separated tags string (legacy compatibility)"""
|
|
return self.tag_manager.parse_tags_string(tags_string)
|
|
|
|
def _get_tag_id_by_name(self, tag_name, tag_name_to_id_map):
|
|
"""Get tag ID by name (legacy compatibility)"""
|
|
return self.db.get_tag_id_by_name(tag_name, tag_name_to_id_map)
|
|
|
|
def _get_tag_name_by_id(self, tag_id, tag_id_to_name_map):
|
|
"""Get tag name by ID (legacy compatibility)"""
|
|
return self.db.get_tag_name_by_id(tag_id, tag_id_to_name_map)
|
|
|
|
def _load_tag_mappings(self):
|
|
"""Load tag name to ID and ID to name mappings (legacy compatibility)"""
|
|
return self.db.load_tag_mappings()
|
|
|
|
def _get_existing_tag_ids_for_photo(self, photo_id):
|
|
"""Get list of tag IDs for a photo (legacy compatibility)"""
|
|
return self.db.get_existing_tag_ids_for_photo(photo_id)
|
|
|
|
def _show_people_list(self, cursor=None):
|
|
"""Show list of people in database (legacy compatibility)"""
|
|
return self.db.show_people_list(cursor)
|
|
|
|
# Search and statistics methods (delegated)
|
|
def search_faces(self, person_name: str):
|
|
"""Search for photos containing a specific person"""
|
|
return self.search_stats.search_faces(person_name)
|
|
|
|
def stats(self):
|
|
"""Show database statistics"""
|
|
return self.search_stats.print_statistics()
|
|
|
|
# GUI methods (legacy compatibility - these would need to be implemented)
|
|
def identify_faces(self, batch_size: int = DEFAULT_BATCH_SIZE, tolerance: float = DEFAULT_FACE_TOLERANCE,
|
|
date_from: str = None, date_to: str = None, date_processed_from: str = None, date_processed_to: str = None) -> int:
|
|
"""Interactive face identification with GUI (show_faces is always True)"""
|
|
return self.identify_gui.identify_faces(batch_size, True, tolerance,
|
|
date_from, date_to, date_processed_from, date_processed_to)
|
|
|
|
def tag_management(self) -> int:
|
|
"""Tag management GUI"""
|
|
return self.tag_manager_gui.tag_management()
|
|
|
|
def modifyidentified(self) -> int:
|
|
return self.modify_identified_gui.modifyidentified()
|
|
|
|
def searchgui(self) -> int:
|
|
"""Open the Search GUI."""
|
|
return self.search_gui.search_gui()
|
|
|
|
def dashboard(self) -> int:
|
|
"""Open the Dashboard GUI (placeholders only)."""
|
|
return self.dashboard_gui.open()
|
|
|
|
# Dashboard callbacks
|
|
def _dashboard_scan(self, folder_path: str, recursive: bool) -> int:
|
|
"""Callback to scan a folder from the dashboard."""
|
|
return self.scan_folder(folder_path, recursive)
|
|
|
|
def _dashboard_process(self, limit_value: Optional[int]) -> int:
|
|
"""Callback to process faces from the dashboard with optional limit."""
|
|
if limit_value is None:
|
|
return self.process_faces()
|
|
return self.process_faces(limit=limit_value)
|
|
|
|
def _dashboard_identify(self, batch_value: Optional[int]) -> int:
|
|
"""Callback to identify faces from the dashboard with optional batch (show_faces is always True)."""
|
|
if batch_value is None:
|
|
return self.identify_faces()
|
|
return self.identify_faces(batch_size=batch_value)
|
|
|
|
def _setup_window_size_saving(self, root, config_file="gui_config.json"):
|
|
"""Set up window size saving functionality (legacy compatibility)"""
|
|
return self.gui_core.setup_window_size_saving(root, config_file)
|
|
|
|
def _display_similar_faces_in_panel(self, parent_frame, similar_faces_data, face_vars, face_images, face_crops, current_face_id=None, face_selection_states=None, data_cache=None):
|
|
"""Display similar faces in panel (legacy compatibility)"""
|
|
print("⚠️ Similar faces panel not yet implemented in refactored version")
|
|
return None
|
|
|
|
def _create_photo_icon(self, canvas, photo_path, icon_size=20, icon_x=None, icon_y=None, callback=None):
|
|
"""Create a small photo icon on a canvas (legacy compatibility)"""
|
|
return self.gui_core.create_photo_icon(canvas, photo_path, icon_size, icon_x, icon_y, callback)
|
|
|
|
def _get_confidence_description(self, confidence_pct: float) -> str:
|
|
"""Get human-readable confidence description (legacy compatibility)"""
|
|
return self.face_processor._get_confidence_description(confidence_pct)
|
|
|
|
# Cache management (legacy compatibility)
|
|
def _clear_caches(self):
|
|
"""Clear all caches to free memory (legacy compatibility)"""
|
|
self.face_processor._clear_caches()
|
|
|
|
def _cleanup_face_crops(self, current_face_crop_path=None):
|
|
"""Clean up face crop files and caches (legacy compatibility)"""
|
|
self.face_processor.cleanup_face_crops(current_face_crop_path)
|
|
|
|
@property
|
|
def _face_encoding_cache(self):
|
|
"""Face encoding cache (legacy compatibility)"""
|
|
return self.face_processor._face_encoding_cache
|
|
|
|
@property
|
|
def _image_cache(self):
|
|
"""Image cache (legacy compatibility)"""
|
|
return self.face_processor._image_cache
|
|
|
|
def _get_filtered_similar_faces(self, face_id: int, tolerance: float, include_same_photo: bool = False, face_status: dict = None) -> List[Dict]:
|
|
"""Get similar faces with consistent filtering and sorting logic used by both auto-match and identify"""
|
|
# Find similar faces using the core function
|
|
similar_faces_data = self.find_similar_faces(face_id, tolerance=tolerance, include_same_photo=include_same_photo)
|
|
|
|
# Filter to only show unidentified faces with confidence filtering
|
|
filtered_faces = []
|
|
for face in similar_faces_data:
|
|
# For auto-match: only filter by database state (keep existing behavior)
|
|
# For identify: also filter by current session state
|
|
is_identified_in_db = face.get('person_id') is not None
|
|
is_identified_in_session = face_status and face.get('face_id') in face_status and face_status[face.get('face_id')] == 'identified'
|
|
|
|
# If face_status is provided (identify mode), use both filters
|
|
# If face_status is None (auto-match mode), only use database filter
|
|
if face_status is not None:
|
|
# Identify mode: filter out both database and session identified faces
|
|
if not is_identified_in_db and not is_identified_in_session:
|
|
# Calculate confidence percentage
|
|
confidence_pct = (1 - face['distance']) * 100
|
|
|
|
# Only include matches with reasonable confidence (at least 40%)
|
|
if confidence_pct >= 40:
|
|
filtered_faces.append(face)
|
|
else:
|
|
# Auto-match mode: only filter by database state (keep existing behavior)
|
|
if not is_identified_in_db:
|
|
# Calculate confidence percentage
|
|
confidence_pct = (1 - face['distance']) * 100
|
|
|
|
# Only include matches with reasonable confidence (at least 40%)
|
|
if confidence_pct >= 40:
|
|
filtered_faces.append(face)
|
|
|
|
# Sort by confidence (distance) - highest confidence first
|
|
filtered_faces.sort(key=lambda x: x['distance'])
|
|
|
|
return filtered_faces
|
|
|
|
|
|
def main():
|
|
"""Main CLI interface"""
|
|
# Suppress pkg_resources deprecation warning from face_recognition library
|
|
import warnings
|
|
warnings.filterwarnings("ignore", message="pkg_resources is deprecated", category=UserWarning)
|
|
|
|
parser = argparse.ArgumentParser(
|
|
description="PunimTag CLI - Simple photo face tagger (Refactored)",
|
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
|
epilog="""
|
|
Examples:
|
|
photo_tagger_refactored.py scan /path/to/photos # Scan folder for photos
|
|
photo_tagger_refactored.py process --limit 20 # Process 20 photos for faces
|
|
photo_tagger_refactored.py identify --batch 10 # Identify 10 faces interactively
|
|
photo_tagger_refactored.py auto-match # Auto-identify matching faces
|
|
photo_tagger_refactored.py modifyidentified # Show and Modify identified faces
|
|
photo_tagger_refactored.py match 15 # Find faces similar to face ID 15
|
|
photo_tagger_refactored.py tag --pattern "vacation" # Tag photos matching pattern
|
|
photo_tagger_refactored.py search "John" # Find photos with John
|
|
photo_tagger_refactored.py tag-manager # Open tag management GUI
|
|
photo_tagger_refactored.py stats # Show statistics
|
|
"""
|
|
)
|
|
|
|
parser.add_argument('command',
|
|
choices=['scan', 'process', 'identify', 'tag', 'search', 'search-gui', 'dashboard', 'stats', 'match', 'auto-match', 'modifyidentified', 'tag-manager'],
|
|
help='Command to execute')
|
|
|
|
parser.add_argument('target', nargs='?',
|
|
help='Target folder (scan), person name (search), or pattern (tag)')
|
|
|
|
parser.add_argument('--db', default=DEFAULT_DB_PATH,
|
|
help=f'Database file path (default: {DEFAULT_DB_PATH})')
|
|
|
|
parser.add_argument('--limit', type=int, default=DEFAULT_PROCESSING_LIMIT,
|
|
help=f'Batch size limit for processing (default: {DEFAULT_PROCESSING_LIMIT})')
|
|
|
|
parser.add_argument('--batch', type=int, default=DEFAULT_BATCH_SIZE,
|
|
help=f'Batch size for identification (default: {DEFAULT_BATCH_SIZE})')
|
|
|
|
parser.add_argument('--pattern',
|
|
help='Pattern for filtering photos when tagging')
|
|
|
|
parser.add_argument('--model', choices=['hog', 'cnn'], default=DEFAULT_FACE_DETECTION_MODEL,
|
|
help=f'Face detection model: hog (faster) or cnn (more accurate) (default: {DEFAULT_FACE_DETECTION_MODEL})')
|
|
|
|
parser.add_argument('--recursive', action='store_true',
|
|
help='Scan folders recursively')
|
|
|
|
|
|
parser.add_argument('--tolerance', type=float, default=DEFAULT_FACE_TOLERANCE,
|
|
help=f'Face matching tolerance (0.0-1.0, lower = stricter, default: {DEFAULT_FACE_TOLERANCE})')
|
|
|
|
parser.add_argument('--auto', action='store_true',
|
|
help='Auto-identify high-confidence matches without confirmation')
|
|
|
|
parser.add_argument('--include-twins', action='store_true',
|
|
help='Include same-photo matching (for twins or multiple instances)')
|
|
|
|
parser.add_argument('--date-from',
|
|
help='Filter by photo taken date (from) in YYYY-MM-DD format')
|
|
|
|
parser.add_argument('--date-to',
|
|
help='Filter by photo taken date (to) in YYYY-MM-DD format')
|
|
|
|
parser.add_argument('--date-processed-from',
|
|
help='Filter by photo processed date (from) in YYYY-MM-DD format')
|
|
|
|
parser.add_argument('--date-processed-to',
|
|
help='Filter by photo processed date (to) in YYYY-MM-DD format')
|
|
|
|
parser.add_argument('-v', '--verbose', action='count', default=0,
|
|
help='Increase verbosity (-v, -vv, -vvv for more detail)')
|
|
|
|
parser.add_argument('--debug', action='store_true',
|
|
help='Enable line-by-line debugging with pdb')
|
|
|
|
args = parser.parse_args()
|
|
|
|
# Initialize tagger
|
|
tagger = PhotoTagger(args.db, args.verbose, args.debug)
|
|
|
|
try:
|
|
if args.command == 'scan':
|
|
if not args.target:
|
|
print("❌ Please specify a folder to scan")
|
|
return 1
|
|
|
|
# Normalize path to absolute path
|
|
from path_utils import normalize_path
|
|
try:
|
|
normalized_path = normalize_path(args.target)
|
|
print(f"📁 Scanning folder: {normalized_path}")
|
|
tagger.scan_folder(normalized_path, args.recursive)
|
|
except ValueError as e:
|
|
print(f"❌ Invalid path: {e}")
|
|
return 1
|
|
|
|
elif args.command == 'process':
|
|
tagger.process_faces(args.limit, args.model)
|
|
|
|
elif args.command == 'identify':
|
|
tagger.identify_faces(args.batch, args.tolerance,
|
|
args.date_from, args.date_to,
|
|
args.date_processed_from, args.date_processed_to)
|
|
|
|
elif args.command == 'tag':
|
|
tagger.add_tags(args.pattern or args.target, args.batch)
|
|
|
|
elif args.command == 'search':
|
|
if not args.target:
|
|
print("❌ Please specify a person name to search for")
|
|
return 1
|
|
tagger.search_faces(args.target)
|
|
|
|
elif args.command == 'search-gui':
|
|
tagger.searchgui()
|
|
|
|
elif args.command == 'dashboard':
|
|
tagger.dashboard()
|
|
|
|
elif args.command == 'stats':
|
|
tagger.stats()
|
|
|
|
elif args.command == 'match':
|
|
if args.target and args.target.isdigit():
|
|
face_id = int(args.target)
|
|
matches = tagger.find_similar_faces(face_id, args.tolerance)
|
|
if matches:
|
|
print(f"\n🎯 Found {len(matches)} similar faces:")
|
|
for match in matches:
|
|
person_name = "Unknown" if match.get('person_id') is None else f"Person ID {match.get('person_id')}"
|
|
print(f" 📸 {match.get('filename', 'Unknown')} - {person_name} (confidence: {(1-match.get('distance', 1)):.1%})")
|
|
else:
|
|
print("🔍 No similar faces found")
|
|
else:
|
|
print("❌ Please specify a face ID number to find matches for")
|
|
|
|
elif args.command == 'auto-match':
|
|
show_faces = getattr(args, 'show_faces', False)
|
|
include_twins = getattr(args, 'include_twins', False)
|
|
tagger.auto_identify_matches(args.tolerance, not args.auto, show_faces, include_twins)
|
|
|
|
elif args.command == 'modifyidentified':
|
|
tagger.modifyidentified()
|
|
|
|
elif args.command == 'tag-manager':
|
|
tagger.tag_management()
|
|
|
|
return 0
|
|
|
|
except KeyboardInterrupt:
|
|
print("\n\n⚠️ Interrupted by user")
|
|
return 1
|
|
except Exception as e:
|
|
print(f"❌ Error: {e}")
|
|
if args.debug:
|
|
import traceback
|
|
traceback.print_exc()
|
|
return 1
|
|
finally:
|
|
# Always cleanup resources
|
|
tagger.cleanup()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|