punimtag/face_processing.py
tanyar09 f410e60e66 Add core functionality for PunimTag with modular architecture
This commit introduces a comprehensive set of modules for the PunimTag application, including configuration management, database operations, face processing, photo management, and tag management. Each module is designed to encapsulate specific functionalities, enhancing maintainability and scalability. The GUI components are also integrated, allowing for a cohesive user experience. This foundational work sets the stage for future enhancements and features, ensuring a robust framework for photo tagging and face recognition tasks.
2025-10-03 12:25:41 -04:00

516 lines
24 KiB
Python

#!/usr/bin/env python3
"""
Face detection, encoding, and matching functionality for PunimTag
"""
import os
import tempfile
import numpy as np
import face_recognition
from PIL import Image, ImageDraw, ImageFont
from typing import List, Dict, Tuple, Optional
from functools import lru_cache
from config import DEFAULT_FACE_DETECTION_MODEL, DEFAULT_FACE_TOLERANCE, MIN_FACE_QUALITY
from database import DatabaseManager
class FaceProcessor:
"""Handles face detection, encoding, and matching operations"""
def __init__(self, db_manager: DatabaseManager, verbose: int = 0):
    """Create a face processor bound to a database manager.

    Args:
        db_manager: Database access object, stored as ``self.db``.
        verbose: Verbosity level, stored as ``self.verbose``. Other methods
            print progressively more detail as this value increases.
    """
    # Both per-instance caches start out empty.
    self._image_cache = dict()
    self._face_encoding_cache = dict()
    self.verbose = verbose
    self.db = db_manager
@lru_cache(maxsize=1000)
def _get_cached_face_encoding(self, face_id: int, encoding_bytes: bytes) -> np.ndarray:
"""Cache face encodings to avoid repeated numpy conversions"""
return np.frombuffer(encoding_bytes, dtype=np.float64)
def _clear_caches(self):
"""Clear all caches to free memory"""
self._face_encoding_cache.clear()
self._image_cache.clear()
self._get_cached_face_encoding.cache_clear()
def cleanup_face_crops(self, current_face_crop_path=None):
    """Delete temporary face-crop files and reset the in-memory caches.

    Deletion is best-effort: a file that is already gone or cannot be
    removed is skipped silently. Only ``OSError`` is suppressed, so
    ``KeyboardInterrupt`` and ``SystemExit`` are no longer swallowed.

    Args:
        current_face_crop_path: Optional path of an additional crop file to
            delete besides the ones recorded in ``self._image_cache``.
    """
    doomed_paths = []
    if current_face_crop_path:
        doomed_paths.append(current_face_crop_path)
    # Snapshot the cached paths; the cache itself is cleared further down.
    doomed_paths.extend(list(self._image_cache.values()))

    for crop_path in doomed_paths:
        try:
            # No prior exists() check: a missing file raises
            # FileNotFoundError, which is an OSError and is ignored.
            os.remove(crop_path)
        except OSError:
            pass  # Best-effort cleanup; ignore files we cannot remove

    self._clear_caches()
