480 lines
17 KiB
Python
480 lines
17 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
PunimTag CLI - Minimal Photo Face Tagger
|
|
Simple command-line tool for face recognition and photo tagging
|
|
"""
|
|
|
|
import os
|
|
import sqlite3
|
|
import argparse
|
|
import face_recognition
|
|
from pathlib import Path
|
|
from PIL import Image
|
|
import pickle
|
|
import numpy as np
|
|
from typing import List, Dict, Tuple, Optional
|
|
import sys
|
|
|
|
|
|
class PhotoTagger:
|
|
def __init__(self, db_path: str = "photos.db"):
|
|
"""Initialize the photo tagger with database"""
|
|
self.db_path = db_path
|
|
self.init_database()
|
|
|
|
def init_database(self):
|
|
"""Create database tables if they don't exist"""
|
|
conn = sqlite3.connect(self.db_path)
|
|
cursor = conn.cursor()
|
|
|
|
# Photos table
|
|
cursor.execute('''
|
|
CREATE TABLE IF NOT EXISTS photos (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
path TEXT UNIQUE NOT NULL,
|
|
filename TEXT NOT NULL,
|
|
date_added DATETIME DEFAULT CURRENT_TIMESTAMP,
|
|
processed BOOLEAN DEFAULT 0
|
|
)
|
|
''')
|
|
|
|
# People table
|
|
cursor.execute('''
|
|
CREATE TABLE IF NOT EXISTS people (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
name TEXT UNIQUE NOT NULL,
|
|
created_date DATETIME DEFAULT CURRENT_TIMESTAMP
|
|
)
|
|
''')
|
|
|
|
# Faces table
|
|
cursor.execute('''
|
|
CREATE TABLE IF NOT EXISTS faces (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
photo_id INTEGER NOT NULL,
|
|
person_id INTEGER,
|
|
encoding BLOB NOT NULL,
|
|
location TEXT NOT NULL,
|
|
confidence REAL DEFAULT 0.0,
|
|
FOREIGN KEY (photo_id) REFERENCES photos (id),
|
|
FOREIGN KEY (person_id) REFERENCES people (id)
|
|
)
|
|
''')
|
|
|
|
# Tags table
|
|
cursor.execute('''
|
|
CREATE TABLE IF NOT EXISTS tags (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
photo_id INTEGER NOT NULL,
|
|
tag_name TEXT NOT NULL,
|
|
created_date DATETIME DEFAULT CURRENT_TIMESTAMP,
|
|
FOREIGN KEY (photo_id) REFERENCES photos (id)
|
|
)
|
|
''')
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
print(f"✅ Database initialized: {self.db_path}")
|
|
|
|
def scan_folder(self, folder_path: str, recursive: bool = True) -> int:
|
|
"""Scan folder for photos and add to database"""
|
|
if not os.path.exists(folder_path):
|
|
print(f"❌ Folder not found: {folder_path}")
|
|
return 0
|
|
|
|
photo_extensions = {'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff'}
|
|
found_photos = []
|
|
|
|
if recursive:
|
|
for root, dirs, files in os.walk(folder_path):
|
|
for file in files:
|
|
if Path(file).suffix.lower() in photo_extensions:
|
|
photo_path = os.path.join(root, file)
|
|
found_photos.append((photo_path, file))
|
|
else:
|
|
for file in os.listdir(folder_path):
|
|
if Path(file).suffix.lower() in photo_extensions:
|
|
photo_path = os.path.join(folder_path, file)
|
|
found_photos.append((photo_path, file))
|
|
|
|
if not found_photos:
|
|
print(f"📁 No photos found in {folder_path}")
|
|
return 0
|
|
|
|
# Add to database
|
|
conn = sqlite3.connect(self.db_path)
|
|
cursor = conn.cursor()
|
|
added_count = 0
|
|
|
|
for photo_path, filename in found_photos:
|
|
try:
|
|
cursor.execute(
|
|
'INSERT OR IGNORE INTO photos (path, filename) VALUES (?, ?)',
|
|
(photo_path, filename)
|
|
)
|
|
if cursor.rowcount > 0:
|
|
added_count += 1
|
|
except Exception as e:
|
|
print(f"⚠️ Error adding {filename}: {e}")
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
print(f"📁 Found {len(found_photos)} photos, added {added_count} new photos")
|
|
return added_count
|
|
|
|
def process_faces(self, limit: int = 50, model: str = "hog") -> int:
|
|
"""Process unprocessed photos for faces"""
|
|
conn = sqlite3.connect(self.db_path)
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute(
|
|
'SELECT id, path, filename FROM photos WHERE processed = 0 LIMIT ?',
|
|
(limit,)
|
|
)
|
|
unprocessed = cursor.fetchall()
|
|
|
|
if not unprocessed:
|
|
print("✅ No unprocessed photos found")
|
|
conn.close()
|
|
return 0
|
|
|
|
print(f"🔍 Processing {len(unprocessed)} photos for faces...")
|
|
processed_count = 0
|
|
|
|
for photo_id, photo_path, filename in unprocessed:
|
|
if not os.path.exists(photo_path):
|
|
print(f"❌ File not found: {filename}")
|
|
cursor.execute('UPDATE photos SET processed = 1 WHERE id = ?', (photo_id,))
|
|
continue
|
|
|
|
try:
|
|
# Load image and find faces
|
|
print(f"📸 Processing: {filename}")
|
|
image = face_recognition.load_image_file(photo_path)
|
|
face_locations = face_recognition.face_locations(image, model=model)
|
|
|
|
if face_locations:
|
|
face_encodings = face_recognition.face_encodings(image, face_locations)
|
|
print(f" 👤 Found {len(face_locations)} faces")
|
|
|
|
# Save faces to database
|
|
for encoding, location in zip(face_encodings, face_locations):
|
|
cursor.execute(
|
|
'INSERT INTO faces (photo_id, encoding, location) VALUES (?, ?, ?)',
|
|
(photo_id, encoding.tobytes(), str(location))
|
|
)
|
|
else:
|
|
print(f" 👤 No faces found")
|
|
|
|
# Mark as processed
|
|
cursor.execute('UPDATE photos SET processed = 1 WHERE id = ?', (photo_id,))
|
|
processed_count += 1
|
|
|
|
except Exception as e:
|
|
print(f"❌ Error processing {filename}: {e}")
|
|
cursor.execute('UPDATE photos SET processed = 1 WHERE id = ?', (photo_id,))
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
print(f"✅ Processed {processed_count} photos")
|
|
return processed_count
|
|
|
|
def identify_faces(self, batch_size: int = 20) -> int:
|
|
"""Interactive face identification"""
|
|
conn = sqlite3.connect(self.db_path)
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute('''
|
|
SELECT f.id, f.photo_id, p.path, p.filename, f.location
|
|
FROM faces f
|
|
JOIN photos p ON f.photo_id = p.id
|
|
WHERE f.person_id IS NULL
|
|
LIMIT ?
|
|
''', (batch_size,))
|
|
|
|
unidentified = cursor.fetchall()
|
|
|
|
if not unidentified:
|
|
print("🎉 All faces have been identified!")
|
|
conn.close()
|
|
return 0
|
|
|
|
print(f"\n👤 Found {len(unidentified)} unidentified faces")
|
|
print("Commands: [name] = identify, 's' = skip, 'q' = quit, 'list' = show people\n")
|
|
|
|
identified_count = 0
|
|
|
|
for i, (face_id, photo_id, photo_path, filename, location) in enumerate(unidentified):
|
|
print(f"\n--- Face {i+1}/{len(unidentified)} ---")
|
|
print(f"📁 Photo: {filename}")
|
|
print(f"📍 Face location: {location}")
|
|
|
|
while True:
|
|
command = input("👤 Person name (or command): ").strip()
|
|
|
|
if command.lower() == 'q':
|
|
print("Quitting...")
|
|
conn.close()
|
|
return identified_count
|
|
|
|
elif command.lower() == 's':
|
|
print("⏭️ Skipped")
|
|
break
|
|
|
|
elif command.lower() == 'list':
|
|
self._show_people_list(cursor)
|
|
continue
|
|
|
|
elif command:
|
|
try:
|
|
# Add person if doesn't exist
|
|
cursor.execute('INSERT OR IGNORE INTO people (name) VALUES (?)', (command,))
|
|
cursor.execute('SELECT id FROM people WHERE name = ?', (command,))
|
|
person_id = cursor.fetchone()[0]
|
|
|
|
# Assign face to person
|
|
cursor.execute(
|
|
'UPDATE faces SET person_id = ? WHERE id = ?',
|
|
(person_id, face_id)
|
|
)
|
|
|
|
print(f"✅ Identified as: {command}")
|
|
identified_count += 1
|
|
break
|
|
|
|
except Exception as e:
|
|
print(f"❌ Error: {e}")
|
|
else:
|
|
print("Please enter a name, 's' to skip, 'q' to quit, or 'list' to see people")
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
print(f"\n✅ Identified {identified_count} faces")
|
|
return identified_count
|
|
|
|
def _show_people_list(self, cursor):
|
|
"""Show list of known people"""
|
|
cursor.execute('SELECT name FROM people ORDER BY name')
|
|
people = cursor.fetchall()
|
|
if people:
|
|
print("👥 Known people:", ", ".join([p[0] for p in people]))
|
|
else:
|
|
print("👥 No people identified yet")
|
|
|
|
def add_tags(self, photo_pattern: str = None, batch_size: int = 10) -> int:
|
|
"""Add custom tags to photos"""
|
|
conn = sqlite3.connect(self.db_path)
|
|
cursor = conn.cursor()
|
|
|
|
if photo_pattern:
|
|
cursor.execute(
|
|
'SELECT id, filename FROM photos WHERE filename LIKE ? LIMIT ?',
|
|
(f'%{photo_pattern}%', batch_size)
|
|
)
|
|
else:
|
|
cursor.execute('SELECT id, filename FROM photos LIMIT ?', (batch_size,))
|
|
|
|
photos = cursor.fetchall()
|
|
|
|
if not photos:
|
|
print("No photos found")
|
|
conn.close()
|
|
return 0
|
|
|
|
print(f"🏷️ Tagging {len(photos)} photos (enter comma-separated tags)")
|
|
tagged_count = 0
|
|
|
|
for photo_id, filename in photos:
|
|
print(f"\n📸 {filename}")
|
|
tags_input = input("🏷️ Tags: ").strip()
|
|
|
|
if tags_input.lower() == 'q':
|
|
break
|
|
|
|
if tags_input:
|
|
tags = [tag.strip() for tag in tags_input.split(',') if tag.strip()]
|
|
for tag in tags:
|
|
cursor.execute(
|
|
'INSERT INTO tags (photo_id, tag_name) VALUES (?, ?)',
|
|
(photo_id, tag)
|
|
)
|
|
print(f" ✅ Added {len(tags)} tags")
|
|
tagged_count += 1
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
print(f"✅ Tagged {tagged_count} photos")
|
|
return tagged_count
|
|
|
|
def stats(self) -> Dict:
|
|
"""Show database statistics"""
|
|
conn = sqlite3.connect(self.db_path)
|
|
cursor = conn.cursor()
|
|
|
|
stats = {}
|
|
|
|
# Basic counts
|
|
cursor.execute('SELECT COUNT(*) FROM photos')
|
|
stats['total_photos'] = cursor.fetchone()[0]
|
|
|
|
cursor.execute('SELECT COUNT(*) FROM photos WHERE processed = 1')
|
|
stats['processed_photos'] = cursor.fetchone()[0]
|
|
|
|
cursor.execute('SELECT COUNT(*) FROM faces')
|
|
stats['total_faces'] = cursor.fetchone()[0]
|
|
|
|
cursor.execute('SELECT COUNT(*) FROM faces WHERE person_id IS NOT NULL')
|
|
stats['identified_faces'] = cursor.fetchone()[0]
|
|
|
|
cursor.execute('SELECT COUNT(*) FROM people')
|
|
stats['total_people'] = cursor.fetchone()[0]
|
|
|
|
cursor.execute('SELECT COUNT(DISTINCT tag_name) FROM tags')
|
|
stats['unique_tags'] = cursor.fetchone()[0]
|
|
|
|
# Top people
|
|
cursor.execute('''
|
|
SELECT p.name, COUNT(f.id) as face_count
|
|
FROM people p
|
|
LEFT JOIN faces f ON p.id = f.person_id
|
|
GROUP BY p.id
|
|
ORDER BY face_count DESC
|
|
LIMIT 5
|
|
''')
|
|
stats['top_people'] = cursor.fetchall()
|
|
|
|
conn.close()
|
|
|
|
# Display stats
|
|
print(f"\n📊 Database Statistics")
|
|
print("=" * 40)
|
|
print(f"Photos: {stats['processed_photos']}/{stats['total_photos']} processed")
|
|
print(f"Faces: {stats['identified_faces']}/{stats['total_faces']} identified")
|
|
print(f"People: {stats['total_people']} unique")
|
|
print(f"Tags: {stats['unique_tags']} unique")
|
|
|
|
if stats['top_people']:
|
|
print(f"\n👥 Top People:")
|
|
for name, count in stats['top_people']:
|
|
print(f" {name}: {count} faces")
|
|
|
|
unidentified = stats['total_faces'] - stats['identified_faces']
|
|
if unidentified > 0:
|
|
print(f"\n⚠️ {unidentified} faces still need identification")
|
|
|
|
return stats
|
|
|
|
def search_faces(self, person_name: str) -> List[str]:
|
|
"""Search for photos containing a specific person"""
|
|
conn = sqlite3.connect(self.db_path)
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute('''
|
|
SELECT DISTINCT p.filename, p.path
|
|
FROM photos p
|
|
JOIN faces f ON p.id = f.photo_id
|
|
JOIN people pe ON f.person_id = pe.id
|
|
WHERE pe.name LIKE ?
|
|
''', (f'%{person_name}%',))
|
|
|
|
results = cursor.fetchall()
|
|
conn.close()
|
|
|
|
if results:
|
|
print(f"\n🔍 Found {len(results)} photos with '{person_name}':")
|
|
for filename, path in results:
|
|
print(f" 📸 {filename}")
|
|
else:
|
|
print(f"🔍 No photos found with '{person_name}'")
|
|
|
|
return [path for filename, path in results]
|
|
|
|
|
|
def main():
|
|
"""Main CLI interface"""
|
|
parser = argparse.ArgumentParser(
|
|
description="PunimTag CLI - Simple photo face tagger",
|
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
|
epilog="""
|
|
Examples:
|
|
photo_tagger.py scan /path/to/photos # Scan folder for photos
|
|
photo_tagger.py process --limit 20 # Process 20 photos for faces
|
|
photo_tagger.py identify --batch 10 # Identify 10 faces interactively
|
|
photo_tagger.py tag --pattern "vacation" # Tag photos matching pattern
|
|
photo_tagger.py search "John" # Find photos with John
|
|
photo_tagger.py stats # Show statistics
|
|
"""
|
|
)
|
|
|
|
parser.add_argument('command',
|
|
choices=['scan', 'process', 'identify', 'tag', 'search', 'stats'],
|
|
help='Command to execute')
|
|
|
|
parser.add_argument('target', nargs='?',
|
|
help='Target folder (scan), person name (search), or pattern (tag)')
|
|
|
|
parser.add_argument('--db', default='photos.db',
|
|
help='Database file path (default: photos.db)')
|
|
|
|
parser.add_argument('--limit', type=int, default=50,
|
|
help='Batch size limit for processing (default: 50)')
|
|
|
|
parser.add_argument('--batch', type=int, default=20,
|
|
help='Batch size for identification (default: 20)')
|
|
|
|
parser.add_argument('--pattern',
|
|
help='Pattern for filtering photos when tagging')
|
|
|
|
parser.add_argument('--model', choices=['hog', 'cnn'], default='hog',
|
|
help='Face detection model: hog (faster) or cnn (more accurate)')
|
|
|
|
parser.add_argument('--recursive', action='store_true',
|
|
help='Scan folders recursively')
|
|
|
|
args = parser.parse_args()
|
|
|
|
# Initialize tagger
|
|
tagger = PhotoTagger(args.db)
|
|
|
|
try:
|
|
if args.command == 'scan':
|
|
if not args.target:
|
|
print("❌ Please specify a folder to scan")
|
|
return 1
|
|
tagger.scan_folder(args.target, args.recursive)
|
|
|
|
elif args.command == 'process':
|
|
tagger.process_faces(args.limit, args.model)
|
|
|
|
elif args.command == 'identify':
|
|
tagger.identify_faces(args.batch)
|
|
|
|
elif args.command == 'tag':
|
|
tagger.add_tags(args.pattern or args.target, args.batch)
|
|
|
|
elif args.command == 'search':
|
|
if not args.target:
|
|
print("❌ Please specify a person name to search for")
|
|
return 1
|
|
tagger.search_faces(args.target)
|
|
|
|
elif args.command == 'stats':
|
|
tagger.stats()
|
|
|
|
return 0
|
|
|
|
except KeyboardInterrupt:
|
|
print("\n\n⚠️ Interrupted by user")
|
|
return 1
|
|
except Exception as e:
|
|
print(f"❌ Error: {e}")
|
|
return 1
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|