944 lines
39 KiB
Python
944 lines
39 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
PunimTag CLI - Minimal Photo Face Tagger
|
|
Simple command-line tool for face recognition and photo tagging
|
|
"""
|
|
|
|
import argparse
import ast
import os
import pickle
import sqlite3
import subprocess
import sys
import tempfile
from contextlib import closing
from pathlib import Path
from typing import List, Dict, Tuple, Optional

import face_recognition
import numpy as np
from PIL import Image, ImageDraw, ImageFont
|
|
|
|
|
|
class PhotoTagger:
    """Face-recognition photo catalogue backed by a SQLite database.

    The database holds four tables: ``photos`` (files found by
    :meth:`scan_folder`), ``faces`` (one row per detected face with its
    encoding and bounding box), ``people`` (names assigned to faces) and
    ``tags`` (free-form labels attached to photos).  Every public method
    opens its own connection to ``db_path`` and closes it before returning,
    including when an error occurs.
    """

    # Lower-case file suffixes that scan_folder() treats as photos.
    PHOTO_EXTENSIONS = frozenset({'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff'})

    # Largest face distance accepted without asking when confirm=False.
    AUTO_ACCEPT_DISTANCE = 0.4

    # (minimum percentage, label) pairs, highest threshold first.
    _CONFIDENCE_LEVELS = (
        (80, "🟢 (Very High - Almost Certain)"),
        (70, "🟡 (High - Likely Match)"),
        (60, "🟠 (Medium - Possible Match)"),
        (50, "🔴 (Low - Questionable)"),
    )

    _SCHEMA = (
        # Photos table
        '''
        CREATE TABLE IF NOT EXISTS photos (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            path TEXT UNIQUE NOT NULL,
            filename TEXT NOT NULL,
            date_added DATETIME DEFAULT CURRENT_TIMESTAMP,
            processed BOOLEAN DEFAULT 0
        )
        ''',
        # People table
        '''
        CREATE TABLE IF NOT EXISTS people (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT UNIQUE NOT NULL,
            created_date DATETIME DEFAULT CURRENT_TIMESTAMP
        )
        ''',
        # Faces table
        '''
        CREATE TABLE IF NOT EXISTS faces (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            photo_id INTEGER NOT NULL,
            person_id INTEGER,
            encoding BLOB NOT NULL,
            location TEXT NOT NULL,
            confidence REAL DEFAULT 0.0,
            FOREIGN KEY (photo_id) REFERENCES photos (id),
            FOREIGN KEY (person_id) REFERENCES people (id)
        )
        ''',
        # Tags table
        '''
        CREATE TABLE IF NOT EXISTS tags (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            photo_id INTEGER NOT NULL,
            tag_name TEXT NOT NULL,
            created_date DATETIME DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (photo_id) REFERENCES photos (id)
        )
        ''',
    )

    def __init__(self, db_path: str = "photos.db", verbose: int = 0):
        """Initialize the photo tagger with database.

        Args:
            db_path: Path of the SQLite database file.
            verbose: Verbosity level; higher values print more detail.
        """
        self.db_path = db_path
        self.verbose = verbose
        self.init_database()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _connect(self):
        """Open a new connection to the tagger's database."""
        return sqlite3.connect(self.db_path)

    @staticmethod
    def _remove_temp_files(*paths) -> None:
        """Delete the given files, ignoring empty entries and OS errors."""
        for path in paths:
            if not path:
                continue
            try:
                os.remove(path)
            except OSError:
                pass  # Cleanup of temporary files is best-effort only.

    @staticmethod
    def _parse_location(location) -> Tuple[int, int, int, int]:
        """Return ``(top, right, bottom, left)`` from a stored face location.

        ``location`` is either a 4-sequence or the ``str()`` of one, which is
        how process_faces() stores it.  The string form is parsed with
        ``ast.literal_eval`` so that database content is never executed.

        Raises:
            ValueError, SyntaxError, TypeError: if the value is not a
                literal 4-sequence.
        """
        if isinstance(location, str):
            location = ast.literal_eval(location)
        top, right, bottom, left = location
        return top, right, bottom, left

    # ------------------------------------------------------------------
    # Database setup
    # ------------------------------------------------------------------

    def init_database(self):
        """Create database tables if they don't exist"""
        with closing(self._connect()) as conn:
            cursor = conn.cursor()
            for statement in self._SCHEMA:
                cursor.execute(statement)
            conn.commit()

        if self.verbose >= 1:
            print(f"✅ Database initialized: {self.db_path}")

    # ------------------------------------------------------------------
    # Scanning and face detection
    # ------------------------------------------------------------------

    def scan_folder(self, folder_path: str, recursive: bool = True) -> int:
        """Scan folder for photos and add to database.

        Args:
            folder_path: Directory to look in.
            recursive: Also descend into sub-directories when true.

        Returns:
            Number of photos newly inserted (paths already present are
            ignored).  Returns 0 when the folder does not exist or holds no
            photos.
        """
        if not os.path.isdir(folder_path):
            print(f"❌ Folder not found: {folder_path}")
            return 0

        if recursive:
            candidates = (
                (os.path.join(root, name), name)
                for root, _dirs, names in os.walk(folder_path)
                for name in names
            )
        else:
            # os.listdir() also yields directories; keep regular files only.
            candidates = (
                (path, name)
                for path, name in (
                    (os.path.join(folder_path, entry), entry)
                    for entry in os.listdir(folder_path)
                )
                if os.path.isfile(path)
            )

        found_photos = [
            (path, name)
            for path, name in candidates
            if Path(name).suffix.lower() in self.PHOTO_EXTENSIONS
        ]

        if not found_photos:
            print(f"📁 No photos found in {folder_path}")
            return 0

        # Add to database
        added_count = 0
        with closing(self._connect()) as conn:
            cursor = conn.cursor()
            for photo_path, filename in found_photos:
                try:
                    cursor.execute(
                        'INSERT OR IGNORE INTO photos (path, filename) VALUES (?, ?)',
                        (photo_path, filename)
                    )
                except (sqlite3.Error, UnicodeError) as e:
                    # One bad file name must not abort the whole scan.
                    print(f"⚠️ Error adding {filename}: {e}")
                    continue

                if cursor.rowcount > 0:
                    added_count += 1
                    if self.verbose >= 2:
                        print(f" 📸 Added: {filename}")
                elif self.verbose >= 3:
                    print(f" 📸 Already exists: {filename}")
            conn.commit()

        print(f"📁 Found {len(found_photos)} photos, added {added_count} new photos")
        return added_count

    def _detect_and_store_faces(self, cursor, photo_id, photo_path, filename, model) -> None:
        """Detect faces in one photo and insert a ``faces`` row for each."""
        if self.verbose >= 1:
            print(f"📸 Processing: {filename}")
        elif self.verbose == 0:
            print(".", end="", flush=True)

        if self.verbose >= 2:
            print(f" 🔍 Loading image: {photo_path}")

        image = face_recognition.load_image_file(photo_path)
        face_locations = face_recognition.face_locations(image, model=model)

        if not face_locations:
            if self.verbose >= 1:
                print(f" 👤 No faces found")
            return

        face_encodings = face_recognition.face_encodings(image, face_locations)
        if self.verbose >= 1:
            print(f" 👤 Found {len(face_locations)} faces")

        # Save faces to database
        for number, (encoding, location) in enumerate(zip(face_encodings, face_locations), start=1):
            # Store plain ints so the text can be read back with
            # ast.literal_eval, and float64 bytes because that is the dtype
            # find_similar_faces() decodes with.
            box = tuple(int(value) for value in location)
            cursor.execute(
                'INSERT INTO faces (photo_id, encoding, location) VALUES (?, ?, ?)',
                (photo_id, np.asarray(encoding, dtype=np.float64).tobytes(), str(box))
            )
            if self.verbose >= 3:
                print(f" Face {number}: {location}")

    def process_faces(self, limit: int = 50, model: str = "hog") -> int:
        """Process unprocessed photos for faces.

        Args:
            limit: Maximum number of unprocessed photos to handle.
            model: Detection model name passed to ``face_recognition``.

        Returns:
            Number of photos processed without error.  Photos whose file is
            missing or whose processing raised are still marked as processed
            so they are not retried, but are not counted.
        """
        with closing(self._connect()) as conn:
            cursor = conn.cursor()
            cursor.execute(
                'SELECT id, path, filename FROM photos WHERE processed = 0 LIMIT ?',
                (limit,)
            )
            unprocessed = cursor.fetchall()

            if not unprocessed:
                print("✅ No unprocessed photos found")
                return 0

            print(f"🔍 Processing {len(unprocessed)} photos for faces...")
            processed_count = 0

            for photo_id, photo_path, filename in unprocessed:
                if not os.path.exists(photo_path):
                    print(f"❌ File not found: {filename}")
                else:
                    try:
                        self._detect_and_store_faces(cursor, photo_id, photo_path, filename, model)
                        processed_count += 1
                    except Exception as e:
                        # Image decoding/detection can fail in many ways; drop
                        # any half-written rows for this photo and carry on.
                        conn.rollback()
                        print(f"❌ Error processing {filename}: {e}")

                # Mark as processed and commit per photo so an interrupted
                # run keeps the work already done.
                cursor.execute('UPDATE photos SET processed = 1 WHERE id = ?', (photo_id,))
                conn.commit()

        if self.verbose == 0:
            print()  # New line after dots
        print(f"✅ Processed {processed_count} photos")
        return processed_count

    # ------------------------------------------------------------------
    # Interactive identification
    # ------------------------------------------------------------------

    def _prompt_for_person(self, conn, cursor, face_id) -> str:
        """Ask the user who a face is until a final answer is given.

        Returns ``'quit'``, ``'skip'`` or ``'identified'``.  An
        identification is committed before returning.
        """
        while True:
            command = input("👤 Person name (or command): ").strip()
            keyword = command.lower()

            if keyword == 'q':
                return 'quit'
            if keyword == 's':
                return 'skip'
            if keyword == 'list':
                self._show_people_list(cursor)
                continue
            if not command:
                print("Please enter a name, 's' to skip, 'q' to quit, or 'list' to see people")
                continue

            try:
                # Add person if doesn't exist
                cursor.execute('INSERT OR IGNORE INTO people (name) VALUES (?)', (command,))
                cursor.execute('SELECT id FROM people WHERE name = ?', (command,))
                person_id = cursor.fetchone()[0]

                # Assign face to person
                cursor.execute(
                    'UPDATE faces SET person_id = ? WHERE id = ?',
                    (person_id, face_id)
                )
                conn.commit()
            except sqlite3.Error as e:
                conn.rollback()
                print(f"❌ Error: {e}")
                continue

            print(f"✅ Identified as: {command}")
            return 'identified'

    def identify_faces(self, batch_size: int = 20, show_faces: bool = False) -> int:
        """Interactive face identification.

        Prompts on standard input for each face that has no person assigned.

        Args:
            batch_size: Maximum number of faces to present.
            show_faces: When true, save a crop of each face to a temporary
                file and try to display it with ``feh``.

        Returns:
            Number of faces identified.  Identifications are committed as
            they are made, so quitting early keeps them.
        """
        with closing(self._connect()) as conn:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT f.id, f.photo_id, p.path, p.filename, f.location
                FROM faces f
                JOIN photos p ON f.photo_id = p.id
                WHERE f.person_id IS NULL
                LIMIT ?
            ''', (batch_size,))
            unidentified = cursor.fetchall()

            if not unidentified:
                print("🎉 All faces have been identified!")
                return 0

            total = len(unidentified)
            print(f"\n👤 Found {total} unidentified faces")
            print("Commands: [name] = identify, 's' = skip, 'q' = quit, 'list' = show people\n")

            identified_count = 0

            for number, (face_id, _photo_id, photo_path, filename, location) in enumerate(unidentified, start=1):
                print(f"\n--- Face {number}/{total} ---")
                print(f"📁 Photo: {filename}")
                print(f"📍 Face location: {location}")

                # Extract and display face crop if enabled
                face_crop_path = None
                if show_faces:
                    face_crop_path = self._extract_face_crop(photo_path, location, face_id)
                    if face_crop_path:
                        print(f"🖼️ Face crop saved: {face_crop_path}")
                        try:
                            # Try to open the face crop with feh
                            subprocess.run(
                                ['feh', '--title', f'Face {number}/{total} - {filename}', face_crop_path],
                                check=False, capture_output=True
                            )
                        except OSError:
                            # feh is missing or could not be started.
                            print(f"💡 Open face crop manually: feh {face_crop_path}")
                else:
                    print("💡 Use --show-faces flag to display individual face crops")

                try:
                    outcome = self._prompt_for_person(conn, cursor, face_id)
                finally:
                    # Clean up temporary face crop
                    self._remove_temp_files(face_crop_path)

                if outcome == 'quit':
                    print("Quitting...")
                    return identified_count
                if outcome == 'skip':
                    print("⏭️ Skipped")
                else:
                    identified_count += 1

        print(f"\n✅ Identified {identified_count} faces")
        return identified_count

    # ------------------------------------------------------------------
    # Image helpers
    # ------------------------------------------------------------------

    def _extract_face_crop(self, photo_path: str, location: tuple, face_id: int) -> Optional[str]:
        """Extract and save individual face crop for identification.

        Args:
            photo_path: Image file containing the face.
            location: ``(top, right, bottom, left)`` box, or its string form
                as stored in the database.
            face_id: Token embedded in the temporary file name; callers in
                this class pass either a face id or a short string.

        Returns:
            Path of the JPEG written to the temporary directory, or ``None``
            if the crop could not be produced.
        """
        try:
            top, right, bottom, left = self._parse_location(location)

            with Image.open(photo_path) as image:
                # Add padding around the face (20% of face size)
                padding_x = int((right - left) * 0.2)
                padding_y = int((bottom - top) * 0.2)

                # Crop bounds with padding, clamped to the image
                face_crop = image.crop((
                    max(0, left - padding_x),
                    max(0, top - padding_y),
                    min(image.width, right + padding_x),
                    min(image.height, bottom + padding_y),
                ))

                # JPEG cannot store alpha/palette modes (e.g. PNG or GIF sources).
                if face_crop.mode not in ("RGB", "L"):
                    face_crop = face_crop.convert("RGB")

                # Resize for better viewing (minimum 200px width)
                if face_crop.width < 200:
                    ratio = 200 / face_crop.width
                    face_crop = face_crop.resize(
                        (200, int(face_crop.height * ratio)), Image.Resampling.LANCZOS
                    )

                face_path = os.path.join(tempfile.gettempdir(), f"face_{face_id}_crop.jpg")
                face_crop.save(face_path, "JPEG", quality=95)
                return face_path

        except Exception as e:
            # Showing a crop is optional; never let it break identification.
            if self.verbose >= 1:
                print(f"⚠️ Could not extract face crop: {e}")
            return None

    def _create_comparison_image(self, unid_crop_path: str, match_crop_path: str, person_name: str, confidence: float) -> Optional[str]:
        """Create a side-by-side comparison image.

        Args:
            unid_crop_path: Crop of the unidentified face.
            match_crop_path: Crop of the known face.
            person_name: Name drawn above the known face.
            confidence: Fraction in the range 0..1 printed under the images.

        Returns:
            Path of the JPEG written to the temporary directory, or ``None``
            on failure.
        """
        try:
            target_height = 300
            gap = 20  # horizontal space between the two faces

            # Load both face crops and bring them to the same height
            with Image.open(unid_crop_path) as unid_img, Image.open(match_crop_path) as match_img:
                unid_resized = unid_img.resize(
                    (int(unid_img.width * target_height / unid_img.height), target_height),
                    Image.Resampling.LANCZOS
                )
                match_resized = match_img.resize(
                    (int(match_img.width * target_height / match_img.height), target_height),
                    Image.Resampling.LANCZOS
                )

            # Canvas leaves 30px above for labels and 30px below for confidence
            total_width = unid_resized.width + match_resized.width + gap
            comparison = Image.new('RGB', (total_width, target_height + 60), 'white')
            comparison.paste(unid_resized, (0, 30))
            comparison.paste(match_resized, (unid_resized.width + gap, 30))

            # Add labels
            draw = ImageDraw.Draw(comparison)
            try:
                font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 16)
            except OSError:
                font = ImageFont.load_default()

            draw.text((10, 5), "UNKNOWN", fill='red', font=font)
            draw.text((unid_resized.width + 30, 5), f"{person_name.upper()}", fill='green', font=font)
            draw.text((10, target_height + 35), f"Confidence: {confidence:.1%}", fill='blue', font=font)

            # A person's name may contain path separators or other characters
            # that are not valid in a file name.
            safe_name = "".join(ch if ch.isalnum() or ch in "-_" else "_" for ch in person_name)
            comparison_path = os.path.join(tempfile.gettempdir(), f"face_comparison_{safe_name}.jpg")
            comparison.save(comparison_path, "JPEG", quality=95)
            return comparison_path

        except Exception as e:
            # The comparison image is a convenience; callers fall back to
            # showing the two crops separately.
            if self.verbose >= 1:
                print(f"⚠️ Could not create comparison image: {e}")
            return None

    def _get_confidence_description(self, confidence_pct: float) -> str:
        """Get human-readable confidence description for a percentage."""
        for minimum, label in self._CONFIDENCE_LEVELS:
            if confidence_pct >= minimum:
                return label
        return "⚫ (Very Low - Unlikely)"

    def _show_people_list(self, cursor):
        """Show list of known people"""
        cursor.execute('SELECT name FROM people ORDER BY name')
        names = [row[0] for row in cursor.fetchall()]
        if names:
            print("👥 Known people:", ", ".join(names))
        else:
            print("👥 No people identified yet")

    # ------------------------------------------------------------------
    # Tagging, statistics and search
    # ------------------------------------------------------------------

    def add_tags(self, photo_pattern: Optional[str] = None, batch_size: int = 10) -> int:
        """Add custom tags to photos.

        Prompts on standard input for comma-separated tags for each photo;
        entering ``q`` stops early.

        Args:
            photo_pattern: If given, only photos whose file name contains
                this text are offered.
            batch_size: Maximum number of photos to offer.

        Returns:
            Number of photos that received at least one tag.
        """
        with closing(self._connect()) as conn:
            cursor = conn.cursor()

            if photo_pattern:
                cursor.execute(
                    'SELECT id, filename FROM photos WHERE filename LIKE ? LIMIT ?',
                    (f'%{photo_pattern}%', batch_size)
                )
            else:
                cursor.execute('SELECT id, filename FROM photos LIMIT ?', (batch_size,))
            photos = cursor.fetchall()

            if not photos:
                print("No photos found")
                return 0

            print(f"🏷️ Tagging {len(photos)} photos (enter comma-separated tags)")
            tagged_count = 0

            for photo_id, filename in photos:
                print(f"\n📸 {filename}")
                tags_input = input("🏷️ Tags: ").strip()

                if tags_input.lower() == 'q':
                    break

                tags = [tag.strip() for tag in tags_input.split(',') if tag.strip()]
                if not tags:
                    continue

                cursor.executemany(
                    'INSERT INTO tags (photo_id, tag_name) VALUES (?, ?)',
                    [(photo_id, tag) for tag in tags]
                )
                # Commit per photo so an interrupted session keeps its tags.
                conn.commit()
                print(f" ✅ Added {len(tags)} tags")
                tagged_count += 1

        print(f"✅ Tagged {tagged_count} photos")
        return tagged_count

    def stats(self) -> Dict:
        """Show database statistics.

        Returns:
            Dict with the integer counts ``total_photos``,
            ``processed_photos``, ``total_faces``, ``identified_faces``,
            ``total_people`` and ``unique_tags``, plus ``top_people``: up to
            15 ``(name, face_count)`` rows ordered by face count descending.
        """
        count_queries = {
            'total_photos': 'SELECT COUNT(*) FROM photos',
            'processed_photos': 'SELECT COUNT(*) FROM photos WHERE processed = 1',
            'total_faces': 'SELECT COUNT(*) FROM faces',
            'identified_faces': 'SELECT COUNT(*) FROM faces WHERE person_id IS NOT NULL',
            'total_people': 'SELECT COUNT(*) FROM people',
            'unique_tags': 'SELECT COUNT(DISTINCT tag_name) FROM tags',
        }

        stats = {}
        with closing(self._connect()) as conn:
            cursor = conn.cursor()

            # Basic counts
            for key, query in count_queries.items():
                cursor.execute(query)
                stats[key] = cursor.fetchone()[0]

            # Top people
            cursor.execute('''
                SELECT p.name, COUNT(f.id) as face_count
                FROM people p
                LEFT JOIN faces f ON p.id = f.person_id
                GROUP BY p.id
                ORDER BY face_count DESC
                LIMIT 15
            ''')
            stats['top_people'] = cursor.fetchall()

        # Display stats
        print(f"\n📊 Database Statistics")
        print("=" * 40)
        print(f"Photos: {stats['processed_photos']}/{stats['total_photos']} processed")
        print(f"Faces: {stats['identified_faces']}/{stats['total_faces']} identified")
        print(f"People: {stats['total_people']} unique")
        print(f"Tags: {stats['unique_tags']} unique")

        if stats['top_people']:
            print(f"\n👥 Top People:")
            for name, count in stats['top_people']:
                print(f" {name}: {count} faces")

        unidentified = stats['total_faces'] - stats['identified_faces']
        if unidentified > 0:
            print(f"\n⚠️ {unidentified} faces still need identification")

        return stats

    def search_faces(self, person_name: str) -> List[str]:
        """Search for photos containing a specific person.

        Args:
            person_name: Text that the person's name must contain.

        Returns:
            Paths of the matching photos.
        """
        with closing(self._connect()) as conn:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT DISTINCT p.filename, p.path
                FROM photos p
                JOIN faces f ON p.id = f.photo_id
                JOIN people pe ON f.person_id = pe.id
                WHERE pe.name LIKE ?
            ''', (f'%{person_name}%',))
            results = cursor.fetchall()

        if results:
            print(f"\n🔍 Found {len(results)} photos with '{person_name}':")
            for filename, _path in results:
                print(f" 📸 {filename}")
        else:
            print(f"🔍 No photos found with '{person_name}'")

        return [path for _filename, path in results]

    # ------------------------------------------------------------------
    # Similarity matching
    # ------------------------------------------------------------------

    def _matches_for_face(self, cursor, face_id, tolerance) -> List[Dict]:
        """Return every other face within ``tolerance`` of face ``face_id``."""
        cursor.execute('''
            SELECT id, photo_id, encoding, location
            FROM faces
            WHERE id = ?
        ''', (face_id,))
        target_face = cursor.fetchone()

        if not target_face:
            print(f"❌ Face ID {face_id} not found")
            return []

        _target_id, target_photo_id, target_blob, target_location = target_face
        target_encoding = np.frombuffer(target_blob, dtype=np.float64)

        # Get all other faces
        cursor.execute('''
            SELECT f.id, f.photo_id, f.encoding, f.location, p.filename, f.person_id
            FROM faces f
            JOIN photos p ON f.photo_id = p.id
            WHERE f.id != ?
        ''', (face_id,))

        matches = []
        for other_id, other_photo_id, other_blob, other_location, other_filename, other_person_id in cursor.fetchall():
            other_enc = np.frombuffer(other_blob, dtype=np.float64)
            distance = face_recognition.face_distance([target_encoding], other_enc)[0]
            if distance <= tolerance:
                matches.append({
                    'face_id': other_id,
                    'photo_id': other_photo_id,
                    'filename': other_filename,
                    'location': other_location,
                    'distance': distance,
                    'person_id': other_person_id
                })

        # Get target photo info
        cursor.execute('SELECT filename FROM photos WHERE id = ?', (target_photo_id,))
        target_filename = cursor.fetchone()[0]

        print(f"\n🔍 Finding faces similar to face in: {target_filename}")
        print(f"📍 Target face location: {target_location}")
        return matches

    def _matches_for_unidentified(self, cursor, tolerance, include_same_photo) -> List[Dict]:
        """Pair each unidentified face with its closest identified face."""
        cursor.execute('''
            SELECT f.id, f.photo_id, f.encoding, f.location, p.filename, f.person_id
            FROM faces f
            JOIN photos p ON f.photo_id = p.id
            ORDER BY f.id
        ''')

        # Decode every encoding once, not once per comparison.
        identified_faces = []
        unidentified_faces = []
        for row_id, photo_id, blob, location, filename, person_id in cursor.fetchall():
            decoded = (row_id, photo_id, np.frombuffer(blob, dtype=np.float64), location, filename, person_id)
            if person_id is None:
                unidentified_faces.append(decoded)
            else:
                identified_faces.append(decoded)

        print(f"\n🔍 Auto-matching {len(unidentified_faces)} unidentified faces with {len(identified_faces)} known faces...")

        matches = []
        for unid_id, unid_photo_id, unid_enc, unid_location, unid_filename, _ in unidentified_faces:
            best_match = None
            best_distance = float('inf')

            for id_id, id_photo_id, id_enc, id_location, id_filename, id_person_id in identified_faces:
                # Skip if same photo (unless specifically requested for twins detection)
                if not include_same_photo and unid_photo_id == id_photo_id:
                    continue

                distance = face_recognition.face_distance([unid_enc], id_enc)[0]
                if distance <= tolerance and distance < best_distance:
                    best_distance = distance
                    best_match = {
                        'unidentified_id': unid_id,
                        'unidentified_photo_id': unid_photo_id,
                        'unidentified_filename': unid_filename,
                        'unidentified_location': unid_location,
                        'matched_id': id_id,
                        'matched_photo_id': id_photo_id,
                        'matched_filename': id_filename,
                        'matched_location': id_location,
                        'person_id': id_person_id,
                        'distance': distance
                    }

            if best_match:
                matches.append(best_match)

        return matches

    def find_similar_faces(self, face_id: Optional[int] = None, tolerance: float = 0.6, include_same_photo: bool = False) -> List[Dict]:
        """Find similar faces across all photos.

        Args:
            face_id: When given, compare this face with every other face.
                When ``None``, pair each unidentified face with its closest
                identified face instead.
            tolerance: Maximum face distance that counts as a match.
            include_same_photo: In pairing mode, also allow matches between
                faces of the same photo.

        Returns:
            With ``face_id``: dicts with keys ``face_id``, ``photo_id``,
            ``filename``, ``location``, ``distance``, ``person_id``.
            Without: dicts with the ``unidentified_*`` / ``matched_*`` keys,
            ``person_id`` and ``distance``.  Empty list if ``face_id`` is
            unknown.
        """
        with closing(self._connect()) as conn:
            cursor = conn.cursor()
            # "is not None" so that face id 0 is looked up as a face instead
            # of silently switching to pairing mode (whose result dicts have
            # a different shape).
            if face_id is not None:
                return self._matches_for_face(cursor, face_id, tolerance)
            return self._matches_for_unidentified(cursor, tolerance, include_same_photo)

    def _show_match_comparison(self, match, person_name, unidentified_photo_path, matched_photo_path) -> List[str]:
        """Write and announce face crops for one match.

        Returns the temporary files created, for the caller to delete.
        """
        # Extract unidentified face crop
        unidentified_crop_path = self._extract_face_crop(
            unidentified_photo_path,
            match['unidentified_location'],
            f"unid_{match['unidentified_id']}"
        )

        # Extract matched face crop
        matched_crop_path = self._extract_face_crop(
            matched_photo_path,
            match['matched_location'],
            f"match_{match['matched_id']}"
        )

        temp_files = [path for path in (unidentified_crop_path, matched_crop_path) if path]

        if unidentified_crop_path and matched_crop_path:
            print(f"🖼️ Extracting faces for comparison...")

            # Create side-by-side comparison image
            comparison_path = self._create_comparison_image(
                unidentified_crop_path,
                matched_crop_path,
                person_name,
                1 - match['distance']
            )

            if comparison_path:
                temp_files.append(comparison_path)
                print(f"🔍 Comparison image: {comparison_path}")
                try:
                    # Open the comparison image in background (non-blocking)
                    subprocess.Popen(
                        ['feh', '--title', f'Face Comparison: Unknown vs {person_name}', comparison_path],
                        stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
                    )
                    print("👀 Check the image window to compare faces!")
                    print("💡 The image window won't block your input - you can decide while viewing!")
                except OSError as e:
                    print(f"⚠️ Could not auto-open comparison: {e}")
                    print(f"💡 Open manually: feh {comparison_path}")
            else:
                # Fallback to separate images
                print(f"🖼️ Unidentified: {unidentified_crop_path}")
                print(f"🖼️ Known ({person_name}): {matched_crop_path}")
                print(f"💡 Compare manually: feh {unidentified_crop_path} {matched_crop_path}")

        elif unidentified_crop_path:
            print(f"🖼️ Unidentified face only: {unidentified_crop_path}")
            print(f"💡 Open with: feh {unidentified_crop_path}")
        else:
            print("⚠️ Could not extract face crops for comparison")

        return temp_files

    @staticmethod
    def _ask_match_decision() -> str:
        """Prompt until the user answers ``y``, ``n``, ``s`` or ``q``."""
        while True:
            response = input("🤔 Identify as this person? (y/n/s=skip/q=quit): ").strip().lower()
            if response in ('y', 'n', 's', 'q'):
                return response
            print("Please enter 'y' (yes), 'n' (no), 's' (skip), or 'q' (quit)")

    def auto_identify_matches(self, tolerance: float = 0.6, confirm: bool = True, show_faces: bool = False, include_same_photo: bool = False) -> int:
        """Automatically identify faces that match already identified faces.

        Args:
            tolerance: Maximum face distance considered a potential match.
            confirm: Ask on standard input before accepting each match.
                When false, matches with a distance of at most
                ``AUTO_ACCEPT_DISTANCE`` are accepted and the rest skipped.
            show_faces: Produce face crops / a comparison image per match.
            include_same_photo: Allow matches between faces of one photo.

        Returns:
            Number of faces assigned to a person.  Each assignment is
            committed immediately, so quitting early keeps them.
        """
        matches = self.find_similar_faces(tolerance=tolerance, include_same_photo=include_same_photo)

        if not matches:
            print("🔍 No similar faces found for auto-identification")
            return 0

        total = len(matches)
        print(f"\n🎯 Found {total} potential matches:")
        print("📊 Confidence Guide: 🟢80%+ = Very High, 🟡70%+ = High, 🟠60%+ = Medium, 🔴50%+ = Low, ⚫<50% = Very Low")
        identified_count = 0

        with closing(self._connect()) as conn:
            cursor = conn.cursor()

            for number, match in enumerate(matches, start=1):
                # Get person name and photo paths
                cursor.execute('SELECT name FROM people WHERE id = ?', (match['person_id'],))
                person_name = cursor.fetchone()[0]

                cursor.execute('SELECT path FROM photos WHERE id = ?', (match['matched_photo_id'],))
                matched_photo_path = cursor.fetchone()[0]

                # Look up by id: file names are not unique across folders.
                cursor.execute('SELECT path FROM photos WHERE id = ?', (match['unidentified_photo_id'],))
                unidentified_photo_path = cursor.fetchone()[0]

                print(f"\n--- Match {number}/{total} ---")
                print(f"🆔 Unidentified face in: {match['unidentified_filename']}")
                print(f"📍 Location: {match['unidentified_location']}")
                print(f"👥 Potential match: {person_name}")
                print(f"📸 From photo: {match['matched_filename']}")
                confidence_pct = (1 - match['distance']) * 100
                confidence_desc = self._get_confidence_description(confidence_pct)
                print(f"🎯 Confidence: {confidence_pct:.1f}% {confidence_desc} (distance: {match['distance']:.3f})")

                # Show face crops if enabled
                temp_files = []
                if show_faces:
                    temp_files = self._show_match_comparison(
                        match, person_name, unidentified_photo_path, matched_photo_path
                    )

                try:
                    if confirm:
                        decision = self._ask_match_decision()
                    elif match['distance'] <= self.AUTO_ACCEPT_DISTANCE:
                        decision = 'auto'
                    else:
                        decision = 'low'
                finally:
                    # Clean up crops and comparison image for every outcome
                    self._remove_temp_files(*temp_files)

                if decision == 'q':
                    return identified_count

                if decision in ('y', 'auto'):
                    # Assign the face to the person
                    cursor.execute(
                        'UPDATE faces SET person_id = ? WHERE id = ?',
                        (match['person_id'], match['unidentified_id'])
                    )
                    conn.commit()
                    if decision == 'y':
                        print(f"✅ Identified as: {person_name}")
                    else:
                        print(f"✅ Auto-identified as: {person_name} (high confidence)")
                    identified_count += 1
                elif decision == 'n':
                    print("⏭️ Not a match")
                elif decision == 'low':
                    print(f"⚠️ Skipped (low confidence: {(1-match['distance']):.1%})")

        print(f"\n✅ Auto-identified {identified_count} faces")
        return identified_count
