feat: Implement face pose detection using RetinaFace for enhanced face processing
This commit introduces a comprehensive face pose detection system utilizing the RetinaFace library to automatically classify face poses (yaw, pitch, roll) during image processing. The database schema has been updated to store pose information, including pose mode and angles. The face processing pipeline has been modified to integrate pose detection with graceful fallback mechanisms, ensuring compatibility with existing functionality. Additionally, new utility functions for pose detection have been added, along with unit tests to validate the implementation. Documentation has been updated to reflect these changes, enhancing the overall user experience and accuracy in face matching.
This commit is contained in:
parent
7945b084a4
commit
0e69677d54
1480
docs/PORTRAIT_DETECTION_PLAN.md
Normal file
1480
docs/PORTRAIT_DETECTION_PLAN.md
Normal file
File diff suppressed because it is too large
Load Diff
@ -74,7 +74,7 @@ class DatabaseManager:
|
||||
)
|
||||
''')
|
||||
|
||||
# Faces table (updated for DeepFace)
|
||||
# Faces table (updated for DeepFace and pose detection)
|
||||
cursor.execute('''
|
||||
CREATE TABLE IF NOT EXISTS faces (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
@ -89,11 +89,36 @@ class DatabaseManager:
|
||||
model_name TEXT DEFAULT 'ArcFace',
|
||||
face_confidence REAL DEFAULT 0.0,
|
||||
exif_orientation INTEGER DEFAULT NULL,
|
||||
pose_mode TEXT DEFAULT 'frontal',
|
||||
yaw_angle REAL DEFAULT NULL,
|
||||
pitch_angle REAL DEFAULT NULL,
|
||||
roll_angle REAL DEFAULT NULL,
|
||||
FOREIGN KEY (photo_id) REFERENCES photos (id),
|
||||
FOREIGN KEY (person_id) REFERENCES people (id)
|
||||
)
|
||||
''')
|
||||
|
||||
# Add pose fields if they don't exist (for existing databases)
|
||||
try:
|
||||
cursor.execute('ALTER TABLE faces ADD COLUMN pose_mode TEXT DEFAULT "frontal"')
|
||||
except sqlite3.OperationalError:
|
||||
pass # Column already exists
|
||||
|
||||
try:
|
||||
cursor.execute('ALTER TABLE faces ADD COLUMN yaw_angle REAL DEFAULT NULL')
|
||||
except sqlite3.OperationalError:
|
||||
pass # Column already exists
|
||||
|
||||
try:
|
||||
cursor.execute('ALTER TABLE faces ADD COLUMN pitch_angle REAL DEFAULT NULL')
|
||||
except sqlite3.OperationalError:
|
||||
pass # Column already exists
|
||||
|
||||
try:
|
||||
cursor.execute('ALTER TABLE faces ADD COLUMN roll_angle REAL DEFAULT NULL')
|
||||
except sqlite3.OperationalError:
|
||||
pass # Column already exists
|
||||
|
||||
# Person encodings table for multiple encodings per person (updated for DeepFace)
|
||||
cursor.execute('''
|
||||
CREATE TABLE IF NOT EXISTS person_encodings (
|
||||
@ -143,6 +168,7 @@ class DatabaseManager:
|
||||
cursor.execute('CREATE INDEX IF NOT EXISTS idx_person_encodings_quality ON person_encodings(quality_score)')
|
||||
cursor.execute('CREATE INDEX IF NOT EXISTS idx_photos_date_taken ON photos(date_taken)')
|
||||
cursor.execute('CREATE INDEX IF NOT EXISTS idx_photos_date_added ON photos(date_added)')
|
||||
cursor.execute('CREATE INDEX IF NOT EXISTS idx_faces_pose_mode ON faces(pose_mode)')
|
||||
|
||||
|
||||
|
||||
@ -233,7 +259,11 @@ class DatabaseManager:
|
||||
detector_backend: str = 'retinaface',
|
||||
model_name: str = 'ArcFace',
|
||||
face_confidence: float = 0.0,
|
||||
exif_orientation: Optional[int] = None) -> int:
|
||||
exif_orientation: Optional[int] = None,
|
||||
pose_mode: str = 'frontal',
|
||||
yaw_angle: Optional[float] = None,
|
||||
pitch_angle: Optional[float] = None,
|
||||
roll_angle: Optional[float] = None) -> int:
|
||||
"""Add a face to the database and return its ID
|
||||
|
||||
Args:
|
||||
@ -247,6 +277,10 @@ class DatabaseManager:
|
||||
model_name: DeepFace model used (ArcFace, Facenet, etc.)
|
||||
face_confidence: Confidence from DeepFace detector
|
||||
exif_orientation: EXIF orientation value (1-8) for coordinate transformation
|
||||
pose_mode: Pose mode classification (e.g., 'frontal', 'profile_left', 'looking_up')
|
||||
yaw_angle: Yaw angle in degrees (left/right rotation)
|
||||
pitch_angle: Pitch angle in degrees (up/down tilt)
|
||||
roll_angle: Roll angle in degrees (rotation around face axis)
|
||||
|
||||
Returns:
|
||||
Face ID
|
||||
@ -255,10 +289,12 @@ class DatabaseManager:
|
||||
cursor = conn.cursor()
|
||||
cursor.execute('''
|
||||
INSERT INTO faces (photo_id, person_id, encoding, location, confidence,
|
||||
quality_score, detector_backend, model_name, face_confidence, exif_orientation)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
quality_score, detector_backend, model_name, face_confidence,
|
||||
exif_orientation, pose_mode, yaw_angle, pitch_angle, roll_angle)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
''', (photo_id, person_id, encoding, location, confidence, quality_score,
|
||||
detector_backend, model_name, face_confidence, exif_orientation))
|
||||
detector_backend, model_name, face_confidence, exif_orientation,
|
||||
pose_mode, yaw_angle, pitch_angle, roll_angle))
|
||||
return cursor.lastrowid
|
||||
|
||||
def update_face_person(self, face_id: int, person_id: Optional[int]):
|
||||
|
||||
@ -35,6 +35,7 @@ from src.core.config import (
|
||||
)
|
||||
from src.core.database import DatabaseManager
|
||||
from src.utils.exif_utils import EXIFOrientationHandler
|
||||
from src.utils.pose_detection import PoseDetector, RETINAFACE_AVAILABLE
|
||||
|
||||
|
||||
class FaceProcessor:
|
||||
@ -59,6 +60,21 @@ class FaceProcessor:
|
||||
self._face_encoding_cache = {}
|
||||
self._image_cache = {}
|
||||
|
||||
# Initialize pose detector with graceful fallback
|
||||
self.pose_detector = None
|
||||
if RETINAFACE_AVAILABLE:
|
||||
try:
|
||||
self.pose_detector = PoseDetector()
|
||||
if self.verbose >= 2:
|
||||
print(f" Pose detection: enabled")
|
||||
except Exception as e:
|
||||
if self.verbose >= 1:
|
||||
print(f"⚠️ Pose detection not available: {e}")
|
||||
self.pose_detector = None
|
||||
else:
|
||||
if self.verbose >= 2:
|
||||
print(f" Pose detection: RetinaFace not available")
|
||||
|
||||
if self.verbose >= 2:
|
||||
print(f"🔧 FaceProcessor initialized:")
|
||||
print(f" Detector: {self.detector_backend}")
|
||||
@ -176,7 +192,19 @@ class FaceProcessor:
|
||||
# Use original image if no correction needed
|
||||
face_detection_path = photo_path
|
||||
|
||||
# Use DeepFace.represent() to get face detection and encodings
|
||||
# Step 1: Use RetinaFace directly for detection + landmarks (with graceful fallback)
|
||||
pose_faces = []
|
||||
if self.pose_detector:
|
||||
try:
|
||||
pose_faces = self.pose_detector.detect_pose_faces(face_detection_path)
|
||||
if self.verbose >= 2 and pose_faces:
|
||||
print(f" 📐 Pose detection: found {len(pose_faces)} faces with pose data")
|
||||
except Exception as e:
|
||||
if self.verbose >= 1:
|
||||
print(f"⚠️ Pose detection failed for {filename}: {e}, using defaults")
|
||||
pose_faces = []
|
||||
|
||||
# Step 2: Use DeepFace for encoding generation
|
||||
deepface_start_time = time.time()
|
||||
results = DeepFace.represent(
|
||||
img_path=face_detection_path,
|
||||
@ -256,7 +284,14 @@ class FaceProcessor:
|
||||
image_np = np.array(image)
|
||||
quality_score = self._calculate_face_quality_score(image_np, face_location_dict)
|
||||
|
||||
# Store in database with DeepFace format and EXIF orientation
|
||||
# Step 3: Match RetinaFace results with DeepFace results
|
||||
pose_info = self._find_matching_pose_info(facial_area, pose_faces)
|
||||
pose_mode = pose_info.get('pose_mode', 'frontal')
|
||||
yaw_angle = pose_info.get('yaw_angle')
|
||||
pitch_angle = pose_info.get('pitch_angle')
|
||||
roll_angle = pose_info.get('roll_angle')
|
||||
|
||||
# Store in database with DeepFace format, EXIF orientation, and pose data
|
||||
self.db.add_face(
|
||||
photo_id=photo_id,
|
||||
encoding=embedding.tobytes(),
|
||||
@ -267,7 +302,11 @@ class FaceProcessor:
|
||||
detector_backend=self.detector_backend,
|
||||
model_name=self.model_name,
|
||||
face_confidence=face_confidence,
|
||||
exif_orientation=exif_orientation
|
||||
exif_orientation=exif_orientation,
|
||||
pose_mode=pose_mode,
|
||||
yaw_angle=yaw_angle,
|
||||
pitch_angle=pitch_angle,
|
||||
roll_angle=roll_angle
|
||||
)
|
||||
|
||||
if self.verbose >= 3:
|
||||
@ -521,6 +560,78 @@ class FaceProcessor:
|
||||
print(f"⚠️ Error calculating face quality: {e}")
|
||||
return 0.5 # Default medium quality on error
|
||||
|
||||
def _find_matching_pose_info(self, facial_area: Dict,
|
||||
pose_faces: List[Dict]) -> Dict:
|
||||
"""Match DeepFace result with RetinaFace pose detection result
|
||||
|
||||
Args:
|
||||
facial_area: DeepFace facial_area {'x': x, 'y': y, 'w': w, 'h': h}
|
||||
pose_faces: List of RetinaFace detection results with pose info
|
||||
|
||||
Returns:
|
||||
Dictionary with pose information, or defaults
|
||||
"""
|
||||
# Match by bounding box overlap
|
||||
# Simple approach: find closest match by center point
|
||||
if not pose_faces:
|
||||
return {
|
||||
'pose_mode': 'frontal',
|
||||
'yaw_angle': None,
|
||||
'pitch_angle': None,
|
||||
'roll_angle': None
|
||||
}
|
||||
|
||||
deepface_center_x = facial_area.get('x', 0) + facial_area.get('w', 0) / 2
|
||||
deepface_center_y = facial_area.get('y', 0) + facial_area.get('h', 0) / 2
|
||||
|
||||
best_match = None
|
||||
min_distance = float('inf')
|
||||
|
||||
for pose_face in pose_faces:
|
||||
pose_area = pose_face.get('facial_area', {})
|
||||
|
||||
# Handle both dict and list formats (for robustness)
|
||||
if isinstance(pose_area, list) and len(pose_area) >= 4:
|
||||
# Convert list [x, y, w, h] to dict format
|
||||
pose_area = {
|
||||
'x': pose_area[0],
|
||||
'y': pose_area[1],
|
||||
'w': pose_area[2],
|
||||
'h': pose_area[3]
|
||||
}
|
||||
elif not isinstance(pose_area, dict):
|
||||
# Skip if not dict or list
|
||||
continue
|
||||
|
||||
pose_center_x = (pose_area.get('x', 0) +
|
||||
pose_area.get('w', 0) / 2)
|
||||
pose_center_y = (pose_area.get('y', 0) +
|
||||
pose_area.get('h', 0) / 2)
|
||||
|
||||
# Calculate distance between centers
|
||||
distance = ((deepface_center_x - pose_center_x) ** 2 +
|
||||
(deepface_center_y - pose_center_y) ** 2) ** 0.5
|
||||
|
||||
if distance < min_distance:
|
||||
min_distance = distance
|
||||
best_match = pose_face
|
||||
|
||||
# If match is close enough (within 50 pixels), use it
|
||||
if best_match and min_distance < 50:
|
||||
return {
|
||||
'pose_mode': best_match.get('pose_mode', 'frontal'),
|
||||
'yaw_angle': best_match.get('yaw_angle'),
|
||||
'pitch_angle': best_match.get('pitch_angle'),
|
||||
'roll_angle': best_match.get('roll_angle')
|
||||
}
|
||||
|
||||
return {
|
||||
'pose_mode': 'frontal',
|
||||
'yaw_angle': None,
|
||||
'pitch_angle': None,
|
||||
'roll_angle': None
|
||||
}
|
||||
|
||||
def _extract_face_crop(self, photo_path: str, location: dict, face_id: int) -> str:
|
||||
"""Extract and save individual face crop for identification with EXIF orientation correction"""
|
||||
try:
|
||||
|
||||
349
src/utils/pose_detection.py
Normal file
349
src/utils/pose_detection.py
Normal file
@ -0,0 +1,349 @@
|
||||
"""Face pose detection (yaw, pitch, roll) using RetinaFace landmarks"""
|
||||
|
||||
import numpy as np
|
||||
from math import atan2, degrees
|
||||
from typing import Dict, Tuple, Optional, List
|
||||
|
||||
try:
|
||||
from retinaface import RetinaFace
|
||||
RETINAFACE_AVAILABLE = True
|
||||
except ImportError:
|
||||
RETINAFACE_AVAILABLE = False
|
||||
RetinaFace = None
|
||||
|
||||
|
||||
class PoseDetector:
    """Detect face pose (yaw, pitch, roll) using RetinaFace landmarks.

    The angles are rough geometric estimates derived from the five 2-D
    landmarks RetinaFace reports (two eyes, nose, two mouth corners); they
    are not the result of a full 3-D head-pose solve.
    """

    # Thresholds for pose detection (in degrees)
    PROFILE_YAW_THRESHOLD = 30.0  # Faces with |yaw| >= 30° are considered profile
    EXTREME_YAW_THRESHOLD = 60.0  # Faces with |yaw| >= 60° are extreme profile

    PITCH_THRESHOLD = 20.0  # Faces with |pitch| >= 20° are looking up/down
    EXTREME_PITCH_THRESHOLD = 45.0  # Faces with |pitch| >= 45° are extreme

    ROLL_THRESHOLD = 15.0  # Faces with |roll| >= 15° are tilted
    EXTREME_ROLL_THRESHOLD = 45.0  # Faces with |roll| >= 45° are extreme

    def __init__(self,
                 yaw_threshold: Optional[float] = None,
                 pitch_threshold: Optional[float] = None,
                 roll_threshold: Optional[float] = None):
        """Initialize pose detector

        Args:
            yaw_threshold: Yaw angle threshold for profile detection (degrees)
                Default: 30.0
            pitch_threshold: Pitch angle threshold for up/down detection (degrees)
                Default: 20.0
            roll_threshold: Roll angle threshold for tilt detection (degrees)
                Default: 15.0

        Raises:
            RuntimeError: If the RetinaFace package could not be imported.
        """
        if not RETINAFACE_AVAILABLE:
            raise RuntimeError("RetinaFace not available")

        # Compare against None (not truthiness) so an explicit 0.0 is kept
        self.yaw_threshold = (self.PROFILE_YAW_THRESHOLD
                              if yaw_threshold is None else yaw_threshold)
        self.pitch_threshold = (self.PITCH_THRESHOLD
                                if pitch_threshold is None else pitch_threshold)
        self.roll_threshold = (self.ROLL_THRESHOLD
                               if roll_threshold is None else roll_threshold)

    @staticmethod
    def detect_faces_with_landmarks(img_path: str) -> Dict:
        """Detect faces using RetinaFace directly

        Returns:
            Dictionary keyed by face id as returned by
            ``RetinaFace.detect_faces``. Per the retinaface package
            documentation each entry looks like:
                {
                    'face_1': {
                        'score': 0.99,
                        'facial_area': [x1, y1, x2, y2],
                        'landmarks': {
                            'right_eye': [x, y],
                            'left_eye': [x, y],
                            'nose': [x, y],
                            'mouth_right': [x, y],
                            'mouth_left': [x, y]
                        }
                    }
                }
            An empty dict is returned when RetinaFace is unavailable or
            when the detector returns anything other than a dict.
        """
        if not RETINAFACE_AVAILABLE:
            return {}

        faces = RetinaFace.detect_faces(img_path)
        # Guard against non-dict results (e.g. a "no faces" sentinel) so
        # callers can always iterate over .items()/.values().
        return faces if isinstance(faces, dict) else {}

    @staticmethod
    def _get_point(landmarks: Dict, *names: str):
        """Return the first usable (x, y) landmark stored under any of *names*."""
        for name in names:
            point = landmarks.get(name)
            if point is not None and len(point) >= 2:
                return point
        return None

    @staticmethod
    def _normalize_facial_area(facial_area_raw) -> Dict:
        """Convert a RetinaFace box to {'x', 'y', 'w', 'h'} (empty dict if unusable)."""
        if isinstance(facial_area_raw, (list, tuple)) and len(facial_area_raw) >= 4:
            # RetinaFace reports corner coordinates [x1, y1, x2, y2]
            x1, y1, x2, y2 = facial_area_raw[:4]
            return {'x': x1, 'y': y1, 'w': x2 - x1, 'h': y2 - y1}
        if isinstance(facial_area_raw, dict):
            # Already in dict format
            return facial_area_raw
        return {}

    @staticmethod
    def _classify_axis(angle: Optional[float], threshold: float,
                       neutral: str, negative: str, positive: str) -> str:
        """Map one angle to its neutral/negative/positive label (|angle| >= threshold)."""
        if angle is None:
            return neutral
        if angle >= threshold:
            return positive
        if angle <= -threshold:
            return negative
        return neutral

    @staticmethod
    def calculate_yaw_from_landmarks(landmarks: Dict) -> Optional[float]:
        """Calculate yaw angle from facial landmarks

        Args:
            landmarks: Dictionary with landmark positions; this method uses
                'left_eye', 'right_eye' and 'nose', each an (x, y) pair.

        Returns:
            Yaw angle in degrees (-90 to +90):
            - Negative: face turned left (right profile)
            - Positive: face turned right (left profile)
            - Zero: frontal face
            - None: if landmarks invalid
        """
        if not landmarks:
            return None

        left_eye = PoseDetector._get_point(landmarks, 'left_eye')
        right_eye = PoseDetector._get_point(landmarks, 'right_eye')
        nose = PoseDetector._get_point(landmarks, 'nose')

        if left_eye is None or right_eye is None or nose is None:
            return None

        # Horizontal eye midpoint and eye distance (used as face width)
        eye_midpoint_x = (left_eye[0] + right_eye[0]) / 2
        face_width = abs(right_eye[0] - left_eye[0])

        if face_width == 0:
            return None

        # Horizontal offset of the nose from the eye midpoint, expressed
        # as an angle relative to the eye distance.
        # NOTE(review): the left/right sign convention depends on image
        # orientation; confirm against real RetinaFace output.
        horizontal_offset = nose[0] - eye_midpoint_x
        return degrees(atan2(horizontal_offset, face_width))

    @staticmethod
    def calculate_pitch_from_landmarks(landmarks: Dict) -> Optional[float]:
        """Calculate pitch angle from facial landmarks (up/down tilt)

        Args:
            landmarks: Dictionary with landmark positions. Mouth corners are
                accepted under either RetinaFace's 'mouth_left'/'mouth_right'
                keys or the 'left_mouth'/'right_mouth' aliases.

        Returns:
            Pitch angle in degrees (-90 to +90):
            - Positive: looking up
            - Negative: looking down
            - None: if landmarks invalid
        """
        if not landmarks:
            return None

        left_eye = PoseDetector._get_point(landmarks, 'left_eye')
        right_eye = PoseDetector._get_point(landmarks, 'right_eye')
        left_mouth = PoseDetector._get_point(landmarks, 'left_mouth', 'mouth_left')
        right_mouth = PoseDetector._get_point(landmarks, 'right_mouth', 'mouth_right')
        nose = PoseDetector._get_point(landmarks, 'nose')

        if any(point is None for point in
               (left_eye, right_eye, left_mouth, right_mouth, nose)):
            return None

        eye_mid_y = (left_eye[1] + right_eye[1]) / 2
        mouth_mid_y = (left_mouth[1] + right_mouth[1]) / 2

        face_height = abs(mouth_mid_y - eye_mid_y)
        if face_height == 0:
            return None

        # Expected nose position (typically 60% down from eyes to mouth)
        expected_nose_y = eye_mid_y + (mouth_mid_y - eye_mid_y) * 0.6

        # Vertical offset of the nose from its expected position.
        # NOTE(review): image y grows downward, so verify that a positive
        # offset really corresponds to "looking up" as documented.
        vertical_offset = nose[1] - expected_nose_y
        return degrees(atan2(vertical_offset, face_height))

    @staticmethod
    def calculate_roll_from_landmarks(landmarks: Dict) -> Optional[float]:
        """Calculate roll angle from facial landmarks (rotation around face axis)

        The roll is the inclination of the line through both eyes. The
        result is folded into [-90, +90] so it does not depend on which eye
        appears further left in the image.

        Args:
            landmarks: Dictionary with 'left_eye' and 'right_eye' (x, y) pairs

        Returns:
            Roll angle in degrees (-90 to +90):
            - Positive: tilted right (clockwise)
            - Negative: tilted left (counterclockwise)
            - None: if landmarks invalid (missing or coincident eyes)
        """
        if not landmarks:
            return None

        left_eye = PoseDetector._get_point(landmarks, 'left_eye')
        right_eye = PoseDetector._get_point(landmarks, 'right_eye')

        if left_eye is None or right_eye is None:
            return None

        dx = right_eye[0] - left_eye[0]
        dy = right_eye[1] - left_eye[1]

        if dx == 0:
            if dy == 0:
                return None  # Coincident eyes: no eye line to measure
            return 90.0 if dy > 0 else -90.0  # Vertical line

        roll_degrees = degrees(atan2(dy, dx))

        # atan2 yields (-180, 180]; when right_eye lies left of left_eye
        # (dx < 0) an upright face would come out near ±180°. Fold the
        # value back so an upright face is ~0°.
        if roll_degrees > 90.0:
            roll_degrees -= 180.0
        elif roll_degrees < -90.0:
            roll_degrees += 180.0

        return roll_degrees

    @staticmethod
    def classify_pose_mode(yaw: Optional[float],
                           pitch: Optional[float],
                           roll: Optional[float],
                           yaw_threshold: Optional[float] = None,
                           pitch_threshold: Optional[float] = None,
                           roll_threshold: Optional[float] = None) -> str:
        """Classify face pose mode from all three angles

        Args:
            yaw: Yaw angle in degrees
            pitch: Pitch angle in degrees
            roll: Roll angle in degrees
            yaw_threshold: |yaw| at or above which the face is a profile
                Default: PROFILE_YAW_THRESHOLD
            pitch_threshold: |pitch| at or above which the face looks up/down
                Default: PITCH_THRESHOLD
            roll_threshold: |roll| at or above which the face is tilted
                Default: ROLL_THRESHOLD

        Returns:
            Pose mode classification string:
            - 'frontal': frontal, level, upright (also used for unknown angles)
            - 'profile_left', 'profile_right': profile views
            - 'looking_up', 'looking_down': pitch variations
            - 'tilted_left', 'tilted_right': roll variations
            - Combined modes: e.g., 'profile_left_looking_up'
        """
        if yaw_threshold is None:
            yaw_threshold = PoseDetector.PROFILE_YAW_THRESHOLD
        if pitch_threshold is None:
            pitch_threshold = PoseDetector.PITCH_THRESHOLD
        if roll_threshold is None:
            roll_threshold = PoseDetector.ROLL_THRESHOLD

        # Unknown (None) angles are treated as neutral on that axis
        yaw_mode = PoseDetector._classify_axis(
            yaw, yaw_threshold, "frontal", "profile_right", "profile_left")
        pitch_mode = PoseDetector._classify_axis(
            pitch, pitch_threshold, "level", "looking_down", "looking_up")
        roll_mode = PoseDetector._classify_axis(
            roll, roll_threshold, "upright", "tilted_left", "tilted_right")

        # Only non-neutral axes contribute to the combined label
        modes = []
        if yaw_mode != "frontal":
            modes.append(yaw_mode)
        if pitch_mode != "level":
            modes.append(pitch_mode)
        if roll_mode != "upright":
            modes.append(roll_mode)

        return "_".join(modes) if modes else "frontal"

    def detect_pose_faces(self, img_path: str) -> List[Dict]:
        """Detect all faces and classify pose status (all angles)

        Args:
            img_path: Path to image file

        Returns:
            List of face dictionaries with pose information:
            [{
                'facial_area': {'x': x, 'y': y, 'w': w, 'h': h},
                'landmarks': {...},
                'confidence': 0.95,
                'yaw_angle': -45.2,
                'pitch_angle': 10.5,
                'roll_angle': -5.2,
                'pose_mode': 'profile_right'
            }, ...]
        """
        faces = self.detect_faces_with_landmarks(img_path)

        results = []
        for face_data in faces.values():
            if not isinstance(face_data, dict):
                continue

            landmarks = face_data.get('landmarks') or {}

            # Calculate all three angles
            yaw_angle = self.calculate_yaw_from_landmarks(landmarks)
            pitch_angle = self.calculate_pitch_from_landmarks(landmarks)
            roll_angle = self.calculate_roll_from_landmarks(landmarks)

            # Classify using the thresholds configured on this instance
            pose_mode = self.classify_pose_mode(
                yaw_angle, pitch_angle, roll_angle,
                yaw_threshold=self.yaw_threshold,
                pitch_threshold=self.pitch_threshold,
                roll_threshold=self.roll_threshold)

            # RetinaFace stores the detection confidence under 'score';
            # 'confidence' is honoured first for pre-normalized input.
            confidence = face_data.get('confidence',
                                       face_data.get('score', 0.0))

            results.append({
                'facial_area': self._normalize_facial_area(
                    face_data.get('facial_area')),
                'landmarks': landmarks,
                'confidence': confidence,
                'yaw_angle': yaw_angle,
                'pitch_angle': pitch_angle,
                'roll_angle': roll_angle,
                'pose_mode': pose_mode
            })

        return results
|
||||
|
||||
@ -21,6 +21,8 @@ from src.web.api.version import router as version_router
|
||||
from src.web.settings import APP_TITLE, APP_VERSION
|
||||
from src.web.db.base import Base, engine
|
||||
from src.web.db.session import database_url
|
||||
# Import models to ensure they're registered with Base.metadata
|
||||
from src.web.db import models # noqa: F401
|
||||
|
||||
# Global worker process (will be set in lifespan)
|
||||
_worker_process: subprocess.Popen | None = None
|
||||
|
||||
@ -94,6 +94,10 @@ class Face(Base):
|
||||
model_name = Column(Text, default="ArcFace", nullable=False)
|
||||
face_confidence = Column(Numeric, default=0.0, nullable=False)
|
||||
exif_orientation = Column(Integer, nullable=True)
|
||||
pose_mode = Column(Text, default="frontal", nullable=False, index=True)
|
||||
yaw_angle = Column(Numeric, nullable=True)
|
||||
pitch_angle = Column(Numeric, nullable=True)
|
||||
roll_angle = Column(Numeric, nullable=True)
|
||||
|
||||
photo = relationship("Photo", back_populates="faces")
|
||||
person = relationship("Person", back_populates="faces")
|
||||
@ -105,6 +109,7 @@ class Face(Base):
|
||||
Index("idx_faces_person_id", "person_id"),
|
||||
Index("idx_faces_photo_id", "photo_id"),
|
||||
Index("idx_faces_quality", "quality_score"),
|
||||
Index("idx_faces_pose_mode", "pose_mode"),
|
||||
)
|
||||
|
||||
|
||||
|
||||
@ -28,6 +28,7 @@ from src.core.config import (
|
||||
MAX_FACE_SIZE,
|
||||
)
|
||||
from src.utils.exif_utils import EXIFOrientationHandler
|
||||
from src.utils.pose_detection import PoseDetector, RETINAFACE_AVAILABLE
|
||||
from src.web.db.models import Face, Photo, Person
|
||||
|
||||
|
||||
@ -326,8 +327,21 @@ def process_photo_faces(
|
||||
else:
|
||||
face_detection_path = photo_path
|
||||
|
||||
# Step 1: Use RetinaFace directly for detection + landmarks (with graceful fallback)
|
||||
pose_faces = []
|
||||
pose_detector = None
|
||||
if RETINAFACE_AVAILABLE:
|
||||
try:
|
||||
pose_detector = PoseDetector()
|
||||
pose_faces = pose_detector.detect_pose_faces(face_detection_path)
|
||||
if pose_faces:
|
||||
print(f"[FaceService] Pose detection: found {len(pose_faces)} faces with pose data")
|
||||
except Exception as e:
|
||||
print(f"[FaceService] ⚠️ Pose detection failed for {photo.filename}: {e}, using defaults")
|
||||
pose_faces = []
|
||||
|
||||
try:
|
||||
# Use DeepFace to detect faces and compute embeddings
|
||||
# Step 2: Use DeepFace for encoding generation
|
||||
# Note: First call may take time to download/initialize models
|
||||
print(f"[DeepFace] Processing {photo.filename} with {detector_backend}/{model_name}...")
|
||||
results = DeepFace.represent(
|
||||
@ -437,6 +451,13 @@ def process_photo_faces(
|
||||
# Convert from 0-100 to 0.0-1.0 for database (desktop stores REAL)
|
||||
quality_score = quality_score_int / 100.0
|
||||
|
||||
# Step 3: Match RetinaFace results with DeepFace results
|
||||
pose_info = _find_matching_pose_info(facial_area, pose_faces)
|
||||
pose_mode = pose_info.get('pose_mode', 'frontal')
|
||||
yaw_angle = pose_info.get('yaw_angle')
|
||||
pitch_angle = pose_info.get('pitch_angle')
|
||||
roll_angle = pose_info.get('roll_angle')
|
||||
|
||||
# Store face in database - match desktop schema exactly
|
||||
# Desktop: confidence REAL DEFAULT 0.0 (legacy), face_confidence REAL (actual)
|
||||
# Desktop: quality_score REAL DEFAULT 0.0 (0.0-1.0 range)
|
||||
@ -452,6 +473,10 @@ def process_photo_faces(
|
||||
model_name=model_name,
|
||||
face_confidence=face_confidence, # REAL in 0.0-1.0 range
|
||||
exif_orientation=exif_orientation,
|
||||
pose_mode=pose_mode,
|
||||
yaw_angle=yaw_angle,
|
||||
pitch_angle=pitch_angle,
|
||||
roll_angle=roll_angle,
|
||||
)
|
||||
|
||||
db.add(face)
|
||||
@ -486,6 +511,78 @@ def process_photo_faces(
|
||||
raise Exception(f"Error processing faces in {photo.filename}: {str(e)}")
|
||||
|
||||
|
||||
def _find_matching_pose_info(facial_area: Dict, pose_faces: List[Dict]) -> Dict:
|
||||
"""Match DeepFace result with RetinaFace pose detection result
|
||||
|
||||
Args:
|
||||
facial_area: DeepFace facial_area {'x': x, 'y': y, 'w': w, 'h': h}
|
||||
pose_faces: List of RetinaFace detection results with pose info
|
||||
|
||||
Returns:
|
||||
Dictionary with pose information, or defaults
|
||||
"""
|
||||
# Match by bounding box overlap
|
||||
# Simple approach: find closest match by center point
|
||||
if not pose_faces:
|
||||
return {
|
||||
'pose_mode': 'frontal',
|
||||
'yaw_angle': None,
|
||||
'pitch_angle': None,
|
||||
'roll_angle': None
|
||||
}
|
||||
|
||||
deepface_center_x = facial_area.get('x', 0) + facial_area.get('w', 0) / 2
|
||||
deepface_center_y = facial_area.get('y', 0) + facial_area.get('h', 0) / 2
|
||||
|
||||
best_match = None
|
||||
min_distance = float('inf')
|
||||
|
||||
for pose_face in pose_faces:
|
||||
pose_area = pose_face.get('facial_area', {})
|
||||
|
||||
# Handle both dict and list formats (for robustness)
|
||||
if isinstance(pose_area, list) and len(pose_area) >= 4:
|
||||
# Convert list [x, y, w, h] to dict format
|
||||
pose_area = {
|
||||
'x': pose_area[0],
|
||||
'y': pose_area[1],
|
||||
'w': pose_area[2],
|
||||
'h': pose_area[3]
|
||||
}
|
||||
elif not isinstance(pose_area, dict):
|
||||
# Skip if not dict or list
|
||||
continue
|
||||
|
||||
pose_center_x = (pose_area.get('x', 0) +
|
||||
pose_area.get('w', 0) / 2)
|
||||
pose_center_y = (pose_area.get('y', 0) +
|
||||
pose_area.get('h', 0) / 2)
|
||||
|
||||
# Calculate distance between centers
|
||||
distance = ((deepface_center_x - pose_center_x) ** 2 +
|
||||
(deepface_center_y - pose_center_y) ** 2) ** 0.5
|
||||
|
||||
if distance < min_distance:
|
||||
min_distance = distance
|
||||
best_match = pose_face
|
||||
|
||||
# If match is close enough (within 50 pixels), use it
|
||||
if best_match and min_distance < 50:
|
||||
return {
|
||||
'pose_mode': best_match.get('pose_mode', 'frontal'),
|
||||
'yaw_angle': best_match.get('yaw_angle'),
|
||||
'pitch_angle': best_match.get('pitch_angle'),
|
||||
'roll_angle': best_match.get('roll_angle')
|
||||
}
|
||||
|
||||
return {
|
||||
'pose_mode': 'frontal',
|
||||
'yaw_angle': None,
|
||||
'pitch_angle': None,
|
||||
'roll_angle': None
|
||||
}
|
||||
|
||||
|
||||
def process_unprocessed_photos(
|
||||
db: Session,
|
||||
batch_size: Optional[int] = None,
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user