def process_faces(self, limit: int = 50, model: str = DEFAULT_FACE_DETECTION_MODEL) -> int:
    """Detect and store faces for photos that have not been processed yet.

    For each photo returned by ``self.db.get_unprocessed_photos(limit)`` the
    image is loaded, faces are located and encoded with ``face_recognition``,
    and every face is saved through ``self.db.add_face`` together with a
    quality score. Each photo is marked as processed afterwards, including
    photos whose file is missing and photos that raised an error, so they
    are not picked up again on the next run.

    Args:
        limit: Maximum number of unprocessed photos to fetch.
        model: Detection model name passed to
            ``face_recognition.face_locations``.

    Returns:
        Number of photos processed without error (missing files and failed
        photos are not counted).
    """
    unprocessed = self.db.get_unprocessed_photos(limit)
    if not unprocessed:
        print("✅ No unprocessed photos found")
        return 0

    print(f"🔍 Processing {len(unprocessed)} photos for faces...")
    processed_count = 0

    for photo_id, photo_path, filename, _date_taken in unprocessed:
        if not os.path.exists(photo_path):
            print(f"❌ File not found: {filename}")
            self.db.mark_photo_processed(photo_id)
            continue

        try:
            # Progress output: a filename per photo when verbose, dots otherwise
            if self.verbose >= 1:
                print(f"📸 Processing: {filename}")
            elif self.verbose == 0:
                print(".", end="", flush=True)
            if self.verbose >= 2:
                print(f" 🔍 Loading image: {photo_path}")

            image = face_recognition.load_image_file(photo_path)
            face_locations = face_recognition.face_locations(image, model=model)

            if face_locations:
                face_encodings = face_recognition.face_encodings(image, face_locations)
                if self.verbose >= 1:
                    print(f" 👤 Found {len(face_locations)} faces")

                # Save faces to database with quality scores
                for face_number, (encoding, location) in enumerate(zip(face_encodings, face_locations), start=1):
                    quality_score = self._calculate_face_quality_score(image, location)
                    self.db.add_face(
                        photo_id=photo_id,
                        encoding=encoding.tobytes(),
                        location=str(location),
                        quality_score=quality_score
                    )
                    if self.verbose >= 3:
                        print(f" Face {face_number}: {location} (quality: {quality_score:.2f})")
            elif self.verbose >= 1:
                # The former `elif self.verbose >= 2` branch could never run
                # (it was shadowed by this condition) and has been removed.
                print(" 👤 No faces found")

            self.db.mark_photo_processed(photo_id)
            processed_count += 1
        except Exception as e:
            # Deliberate per-photo boundary: report the failure, mark the
            # photo as processed and carry on with the rest of the batch.
            print(f"❌ Error processing {filename}: {e}")
            self.db.mark_photo_processed(photo_id)

    if self.verbose == 0:
        print()  # New line after dots
    print(f"✅ Processed {processed_count} photos")
    return processed_count
