#!/usr/bin/env python3 """ PunimTag CLI - Minimal Photo Face Tagger Simple command-line tool for face recognition and photo tagging """ import os import sqlite3 import argparse import face_recognition from pathlib import Path from PIL import Image import pickle import numpy as np from typing import List, Dict, Tuple, Optional import sys class PhotoTagger: def __init__(self, db_path: str = "photos.db"): """Initialize the photo tagger with database""" self.db_path = db_path self.init_database() def init_database(self): """Create database tables if they don't exist""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() # Photos table cursor.execute(''' CREATE TABLE IF NOT EXISTS photos ( id INTEGER PRIMARY KEY AUTOINCREMENT, path TEXT UNIQUE NOT NULL, filename TEXT NOT NULL, date_added DATETIME DEFAULT CURRENT_TIMESTAMP, processed BOOLEAN DEFAULT 0 ) ''') # People table cursor.execute(''' CREATE TABLE IF NOT EXISTS people ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT UNIQUE NOT NULL, created_date DATETIME DEFAULT CURRENT_TIMESTAMP ) ''') # Faces table cursor.execute(''' CREATE TABLE IF NOT EXISTS faces ( id INTEGER PRIMARY KEY AUTOINCREMENT, photo_id INTEGER NOT NULL, person_id INTEGER, encoding BLOB NOT NULL, location TEXT NOT NULL, confidence REAL DEFAULT 0.0, FOREIGN KEY (photo_id) REFERENCES photos (id), FOREIGN KEY (person_id) REFERENCES people (id) ) ''') # Tags table cursor.execute(''' CREATE TABLE IF NOT EXISTS tags ( id INTEGER PRIMARY KEY AUTOINCREMENT, photo_id INTEGER NOT NULL, tag_name TEXT NOT NULL, created_date DATETIME DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (photo_id) REFERENCES photos (id) ) ''') conn.commit() conn.close() print(f"āœ… Database initialized: {self.db_path}") def scan_folder(self, folder_path: str, recursive: bool = True) -> int: """Scan folder for photos and add to database""" if not os.path.exists(folder_path): print(f"āŒ Folder not found: {folder_path}") return 0 photo_extensions = {'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff'} found_photos = [] if recursive: for root, dirs, files in os.walk(folder_path): for file in files: if Path(file).suffix.lower() in photo_extensions: photo_path = os.path.join(root, file) found_photos.append((photo_path, file)) else: for file in os.listdir(folder_path): if Path(file).suffix.lower() in photo_extensions: photo_path = os.path.join(folder_path, file) found_photos.append((photo_path, file)) if not found_photos: print(f"šŸ“ No photos found in {folder_path}") return 0 # Add to database conn = sqlite3.connect(self.db_path) cursor = conn.cursor() added_count = 0 for photo_path, filename in found_photos: try: cursor.execute( 'INSERT OR IGNORE INTO photos (path, filename) VALUES (?, ?)', (photo_path, filename) ) if cursor.rowcount > 0: added_count += 1 except Exception as e: print(f"āš ļø Error adding {filename}: {e}") conn.commit() conn.close() print(f"šŸ“ Found {len(found_photos)} photos, added {added_count} new photos") return added_count def process_faces(self, limit: int = 50, model: str = "hog") -> int: """Process unprocessed photos for faces""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute( 'SELECT id, path, filename FROM photos WHERE processed = 0 LIMIT ?', (limit,) ) unprocessed = cursor.fetchall() if not unprocessed: print("āœ… No unprocessed photos found") conn.close() return 0 print(f"šŸ” Processing {len(unprocessed)} photos for faces...") processed_count = 0 for photo_id, photo_path, filename in unprocessed: if not os.path.exists(photo_path): print(f"āŒ File not found: {filename}") cursor.execute('UPDATE photos SET processed = 1 WHERE id = ?', (photo_id,)) continue try: # Load image and find faces print(f"šŸ“ø Processing: {filename}") image = face_recognition.load_image_file(photo_path) face_locations = face_recognition.face_locations(image, model=model) if face_locations: face_encodings = face_recognition.face_encodings(image, face_locations) print(f" šŸ‘¤ Found {len(face_locations)} faces") # Save faces to database for encoding, location in zip(face_encodings, face_locations): cursor.execute( 'INSERT INTO faces (photo_id, encoding, location) VALUES (?, ?, ?)', (photo_id, encoding.tobytes(), str(location)) ) else: print(f" šŸ‘¤ No faces found") # Mark as processed cursor.execute('UPDATE photos SET processed = 1 WHERE id = ?', (photo_id,)) processed_count += 1 except Exception as e: print(f"āŒ Error processing {filename}: {e}") cursor.execute('UPDATE photos SET processed = 1 WHERE id = ?', (photo_id,)) conn.commit() conn.close() print(f"āœ… Processed {processed_count} photos") return processed_count def identify_faces(self, batch_size: int = 20) -> int: """Interactive face identification""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute(''' SELECT f.id, f.photo_id, p.path, p.filename, f.location FROM faces f JOIN photos p ON f.photo_id = p.id WHERE f.person_id IS NULL LIMIT ? ''', (batch_size,)) unidentified = cursor.fetchall() if not unidentified: print("šŸŽ‰ All faces have been identified!") conn.close() return 0 print(f"\nšŸ‘¤ Found {len(unidentified)} unidentified faces") print("Commands: [name] = identify, 's' = skip, 'q' = quit, 'list' = show people\n") identified_count = 0 for i, (face_id, photo_id, photo_path, filename, location) in enumerate(unidentified): print(f"\n--- Face {i+1}/{len(unidentified)} ---") print(f"šŸ“ Photo: {filename}") print(f"šŸ“ Face location: {location}") while True: command = input("šŸ‘¤ Person name (or command): ").strip() if command.lower() == 'q': print("Quitting...") conn.close() return identified_count elif command.lower() == 's': print("ā­ļø Skipped") break elif command.lower() == 'list': self._show_people_list(cursor) continue elif command: try: # Add person if doesn't exist cursor.execute('INSERT OR IGNORE INTO people (name) VALUES (?)', (command,)) cursor.execute('SELECT id FROM people WHERE name = ?', (command,)) person_id = cursor.fetchone()[0] # Assign face to person cursor.execute( 'UPDATE faces SET person_id = ? WHERE id = ?', (person_id, face_id) ) print(f"āœ… Identified as: {command}") identified_count += 1 break except Exception as e: print(f"āŒ Error: {e}") else: print("Please enter a name, 's' to skip, 'q' to quit, or 'list' to see people") conn.commit() conn.close() print(f"\nāœ… Identified {identified_count} faces") return identified_count def _show_people_list(self, cursor): """Show list of known people""" cursor.execute('SELECT name FROM people ORDER BY name') people = cursor.fetchall() if people: print("šŸ‘„ Known people:", ", ".join([p[0] for p in people])) else: print("šŸ‘„ No people identified yet") def add_tags(self, photo_pattern: str = None, batch_size: int = 10) -> int: """Add custom tags to photos""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() if photo_pattern: cursor.execute( 'SELECT id, filename FROM photos WHERE filename LIKE ? LIMIT ?', (f'%{photo_pattern}%', batch_size) ) else: cursor.execute('SELECT id, filename FROM photos LIMIT ?', (batch_size,)) photos = cursor.fetchall() if not photos: print("No photos found") conn.close() return 0 print(f"šŸ·ļø Tagging {len(photos)} photos (enter comma-separated tags)") tagged_count = 0 for photo_id, filename in photos: print(f"\nšŸ“ø {filename}") tags_input = input("šŸ·ļø Tags: ").strip() if tags_input.lower() == 'q': break if tags_input: tags = [tag.strip() for tag in tags_input.split(',') if tag.strip()] for tag in tags: cursor.execute( 'INSERT INTO tags (photo_id, tag_name) VALUES (?, ?)', (photo_id, tag) ) print(f" āœ… Added {len(tags)} tags") tagged_count += 1 conn.commit() conn.close() print(f"āœ… Tagged {tagged_count} photos") return tagged_count def stats(self) -> Dict: """Show database statistics""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() stats = {} # Basic counts cursor.execute('SELECT COUNT(*) FROM photos') stats['total_photos'] = cursor.fetchone()[0] cursor.execute('SELECT COUNT(*) FROM photos WHERE processed = 1') stats['processed_photos'] = cursor.fetchone()[0] cursor.execute('SELECT COUNT(*) FROM faces') stats['total_faces'] = cursor.fetchone()[0] cursor.execute('SELECT COUNT(*) FROM faces WHERE person_id IS NOT NULL') stats['identified_faces'] = cursor.fetchone()[0] cursor.execute('SELECT COUNT(*) FROM people') stats['total_people'] = cursor.fetchone()[0] cursor.execute('SELECT COUNT(DISTINCT tag_name) FROM tags') stats['unique_tags'] = cursor.fetchone()[0] # Top people cursor.execute(''' SELECT p.name, COUNT(f.id) as face_count FROM people p LEFT JOIN faces f ON p.id = f.person_id GROUP BY p.id ORDER BY face_count DESC LIMIT 5 ''') stats['top_people'] = cursor.fetchall() conn.close() # Display stats print(f"\nšŸ“Š Database Statistics") print("=" * 40) print(f"Photos: {stats['processed_photos']}/{stats['total_photos']} processed") print(f"Faces: {stats['identified_faces']}/{stats['total_faces']} identified") print(f"People: {stats['total_people']} unique") print(f"Tags: {stats['unique_tags']} unique") if stats['top_people']: print(f"\nšŸ‘„ Top People:") for name, count in stats['top_people']: print(f" {name}: {count} faces") unidentified = stats['total_faces'] - stats['identified_faces'] if unidentified > 0: print(f"\nāš ļø {unidentified} faces still need identification") return stats def search_faces(self, person_name: str) -> List[str]: """Search for photos containing a specific person""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute(''' SELECT DISTINCT p.filename, p.path FROM photos p JOIN faces f ON p.id = f.photo_id JOIN people pe ON f.person_id = pe.id WHERE pe.name LIKE ? ''', (f'%{person_name}%',)) results = cursor.fetchall() conn.close() if results: print(f"\nšŸ” Found {len(results)} photos with '{person_name}':") for filename, path in results: print(f" šŸ“ø {filename}") else: print(f"šŸ” No photos found with '{person_name}'") return [path for filename, path in results] def main(): """Main CLI interface""" parser = argparse.ArgumentParser( description="PunimTag CLI - Simple photo face tagger", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: photo_tagger.py scan /path/to/photos # Scan folder for photos photo_tagger.py process --limit 20 # Process 20 photos for faces photo_tagger.py identify --batch 10 # Identify 10 faces interactively photo_tagger.py tag --pattern "vacation" # Tag photos matching pattern photo_tagger.py search "John" # Find photos with John photo_tagger.py stats # Show statistics """ ) parser.add_argument('command', choices=['scan', 'process', 'identify', 'tag', 'search', 'stats'], help='Command to execute') parser.add_argument('target', nargs='?', help='Target folder (scan), person name (search), or pattern (tag)') parser.add_argument('--db', default='photos.db', help='Database file path (default: photos.db)') parser.add_argument('--limit', type=int, default=50, help='Batch size limit for processing (default: 50)') parser.add_argument('--batch', type=int, default=20, help='Batch size for identification (default: 20)') parser.add_argument('--pattern', help='Pattern for filtering photos when tagging') parser.add_argument('--model', choices=['hog', 'cnn'], default='hog', help='Face detection model: hog (faster) or cnn (more accurate)') parser.add_argument('--recursive', action='store_true', help='Scan folders recursively') args = parser.parse_args() # Initialize tagger tagger = PhotoTagger(args.db) try: if args.command == 'scan': if not args.target: print("āŒ Please specify a folder to scan") return 1 tagger.scan_folder(args.target, args.recursive) elif args.command == 'process': tagger.process_faces(args.limit, args.model) elif args.command == 'identify': tagger.identify_faces(args.batch) elif args.command == 'tag': tagger.add_tags(args.pattern or args.target, args.batch) elif args.command == 'search': if not args.target: print("āŒ Please specify a person name to search for") return 1 tagger.search_faces(args.target) elif args.command == 'stats': tagger.stats() return 0 except KeyboardInterrupt: print("\n\nāš ļø Interrupted by user") return 1 except Exception as e: print(f"āŒ Error: {e}") return 1 if __name__ == "__main__": sys.exit(main())