|
|
|
|
|
|
def main(argv=None):
    """Main CLI interface.

    Args:
        argv: Argument list to parse; ``None`` (the default) makes argparse
            read ``sys.argv[1:]``.

    Returns:
        Process exit status: 0 on success, 1 on a usage error, an
        interruption or a runtime error.  Invalid options make argparse
        raise ``SystemExit`` itself.
    """
    parser = argparse.ArgumentParser(
        description="PunimTag CLI - Simple photo face tagger",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  photo_tagger.py scan /path/to/photos          # Scan folder for photos
  photo_tagger.py process --limit 20            # Process 20 photos for faces
  photo_tagger.py identify --batch 10           # Identify 10 faces interactively
  photo_tagger.py auto-match                    # Auto-identify matching faces
  photo_tagger.py match 15                      # Find faces similar to face ID 15
  photo_tagger.py tag --pattern "vacation"      # Tag photos matching pattern
  photo_tagger.py search "John"                 # Find photos with John
  photo_tagger.py stats                         # Show statistics
        """
    )

    parser.add_argument('command',
                        choices=['scan', 'process', 'identify', 'tag', 'search', 'stats', 'match', 'auto-match'],
                        help='Command to execute')

    parser.add_argument('target', nargs='?',
                        help='Target folder (scan), person name (search), or pattern (tag)')

    parser.add_argument('--db', default='photos.db',
                        help='Database file path (default: photos.db)')

    parser.add_argument('--limit', type=int, default=50,
                        help='Batch size limit for processing (default: 50)')

    parser.add_argument('--batch', type=int, default=20,
                        help='Batch size for identification (default: 20)')

    parser.add_argument('--pattern',
                        help='Pattern for filtering photos when tagging')

    parser.add_argument('--model', choices=['hog', 'cnn'], default='hog',
                        help='Face detection model: hog (faster) or cnn (more accurate)')

    parser.add_argument('--recursive', action='store_true',
                        help='Scan folders recursively')

    parser.add_argument('--show-faces', action='store_true',
                        help='Show individual face crops during identification')

    parser.add_argument('--tolerance', type=float, default=0.5,
                        help='Face matching tolerance (0.0-1.0, lower = stricter, default: 0.5)')

    parser.add_argument('--auto', action='store_true',
                        help='Auto-identify high-confidence matches without confirmation')

    parser.add_argument('--include-twins', action='store_true',
                        help='Include same-photo matching (for twins or multiple instances)')

    parser.add_argument('-v', '--verbose', action='count', default=0,
                        help='Increase verbosity (-v, -vv, -vvv for more detail)')

    args = parser.parse_args(argv)

    # Reject usage errors before touching the database, so a mistyped
    # command neither creates a database file nor exits with status 0.
    if args.command == 'scan' and not args.target:
        print("❌ Please specify a folder to scan")
        return 1
    if args.command == 'search' and not args.target:
        print("❌ Please specify a person name to search for")
        return 1
    if args.command == 'match' and not (args.target and args.target.isdecimal()):
        print("❌ Please specify a face ID number to find matches for")
        return 1

    try:
        # Initialize tagger (inside the handler so that database errors are
        # reported like any other failure)
        tagger = PhotoTagger(args.db, args.verbose)

        if args.command == 'scan':
            tagger.scan_folder(args.target, args.recursive)

        elif args.command == 'process':
            tagger.process_faces(args.limit, args.model)

        elif args.command == 'identify':
            tagger.identify_faces(args.batch, args.show_faces)

        elif args.command == 'tag':
            tagger.add_tags(args.pattern or args.target, args.batch)

        elif args.command == 'search':
            tagger.search_faces(args.target)

        elif args.command == 'stats':
            tagger.stats()

        elif args.command == 'match':
            matches = tagger.find_similar_faces(int(args.target), args.tolerance)
            if matches:
                print(f"\n🎯 Found {len(matches)} similar faces:")
                for match in matches:
                    person_name = "Unknown" if match['person_id'] is None else f"Person ID {match['person_id']}"
                    print(f" 📸 {match['filename']} - {person_name} (confidence: {(1-match['distance']):.1%})")
            else:
                print("🔍 No similar faces found")

        elif args.command == 'auto-match':
            tagger.auto_identify_matches(
                args.tolerance, not args.auto, args.show_faces, args.include_twins
            )

        return 0

    except KeyboardInterrupt:
        print("\n\n⚠️ Interrupted by user")
        return 1
    except Exception as e:
        # Top-level boundary: report the failure and signal it via exit status.
        print(f"❌ Error: {e}")
        return 1
|
|
|
|
|
|
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|