def _calculate_face_quality_score(self, image: np.ndarray, face_location: tuple) -> float:
"""Calculate face quality score based on multiple factors"""
try:
top, right, bottom, left = face_location
face_height = bottom - top
face_width = right - left
# Basic size check - faces too small get lower scores
min_face_size = 50
size_score = min(1.0, (face_height * face_width) / (min_face_size * min_face_size))
# Extract face region
face_region = image[top:bottom, left:right]
if face_region.size == 0:
return 0.0
# Convert to grayscale for analysis
if len(face_region.shape) == 3:
gray_face = np.mean(face_region, axis=2)
else:
gray_face = face_region
# Calculate sharpness (Laplacian variance)
laplacian_var = np.var(np.array([[0, -1, 0], [-1, 4, -1], [0, -1, 0]]).astype(np.float32))
if laplacian_var > 0:
sharpness = np.var(np.array([[0, -1, 0], [-1, 4, -1], [0, -1, 0]]).astype(np.float32))
else:
sharpness = 0.0
sharpness_score = min(1.0, sharpness / 1000.0) # Normalize sharpness
# Calculate brightness and contrast
mean_brightness = np.mean(gray_face)
brightness_score = 1.0 - abs(mean_brightness - 128) / 128.0 # Prefer middle brightness
contrast = np.std(gray_face)
contrast_score = min(1.0, contrast / 64.0) # Prefer good contrast
# Calculate aspect ratio (faces should be roughly square)
aspect_ratio = face_width / face_height if face_height > 0 else 1.0
aspect_score = 1.0 - abs(aspect_ratio - 1.0) # Prefer square faces
# Calculate position in image (centered faces are better)
image_height, image_width = image.shape[:2]
center_x = (left + right) / 2
center_y = (top + bottom) / 2
position_x_score = 1.0 - abs(center_x - image_width / 2) / (image_width / 2)
position_y_score = 1.0 - abs(center_y - image_height / 2) / (image_height / 2)
position_score = (position_x_score + position_y_score) / 2.0
# Weighted combination of all factors
quality_score = (
size_score * 0.25 +
sharpness_score * 0.25 +
brightness_score * 0.15 +
contrast_score * 0.15 +
aspect_score * 0.10 +
position_score * 0.10
)
return max(0.0, min(1.0, quality_score))
except Exception as e:
if self.verbose >= 2:
print(f"⚠️ Error calculating face quality: {e}")
return 0.5 # Default medium quality on error
def _extract_face_crop(self, photo_path: str, location: tuple, face_id: int) -> str:
"""Extract and save individual face crop for identification with caching"""
try:
# Check cache first
cache_key = f"{photo_path}_{location}_{face_id}"
if cache_key in self._image_cache:
cached_path = self._image_cache[cache_key]
# Verify the cached file still exists
if os.path.exists(cached_path):
return cached_path
else:
# Remove from cache if file doesn't exist
del self._image_cache[cache_key]
# Parse location tuple from string format
if isinstance(location, str):
location = eval(location)
top, right, bottom, left = location
# Load the image
image = Image.open(photo_path)
# Add padding around the face (20% of face size)
face_width = right - left
face_height = bottom - top
padding_x = int(face_width * 0.2)
padding_y = int(face_height * 0.2)
# Calculate crop bounds with padding
crop_left = max(0, left - padding_x)
crop_top = max(0, top - padding_y)
crop_right = min(image.width, right + padding_x)
crop_bottom = min(image.height, bottom + padding_y)
# Crop the face
face_crop = image.crop((crop_left, crop_top, crop_right, crop_bottom))
# Create temporary file for the face crop
temp_dir = tempfile.gettempdir()
face_filename = f"face_{face_id}_crop.jpg"
face_path = os.path.join(temp_dir, face_filename)
# Resize for better viewing (minimum 200px width)
if face_crop.width < 200:
ratio = 200 / face_crop.width
new_width = 200
new_height = int(face_crop.height * ratio)
face_crop = face_crop.resize((new_width, new_height), Image.Resampling.LANCZOS)
face_crop.save(face_path, "JPEG", quality=95)
# Cache the result
self._image_cache[cache_key] = face_path
return face_path
except Exception as e:
if self.verbose >= 1:
print(f"⚠️ Could not extract face crop: {e}")
return None
def _create_comparison_image(self, unid_crop_path: str, match_crop_path: str, person_name: str, confidence: float) -> str:
"""Create a side-by-side comparison image"""
try:
# Load both face crops
unid_img = Image.open(unid_crop_path)
match_img = Image.open(match_crop_path)
# Resize both to same height for better comparison
target_height = 300
unid_ratio = target_height / unid_img.height
match_ratio = target_height / match_img.height
unid_resized = unid_img.resize((int(unid_img.width * unid_ratio), target_height), Image.Resampling.LANCZOS)
match_resized = match_img.resize((int(match_img.width * match_ratio), target_height), Image.Resampling.LANCZOS)
# Create comparison image
total_width = unid_resized.width + match_resized.width + 20 # 20px gap
comparison = Image.new('RGB', (total_width, target_height + 60), 'white')
# Paste images
comparison.paste(unid_resized, (0, 30))
comparison.paste(match_resized, (unid_resized.width + 20, 30))
# Add labels
draw = ImageDraw.Draw(comparison)
try:
# Try to use a font
font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 16)
except:
font = ImageFont.load_default()
draw.text((10, 5), "UNKNOWN", fill='red', font=font)
draw.text((unid_resized.width + 30, 5), f"{person_name.upper()}", fill='green', font=font)
draw.text((10, target_height + 35), f"Confidence: {confidence:.1%}", fill='blue', font=font)
# Save comparison image
temp_dir = tempfile.gettempdir()
comparison_path = os.path.join(temp_dir, f"face_comparison_{person_name}.jpg")
comparison.save(comparison_path, "JPEG", quality=95)
return comparison_path
except Exception as e:
if self.verbose >= 1:
print(f"⚠️ Could not create comparison image: {e}")
return None
def _get_confidence_description(self, confidence_pct: float) -> str:
"""Get human-readable confidence description"""
if confidence_pct >= 80:
return "🟢 (Very High - Almost Certain)"
elif confidence_pct >= 70:
return "🟡 (High - Likely Match)"
elif confidence_pct >= 60:
return "🟠 (Medium - Possible Match)"
elif confidence_pct >= 50:
return "🔴 (Low - Questionable)"
else:
return "⚫ (Very Low - Unlikely)"
def _calculate_adaptive_tolerance(self, base_tolerance: float, face_quality: float, match_confidence: float = None) -> float:
"""Calculate adaptive tolerance based on face quality and match confidence"""
# Start with base tolerance
tolerance = base_tolerance
# Adjust based on face quality (higher quality = stricter tolerance)
# More conservative: range 0.9 to 1.1 instead of 0.8 to 1.2
quality_factor = 0.9 + (face_quality * 0.2) # Range: 0.9 to 1.1
tolerance *= quality_factor
# If we have match confidence, adjust further
if match_confidence is not None:
# Higher confidence matches can use stricter tolerance
# More conservative: range 0.95 to 1.05 instead of 0.9 to 1.1
confidence_factor = 0.95 + (match_confidence * 0.1) # Range: 0.95 to 1.05
tolerance *= confidence_factor
# Ensure tolerance stays within reasonable bounds
return max(0.3, min(0.8, tolerance)) # Reduced max from 0.9 to 0.8
def _get_filtered_similar_faces(self, face_id: int, tolerance: float, include_same_photo: bool = False, face_status: dict = None) -> List[Dict]:
"""Get similar faces with consistent filtering and sorting logic used by both auto-match and identify"""
# Find similar faces using the core function
similar_faces_data = self.find_similar_faces(face_id, tolerance=tolerance, include_same_photo=include_same_photo)
# Filter to only show unidentified faces with confidence filtering
filtered_faces = []
for face in similar_faces_data:
# For auto-match: only filter by database state (keep existing behavior)
# For identify: also filter by current session state
is_identified_in_db = face.get('person_id') is not None
is_identified_in_session = face_status and face.get('face_id') in face_status and face_status[face.get('face_id')] == 'identified'
# If face_status is provided (identify mode), use both filters
# If face_status is None (auto-match mode), only use database filter
if face_status is not None:
# Identify mode: filter out both database and session identified faces
if not is_identified_in_db and not is_identified_in_session:
# Calculate confidence percentage
confidence_pct = (1 - face['distance']) * 100
# Only include matches with reasonable confidence (at least 40%)
if confidence_pct >= 40:
filtered_faces.append(face)
else:
# Auto-match mode: only filter by database state (keep existing behavior)
if not is_identified_in_db:
# Calculate confidence percentage
confidence_pct = (1 - face['distance']) * 100
# Only include matches with reasonable confidence (at least 40%)
if confidence_pct >= 40:
filtered_faces.append(face)
# Sort by confidence (distance) - highest confidence first
filtered_faces.sort(key=lambda x: x['distance'])
return filtered_faces
def _filter_unique_faces(self, faces: List[Dict]) -> List[Dict]:
"""Filter faces to show only unique ones, hiding duplicates with high/medium confidence matches"""
if not faces:
return faces
unique_faces = []
seen_face_groups = set() # Track face groups that have been seen
for face in faces:
face_id = face['face_id']
confidence_pct = (1 - face['distance']) * 100
# Only consider high (>=70%) or medium (>=60%) confidence matches for grouping
if confidence_pct >= 60:
# Find all faces that match this one with high/medium confidence
matching_face_ids = set()
for other_face in faces:
other_face_id = other_face['face_id']
other_confidence_pct = (1 - other_face['distance']) * 100
# If this face matches the current face with high/medium confidence
if other_confidence_pct >= 60:
matching_face_ids.add(other_face_id)
# Create a sorted tuple to represent this group of matching faces
face_group = tuple(sorted(matching_face_ids))
# Only show this face if we haven't seen this group before
if face_group not in seen_face_groups:
seen_face_groups.add(face_group)
unique_faces.append(face)
else:
# For low confidence matches, always show them (they're likely different people)
unique_faces.append(face)
return unique_faces
def find_similar_faces(self, face_id: int = None, tolerance: float = DEFAULT_FACE_TOLERANCE, include_same_photo: bool = False) -> List[Dict]:
    """Find faces that resemble a given face, or auto-match unknown faces.

    Two modes, selected by ``face_id``:

    * Truthy ``face_id``: compare that face against every other stored face
      and return those within the adaptive tolerance. Each result has the
      keys ``face_id``, ``person_id``, ``distance``, ``quality_score`` and
      ``adaptive_tolerance``.
    * Falsy ``face_id`` (None or 0): for every unidentified face, find the
      closest encoding among all identified faces and return one best match
      per unidentified face. Each result has the keys ``unidentified_id``,
      ``person_id``, ``distance``, ``quality_score`` and
      ``adaptive_tolerance``.

    Args:
        face_id: Face to search look-alikes for, or None for auto-match.
        tolerance: Base distance tolerance, adapted per comparison from the
            average quality of the two faces.
        include_same_photo: Accepted for API compatibility; this method does
            not currently consult it.

    Returns:
        List of match dicts as described above; empty if ``face_id`` is not
        found in the database.
    """
    if face_id:
        # Find faces similar to a specific face
        target_face = self.db.get_face_encodings(face_id)
        if not target_face:
            print(f"❌ Face ID {face_id} not found")
            return []
        target_encoding = self._get_cached_face_encoding(face_id, target_face)
        target_quality = 0.5  # Default quality for target face

        matches = []
        for other_id, other_encoding, other_person_id, other_quality in self.db.get_all_face_encodings():
            if other_id == face_id:
                continue
            other_enc = self._get_cached_face_encoding(other_id, other_encoding)

            # Adaptive tolerance based on both face qualities
            avg_quality = (target_quality + other_quality) / 2
            adaptive_tolerance = self._calculate_adaptive_tolerance(tolerance, avg_quality)

            distance = face_recognition.face_distance([target_encoding], other_enc)[0]
            if distance <= adaptive_tolerance:
                # The per-match get_photos_by_pattern() query was removed:
                # its result was never used.
                matches.append({
                    'face_id': other_id,
                    'person_id': other_person_id,
                    'distance': distance,
                    'quality_score': other_quality,
                    'adaptive_tolerance': adaptive_tolerance
                })
        return matches

    # Auto-match mode: match unidentified faces against identified ones
    all_faces = self.db.get_all_face_encodings()
    identified_faces = [f for f in all_faces if f[2] is not None]  # person_id is not None
    unidentified_faces = [f for f in all_faces if f[2] is None]  # person_id is None
    print(f"\n🔍 Auto-matching {len(unidentified_faces)} unidentified faces with {len(identified_faces)} known faces...")

    # Group identified faces by person, keeping every encoding of a person.
    # The previous code kept only the first encoding per person.
    person_encodings = {}
    for known_id, known_encoding, person_id, known_quality in identified_faces:
        known_enc = self._get_cached_face_encoding(known_id, known_encoding)
        person_encodings.setdefault(person_id, []).append((known_enc, known_quality))

    matches = []
    for unid_id, unid_encoding, _, unid_quality in unidentified_faces:
        unid_enc = self._get_cached_face_encoding(unid_id, unid_encoding)
        best_match = None
        best_distance = float('inf')

        # Compare with all encodings of every person; keep the closest hit
        for person_id, encodings in person_encodings.items():
            for person_enc, person_quality in encodings:
                avg_quality = (unid_quality + person_quality) / 2
                adaptive_tolerance = self._calculate_adaptive_tolerance(tolerance, avg_quality)
                distance = face_recognition.face_distance([unid_enc], person_enc)[0]
                if distance <= adaptive_tolerance and distance < best_distance:
                    best_distance = distance
                    best_match = {
                        'unidentified_id': unid_id,
                        'person_id': person_id,
                        'distance': distance,
                        'quality_score': unid_quality,
                        'adaptive_tolerance': adaptive_tolerance
                    }

        if best_match:
            matches.append(best_match)
    return matches
