This commit deletes the `photo_tagger_refactored.py`, `run.sh`, and test files (`test_basic.py`, `test_deepface_gui.py`, `test_face_recognition.py`) that are no longer in use. The removal of these files streamlines the project structure and eliminates legacy code, paving the way for future enhancements and a cleaner codebase. The README has been updated to reflect these changes, ensuring clarity on the current state of the project.
463 lines
22 KiB
Python
463 lines
22 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
PunimTag CLI - Minimal Photo Face Tagger (Refactored)
|
|
Simple command-line tool for face recognition and photo tagging
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import argparse
|
|
import threading
|
|
from typing import List, Dict, Tuple, Optional
|
|
|
|
# Import our new modules
|
|
from config import (
|
|
DEFAULT_DB_PATH, DEFAULT_FACE_DETECTION_MODEL, DEFAULT_FACE_TOLERANCE,
|
|
DEFAULT_BATCH_SIZE, DEFAULT_PROCESSING_LIMIT
|
|
)
|
|
from database import DatabaseManager
|
|
from face_processing import FaceProcessor
|
|
from photo_management import PhotoManager
|
|
from tag_management import TagManager
|
|
from search_stats import SearchStats
|
|
from gui_core import GUICore
|
|
from identify_gui import IdentifyGUI
|
|
from auto_match_gui import AutoMatchGUI
|
|
from modify_identified_gui import ModifyIdentifiedGUI
|
|
from tag_manager_gui import TagManagerGUI
|
|
from search_gui import SearchGUI
|
|
from dashboard_gui import DashboardGUI
|
|
|
|
|
|
class PhotoTagger:
|
|
"""Main PhotoTagger class - orchestrates all functionality"""
|
|
|
|
def __init__(self, db_path: str = DEFAULT_DB_PATH, verbose: int = 0, debug: bool = False):
|
|
"""Initialize the photo tagger with database and all managers"""
|
|
self.db_path = db_path
|
|
self.verbose = verbose
|
|
self.debug = debug
|
|
|
|
# Initialize all managers
|
|
self.db = DatabaseManager(db_path, verbose)
|
|
self.face_processor = FaceProcessor(self.db, verbose)
|
|
self.photo_manager = PhotoManager(self.db, verbose)
|
|
self.tag_manager = TagManager(self.db, verbose)
|
|
self.search_stats = SearchStats(self.db, verbose)
|
|
self.gui_core = GUICore()
|
|
self.identify_gui = IdentifyGUI(self.db, self.face_processor, verbose)
|
|
self.auto_match_gui = AutoMatchGUI(self.db, self.face_processor, verbose)
|
|
self.modify_identified_gui = ModifyIdentifiedGUI(self.db, self.face_processor, verbose)
|
|
self.tag_manager_gui = TagManagerGUI(self.db, self.gui_core, self.tag_manager, self.face_processor, verbose)
|
|
self.search_gui = SearchGUI(self.db, self.search_stats, self.gui_core, self.tag_manager, verbose)
|
|
self.dashboard_gui = DashboardGUI(self.gui_core, self.db, self.face_processor, on_scan=self._dashboard_scan, on_process=self._dashboard_process, on_identify=self._dashboard_identify, search_stats=self.search_stats, tag_manager=self.tag_manager)
|
|
|
|
# Legacy compatibility - expose some methods directly
|
|
self._db_connection = None
|
|
self._db_lock = threading.Lock()
|
|
|
|
def cleanup(self):
|
|
"""Clean up resources and close connections"""
|
|
self.face_processor.cleanup_face_crops()
|
|
self.db.close_db_connection()
|
|
|
|
# Database methods (delegated)
|
|
def get_db_connection(self):
|
|
"""Get database connection (legacy compatibility)"""
|
|
return self.db.get_db_connection()
|
|
|
|
def close_db_connection(self):
|
|
"""Close database connection (legacy compatibility)"""
|
|
self.db.close_db_connection()
|
|
|
|
def init_database(self):
|
|
"""Initialize database (legacy compatibility)"""
|
|
self.db.init_database()
|
|
|
|
# Photo management methods (delegated)
|
|
def scan_folder(self, folder_path: str, recursive: bool = True) -> int:
|
|
"""Scan folder for photos and add to database"""
|
|
return self.photo_manager.scan_folder(folder_path, recursive)
|
|
|
|
def _extract_photo_date(self, photo_path: str) -> Optional[str]:
|
|
"""Extract date taken from photo EXIF data (legacy compatibility)"""
|
|
return self.photo_manager.extract_photo_date(photo_path)
|
|
|
|
# Face processing methods (delegated)
|
|
def process_faces(self, limit: int = DEFAULT_PROCESSING_LIMIT, model: str = DEFAULT_FACE_DETECTION_MODEL, progress_callback=None, stop_event=None) -> int:
|
|
"""Process unprocessed photos for faces with optional progress and cancellation"""
|
|
return self.face_processor.process_faces(limit, model, progress_callback, stop_event)
|
|
|
|
def _extract_face_crop(self, photo_path: str, location: tuple, face_id: int) -> str:
|
|
"""Extract and save individual face crop for identification (legacy compatibility)"""
|
|
return self.face_processor._extract_face_crop(photo_path, location, face_id)
|
|
|
|
def _create_comparison_image(self, unid_crop_path: str, match_crop_path: str, person_name: str, confidence: float) -> str:
|
|
"""Create a side-by-side comparison image (legacy compatibility)"""
|
|
return self.face_processor._create_comparison_image(unid_crop_path, match_crop_path, person_name, confidence)
|
|
|
|
def _calculate_face_quality_score(self, image, face_location: tuple) -> float:
|
|
"""Calculate face quality score (legacy compatibility)"""
|
|
return self.face_processor._calculate_face_quality_score(image, face_location)
|
|
|
|
def _add_person_encoding(self, person_id: int, face_id: int, encoding, quality_score: float):
|
|
"""Add a face encoding to a person's encoding collection (legacy compatibility)"""
|
|
self.face_processor.add_person_encoding(person_id, face_id, encoding, quality_score)
|
|
|
|
def _get_person_encodings(self, person_id: int, min_quality: float = 0.3):
|
|
"""Get all high-quality encodings for a person (legacy compatibility)"""
|
|
return self.face_processor.get_person_encodings(person_id, min_quality)
|
|
|
|
def _update_person_encodings(self, person_id: int):
|
|
"""Update person encodings when a face is identified (legacy compatibility)"""
|
|
self.face_processor.update_person_encodings(person_id)
|
|
|
|
def _calculate_adaptive_tolerance(self, base_tolerance: float, face_quality: float, match_confidence: float = None) -> float:
|
|
"""Calculate adaptive tolerance (legacy compatibility)"""
|
|
return self.face_processor._calculate_adaptive_tolerance(base_tolerance, face_quality, match_confidence)
|
|
|
|
def _get_filtered_similar_faces(self, face_id: int, tolerance: float, include_same_photo: bool = False, face_status: dict = None):
|
|
"""Get similar faces with filtering (legacy compatibility)"""
|
|
return self.face_processor._get_filtered_similar_faces(face_id, tolerance, include_same_photo, face_status)
|
|
|
|
def _filter_unique_faces(self, faces: List[Dict]):
|
|
"""Filter faces to show only unique ones (legacy compatibility)"""
|
|
return self.face_processor._filter_unique_faces(faces)
|
|
|
|
def _filter_unique_faces_from_list(self, faces_list: List[tuple]):
|
|
"""Filter face list to show only unique ones (legacy compatibility)"""
|
|
return self.face_processor._filter_unique_faces_from_list(faces_list)
|
|
|
|
def find_similar_faces(self, face_id: int = None, tolerance: float = DEFAULT_FACE_TOLERANCE, include_same_photo: bool = False):
|
|
"""Find similar faces across all photos"""
|
|
return self.face_processor.find_similar_faces(face_id, tolerance, include_same_photo)
|
|
|
|
def auto_identify_matches(self, tolerance: float = DEFAULT_FACE_TOLERANCE, confirm: bool = True, show_faces: bool = False, include_same_photo: bool = False) -> int:
|
|
"""Automatically identify faces that match already identified faces using GUI"""
|
|
return self.auto_match_gui.auto_identify_matches(tolerance, confirm, show_faces, include_same_photo)
|
|
|
|
# Tag management methods (delegated)
|
|
def add_tags(self, photo_pattern: str = None, batch_size: int = DEFAULT_BATCH_SIZE) -> int:
|
|
"""Add custom tags to photos"""
|
|
return self.tag_manager.add_tags_to_photos(photo_pattern, batch_size)
|
|
|
|
def _deduplicate_tags(self, tag_list):
|
|
"""Remove duplicate tags from a list (legacy compatibility)"""
|
|
return self.tag_manager.deduplicate_tags(tag_list)
|
|
|
|
def _parse_tags_string(self, tags_string):
|
|
"""Parse a comma-separated tags string (legacy compatibility)"""
|
|
return self.tag_manager.parse_tags_string(tags_string)
|
|
|
|
def _get_tag_id_by_name(self, tag_name, tag_name_to_id_map):
|
|
"""Get tag ID by name (legacy compatibility)"""
|
|
return self.db.get_tag_id_by_name(tag_name, tag_name_to_id_map)
|
|
|
|
def _get_tag_name_by_id(self, tag_id, tag_id_to_name_map):
|
|
"""Get tag name by ID (legacy compatibility)"""
|
|
return self.db.get_tag_name_by_id(tag_id, tag_id_to_name_map)
|
|
|
|
def _load_tag_mappings(self):
|
|
"""Load tag name to ID and ID to name mappings (legacy compatibility)"""
|
|
return self.db.load_tag_mappings()
|
|
|
|
def _get_existing_tag_ids_for_photo(self, photo_id):
|
|
"""Get list of tag IDs for a photo (legacy compatibility)"""
|
|
return self.db.get_existing_tag_ids_for_photo(photo_id)
|
|
|
|
def _show_people_list(self, cursor=None):
|
|
"""Show list of people in database (legacy compatibility)"""
|
|
return self.db.show_people_list(cursor)
|
|
|
|
# Search and statistics methods (delegated)
|
|
def search_faces(self, person_name: str):
|
|
"""Search for photos containing a specific person"""
|
|
return self.search_stats.search_faces(person_name)
|
|
|
|
def stats(self):
|
|
"""Show database statistics"""
|
|
return self.search_stats.print_statistics()
|
|
|
|
# GUI methods (legacy compatibility - these would need to be implemented)
|
|
def identify_faces(self, batch_size: int = DEFAULT_BATCH_SIZE, tolerance: float = DEFAULT_FACE_TOLERANCE,
|
|
date_from: str = None, date_to: str = None, date_processed_from: str = None, date_processed_to: str = None) -> int:
|
|
"""Interactive face identification with GUI (show_faces is always True)"""
|
|
return self.identify_gui.identify_faces(batch_size, True, tolerance,
|
|
date_from, date_to, date_processed_from, date_processed_to)
|
|
|
|
def tag_management(self) -> int:
|
|
"""Tag management GUI"""
|
|
return self.tag_manager_gui.tag_management()
|
|
|
|
def modifyidentified(self) -> int:
|
|
return self.modify_identified_gui.modifyidentified()
|
|
|
|
def searchgui(self) -> int:
|
|
"""Open the Search GUI."""
|
|
return self.search_gui.search_gui()
|
|
|
|
def dashboard(self) -> int:
|
|
"""Open the Dashboard GUI (placeholders only)."""
|
|
return self.dashboard_gui.open()
|
|
|
|
# Dashboard callbacks
|
|
def _dashboard_scan(self, folder_path: str, recursive: bool) -> int:
|
|
"""Callback to scan a folder from the dashboard."""
|
|
return self.scan_folder(folder_path, recursive)
|
|
|
|
def _dashboard_process(self, limit_value: Optional[int], progress_callback=None, stop_event=None) -> int:
|
|
"""Callback to process faces from the dashboard with optional limit, progress, cancel."""
|
|
if limit_value is None:
|
|
return self.process_faces(progress_callback=progress_callback, stop_event=stop_event)
|
|
return self.process_faces(limit=limit_value, progress_callback=progress_callback, stop_event=stop_event)
|
|
|
|
def _dashboard_identify(self, batch_value: Optional[int]) -> int:
|
|
"""Callback to identify faces from the dashboard with optional batch (show_faces is always True)."""
|
|
if batch_value is None:
|
|
return self.identify_faces()
|
|
return self.identify_faces(batch_size=batch_value)
|
|
|
|
def _setup_window_size_saving(self, root, config_file="gui_config.json"):
|
|
"""Set up window size saving functionality (legacy compatibility)"""
|
|
return self.gui_core.setup_window_size_saving(root, config_file)
|
|
|
|
def _display_similar_faces_in_panel(self, parent_frame, similar_faces_data, face_vars, face_images, face_crops, current_face_id=None, face_selection_states=None, data_cache=None):
|
|
"""Display similar faces in panel (legacy compatibility)"""
|
|
print("⚠️ Similar faces panel not yet implemented in refactored version")
|
|
return None
|
|
|
|
def _create_photo_icon(self, canvas, photo_path, icon_size=20, icon_x=None, icon_y=None, callback=None):
|
|
"""Create a small photo icon on a canvas (legacy compatibility)"""
|
|
return self.gui_core.create_photo_icon(canvas, photo_path, icon_size, icon_x, icon_y, callback)
|
|
|
|
def _get_confidence_description(self, confidence_pct: float) -> str:
|
|
"""Get human-readable confidence description (legacy compatibility)"""
|
|
return self.face_processor._get_confidence_description(confidence_pct)
|
|
|
|
# Cache management (legacy compatibility)
|
|
def _clear_caches(self):
|
|
"""Clear all caches to free memory (legacy compatibility)"""
|
|
self.face_processor._clear_caches()
|
|
|
|
def _cleanup_face_crops(self, current_face_crop_path=None):
|
|
"""Clean up face crop files and caches (legacy compatibility)"""
|
|
self.face_processor.cleanup_face_crops(current_face_crop_path)
|
|
|
|
@property
|
|
def _face_encoding_cache(self):
|
|
"""Face encoding cache (legacy compatibility)"""
|
|
return self.face_processor._face_encoding_cache
|
|
|
|
@property
|
|
def _image_cache(self):
|
|
"""Image cache (legacy compatibility)"""
|
|
return self.face_processor._image_cache
|
|
|
|
def _get_filtered_similar_faces(self, face_id: int, tolerance: float, include_same_photo: bool = False, face_status: dict = None) -> List[Dict]:
|
|
"""Get similar faces with consistent filtering and sorting logic used by both auto-match and identify"""
|
|
# Find similar faces using the core function
|
|
similar_faces_data = self.find_similar_faces(face_id, tolerance=tolerance, include_same_photo=include_same_photo)
|
|
|
|
# Filter to only show unidentified faces with confidence filtering
|
|
filtered_faces = []
|
|
for face in similar_faces_data:
|
|
# For auto-match: only filter by database state (keep existing behavior)
|
|
# For identify: also filter by current session state
|
|
is_identified_in_db = face.get('person_id') is not None
|
|
is_identified_in_session = face_status and face.get('face_id') in face_status and face_status[face.get('face_id')] == 'identified'
|
|
|
|
# If face_status is provided (identify mode), use both filters
|
|
# If face_status is None (auto-match mode), only use database filter
|
|
if face_status is not None:
|
|
# Identify mode: filter out both database and session identified faces
|
|
if not is_identified_in_db and not is_identified_in_session:
|
|
# Calculate confidence percentage
|
|
confidence_pct = (1 - face['distance']) * 100
|
|
|
|
# Only include matches with reasonable confidence (at least 40%)
|
|
if confidence_pct >= 40:
|
|
filtered_faces.append(face)
|
|
else:
|
|
# Auto-match mode: only filter by database state (keep existing behavior)
|
|
if not is_identified_in_db:
|
|
# Calculate confidence percentage
|
|
confidence_pct = (1 - face['distance']) * 100
|
|
|
|
# Only include matches with reasonable confidence (at least 40%)
|
|
if confidence_pct >= 40:
|
|
filtered_faces.append(face)
|
|
|
|
# Sort by confidence (distance) - highest confidence first
|
|
filtered_faces.sort(key=lambda x: x['distance'])
|
|
|
|
return filtered_faces
|
|
|
|
|
|
def main():
|
|
"""Main CLI interface"""
|
|
# Suppress pkg_resources deprecation warning from face_recognition library
|
|
import warnings
|
|
warnings.filterwarnings("ignore", message="pkg_resources is deprecated", category=UserWarning)
|
|
|
|
parser = argparse.ArgumentParser(
|
|
description="PunimTag CLI - Simple photo face tagger (Refactored)",
|
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
|
epilog="""
|
|
Examples:
|
|
photo_tagger_refactored.py scan /path/to/photos # Scan folder for photos
|
|
photo_tagger_refactored.py process --limit 20 # Process 20 photos for faces
|
|
photo_tagger_refactored.py identify --batch 10 # Identify 10 faces interactively
|
|
photo_tagger_refactored.py auto-match # Auto-identify matching faces
|
|
photo_tagger_refactored.py modifyidentified # Show and Modify identified faces
|
|
photo_tagger_refactored.py match 15 # Find faces similar to face ID 15
|
|
photo_tagger_refactored.py tag --pattern "vacation" # Tag photos matching pattern
|
|
photo_tagger_refactored.py search "John" # Find photos with John
|
|
photo_tagger_refactored.py tag-manager # Open tag management GUI
|
|
photo_tagger_refactored.py stats # Show statistics
|
|
"""
|
|
)
|
|
|
|
parser.add_argument('command',
|
|
choices=['scan', 'process', 'identify', 'tag', 'search', 'search-gui', 'dashboard', 'stats', 'match', 'auto-match', 'modifyidentified', 'tag-manager'],
|
|
help='Command to execute')
|
|
|
|
parser.add_argument('target', nargs='?',
|
|
help='Target folder (scan), person name (search), or pattern (tag)')
|
|
|
|
parser.add_argument('--db', default=DEFAULT_DB_PATH,
|
|
help=f'Database file path (default: {DEFAULT_DB_PATH})')
|
|
|
|
parser.add_argument('--limit', type=int, default=DEFAULT_PROCESSING_LIMIT,
|
|
help=f'Batch size limit for processing (default: {DEFAULT_PROCESSING_LIMIT})')
|
|
|
|
parser.add_argument('--batch', type=int, default=DEFAULT_BATCH_SIZE,
|
|
help=f'Batch size for identification (default: {DEFAULT_BATCH_SIZE})')
|
|
|
|
parser.add_argument('--pattern',
|
|
help='Pattern for filtering photos when tagging')
|
|
|
|
parser.add_argument('--model', choices=['hog', 'cnn'], default=DEFAULT_FACE_DETECTION_MODEL,
|
|
help=f'Face detection model: hog (faster) or cnn (more accurate) (default: {DEFAULT_FACE_DETECTION_MODEL})')
|
|
|
|
parser.add_argument('--recursive', action='store_true',
|
|
help='Scan folders recursively')
|
|
|
|
|
|
parser.add_argument('--tolerance', type=float, default=DEFAULT_FACE_TOLERANCE,
|
|
help=f'Face matching tolerance (0.0-1.0, lower = stricter, default: {DEFAULT_FACE_TOLERANCE})')
|
|
|
|
parser.add_argument('--auto', action='store_true',
|
|
help='Auto-identify high-confidence matches without confirmation')
|
|
|
|
parser.add_argument('--include-twins', action='store_true',
|
|
help='Include same-photo matching (for twins or multiple instances)')
|
|
|
|
parser.add_argument('--date-from',
|
|
help='Filter by photo taken date (from) in YYYY-MM-DD format')
|
|
|
|
parser.add_argument('--date-to',
|
|
help='Filter by photo taken date (to) in YYYY-MM-DD format')
|
|
|
|
parser.add_argument('--date-processed-from',
|
|
help='Filter by photo processed date (from) in YYYY-MM-DD format')
|
|
|
|
parser.add_argument('--date-processed-to',
|
|
help='Filter by photo processed date (to) in YYYY-MM-DD format')
|
|
|
|
parser.add_argument('-v', '--verbose', action='count', default=0,
|
|
help='Increase verbosity (-v, -vv, -vvv for more detail)')
|
|
|
|
parser.add_argument('--debug', action='store_true',
|
|
help='Enable line-by-line debugging with pdb')
|
|
|
|
args = parser.parse_args()
|
|
|
|
# Initialize tagger
|
|
tagger = PhotoTagger(args.db, args.verbose, args.debug)
|
|
|
|
try:
|
|
if args.command == 'scan':
|
|
if not args.target:
|
|
print("❌ Please specify a folder to scan")
|
|
return 1
|
|
|
|
# Normalize path to absolute path
|
|
from path_utils import normalize_path
|
|
try:
|
|
normalized_path = normalize_path(args.target)
|
|
print(f"📁 Scanning folder: {normalized_path}")
|
|
tagger.scan_folder(normalized_path, args.recursive)
|
|
except ValueError as e:
|
|
print(f"❌ Invalid path: {e}")
|
|
return 1
|
|
|
|
elif args.command == 'process':
|
|
tagger.process_faces(args.limit, args.model)
|
|
|
|
elif args.command == 'identify':
|
|
tagger.identify_faces(args.batch, args.tolerance,
|
|
args.date_from, args.date_to,
|
|
args.date_processed_from, args.date_processed_to)
|
|
|
|
elif args.command == 'tag':
|
|
tagger.add_tags(args.pattern or args.target, args.batch)
|
|
|
|
elif args.command == 'search':
|
|
if not args.target:
|
|
print("❌ Please specify a person name to search for")
|
|
return 1
|
|
tagger.search_faces(args.target)
|
|
|
|
elif args.command == 'search-gui':
|
|
tagger.searchgui()
|
|
|
|
elif args.command == 'dashboard':
|
|
tagger.dashboard()
|
|
|
|
elif args.command == 'stats':
|
|
tagger.stats()
|
|
|
|
elif args.command == 'match':
|
|
if args.target and args.target.isdigit():
|
|
face_id = int(args.target)
|
|
matches = tagger.find_similar_faces(face_id, args.tolerance)
|
|
if matches:
|
|
print(f"\n🎯 Found {len(matches)} similar faces:")
|
|
for match in matches:
|
|
person_name = "Unknown" if match.get('person_id') is None else f"Person ID {match.get('person_id')}"
|
|
print(f" 📸 {match.get('filename', 'Unknown')} - {person_name} (confidence: {(1-match.get('distance', 1)):.1%})")
|
|
else:
|
|
print("🔍 No similar faces found")
|
|
else:
|
|
print("❌ Please specify a face ID number to find matches for")
|
|
|
|
elif args.command == 'auto-match':
|
|
show_faces = getattr(args, 'show_faces', False)
|
|
include_twins = getattr(args, 'include_twins', False)
|
|
tagger.auto_identify_matches(args.tolerance, not args.auto, show_faces, include_twins)
|
|
|
|
elif args.command == 'modifyidentified':
|
|
tagger.modifyidentified()
|
|
|
|
elif args.command == 'tag-manager':
|
|
tagger.tag_management()
|
|
|
|
return 0
|
|
|
|
except KeyboardInterrupt:
|
|
print("\n\n⚠️ Interrupted by user")
|
|
return 1
|
|
except Exception as e:
|
|
print(f"❌ Error: {e}")
|
|
if args.debug:
|
|
import traceback
|
|
traceback.print_exc()
|
|
return 1
|
|
finally:
|
|
# Always cleanup resources
|
|
tagger.cleanup()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|