def add_person_encoding(self, person_id: int, face_id: int, encoding: np.ndarray, quality_score: float):
    """Store one face encoding in a person's encoding collection.

    Args:
        person_id: Person the encoding belongs to.
        face_id: Face the encoding was computed from.
        encoding: Encoding vector; serialised with ``tobytes()`` for storage.
        quality_score: Quality score recorded with the encoding.
    """
    serialized = encoding.tobytes()
    self.db.add_person_encoding(person_id, face_id, serialized, quality_score)
def get_person_encodings(self, person_id: int, min_quality: float = MIN_FACE_QUALITY) -> List[Tuple[np.ndarray, float]]:
    """Load a person's stored encodings as float64 vectors.

    Args:
        person_id: Person whose encodings are requested.
        min_quality: Quality threshold forwarded to
            ``self.db.get_person_encodings``.

    Returns:
        A list of ``(encoding, quality_score)`` pairs in the order returned
        by the database, each encoding decoded from raw bytes.
    """
    decoded = []
    for raw_encoding, score in self.db.get_person_encodings(person_id, min_quality):
        vector = np.frombuffer(raw_encoding, dtype=np.float64)
        decoded.append((vector, score))
    return decoded
def update_person_encodings(self, person_id: int):
    """Ask the database layer to refresh the encodings of one person.

    Args:
        person_id: Person whose encodings should be updated; forwarded
            unchanged to ``self.db.update_person_encodings``.
    """
    database = self.db
    database.update_person_encodings(person_id)