feat: Add landmarks column to faces and update face processing for pose detection
This commit introduces a new nullable `landmarks` column on the `faces` table (Alembic migration, SQLAlchemy model, and the DatabaseManager schema) for storing facial landmarks as a JSON string, enhancing the face processing capabilities. The FaceProcessor class and the face service have been updated to extract and serialize landmarks from the matched RetinaFace detection during face detection, and pose classification now also checks face width when a yaw angle is available, improving pose classification accuracy. The Identify component now shows a loading progress indicator while faces are loaded and filtered for uniqueness, and the AutoMatch component gains a help tooltip and an info banner that describe the auto-match criteria. Documentation has been updated to reflect these changes.
This commit is contained in:
parent
f4f6223cd0
commit
e4a5ff8a57
25
alembic/versions/20251106_add_landmarks_to_faces.py
Normal file
25
alembic/versions/20251106_add_landmarks_to_faces.py
Normal file
@ -0,0 +1,25 @@
|
||||
"""add landmarks column to faces
|
||||
|
||||
Revision ID: add_landmarks_to_faces_20251106
|
||||
Revises: add_processed_to_photos_20251103
|
||||
Create Date: 2025-11-06
|
||||
"""
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = 'add_landmarks_to_faces_20251106'
|
||||
down_revision = 'add_processed_to_photos_20251103'
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade() -> None:
    """Add the nullable ``landmarks`` text column to the ``faces`` table."""
    # Build the column first so the schema change reads as a single step.
    landmarks_column = sa.Column('landmarks', sa.Text(), nullable=True)
    op.add_column('faces', landmarks_column)
|
||||
|
||||
|
||||
def downgrade() -> None:
    """Remove the ``landmarks`` column from the ``faces`` table."""
    op.drop_column(table_name='faces', column_name='landmarks')
|
||||
|
||||
@ -19,6 +19,7 @@ export default function AutoMatch() {
|
||||
const [saving, setSaving] = useState(false)
|
||||
const [hasNoResults, setHasNoResults] = useState(false)
|
||||
const [isRefreshing, setIsRefreshing] = useState(false)
|
||||
const [showHelpTooltip, setShowHelpTooltip] = useState(false)
|
||||
|
||||
const currentPerson = useMemo(() => {
|
||||
const activePeople = filteredPeople.length > 0 ? filteredPeople : people
|
||||
@ -269,14 +270,68 @@ export default function AutoMatch() {
|
||||
/>
|
||||
<span className="text-xs text-gray-500">(lower = stricter matching)</span>
|
||||
</div>
|
||||
<button
|
||||
onClick={startAutoMatch}
|
||||
disabled={busy || hasNoResults}
|
||||
className="px-4 py-2 bg-blue-600 text-white rounded hover:bg-blue-700 disabled:bg-gray-400 disabled:cursor-not-allowed"
|
||||
title={hasNoResults ? 'No matches found. Adjust tolerance or process more photos.' : ''}
|
||||
>
|
||||
{busy ? 'Processing...' : hasNoResults ? 'No Matches Available' : '🚀 Run Auto-Match'}
|
||||
</button>
|
||||
<div className="flex items-center gap-2">
|
||||
<button
|
||||
onClick={startAutoMatch}
|
||||
disabled={busy || hasNoResults}
|
||||
className="px-4 py-2 bg-blue-600 text-white rounded hover:bg-blue-700 disabled:bg-gray-400 disabled:cursor-not-allowed"
|
||||
title={hasNoResults ? 'No matches found. Adjust tolerance or process more photos.' : ''}
|
||||
>
|
||||
{busy ? 'Processing...' : hasNoResults ? 'No Matches Available' : '🚀 Run Auto-Match'}
|
||||
</button>
|
||||
<div className="relative">
|
||||
<button
|
||||
type="button"
|
||||
onClick={() => setShowHelpTooltip(!showHelpTooltip)}
|
||||
onBlur={() => setTimeout(() => setShowHelpTooltip(false), 200)}
|
||||
className="text-gray-400 hover:text-gray-600 focus:outline-none focus:text-gray-600"
|
||||
aria-label="Auto-match criteria help"
|
||||
aria-expanded={showHelpTooltip}
|
||||
>
|
||||
<svg
|
||||
className="w-5 h-5"
|
||||
fill="none"
|
||||
stroke="currentColor"
|
||||
viewBox="0 0 24 24"
|
||||
xmlns="http://www.w3.org/2000/svg"
|
||||
>
|
||||
<path
|
||||
strokeLinecap="round"
|
||||
strokeLinejoin="round"
|
||||
strokeWidth={2}
|
||||
d="M13 16h-1v-4h-1m1-4h.01M21 12a9 9 0 11-18 0 9 9 0 0118 0z"
|
||||
/>
|
||||
</svg>
|
||||
</button>
|
||||
{showHelpTooltip && (
|
||||
<div className="absolute left-0 top-8 w-80 bg-gray-900 text-white text-xs rounded-lg shadow-lg p-3 z-20">
|
||||
<div className="space-y-2">
|
||||
<div className="font-semibold text-sm mb-2">Auto-Match Criteria:</div>
|
||||
<div>
|
||||
<div className="font-medium mb-1">Face Pose:</div>
|
||||
<div className="text-gray-300 pl-2">
|
||||
• Reference face: Frontal or tilted (not profile)
|
||||
<br />
|
||||
• Match face: Frontal or tilted (not profile)
|
||||
</div>
|
||||
</div>
|
||||
<div>
|
||||
<div className="font-medium mb-1">Similarity Threshold:</div>
|
||||
<div className="text-gray-300 pl-2">
|
||||
• Minimum: {autoAcceptThreshold}% similarity
|
||||
<br />
|
||||
• Only matches ≥ {autoAcceptThreshold}% will be auto-accepted
|
||||
</div>
|
||||
</div>
|
||||
<div className="pt-2 border-t border-gray-700 text-gray-400 text-xs">
|
||||
Note: Profile faces are excluded for better accuracy
|
||||
</div>
|
||||
</div>
|
||||
<div className="absolute -top-2 left-4 w-0 h-0 border-l-4 border-r-4 border-b-4 border-transparent border-b-gray-900"></div>
|
||||
</div>
|
||||
)}
|
||||
</div>
|
||||
</div>
|
||||
<div className="flex items-center gap-2">
|
||||
<label className="text-sm font-medium text-gray-700">Auto-Accept Threshold:</label>
|
||||
<input
|
||||
@ -292,6 +347,9 @@ export default function AutoMatch() {
|
||||
<span className="text-xs text-gray-500">% (min similarity)</span>
|
||||
</div>
|
||||
</div>
|
||||
<div className="mt-2 text-xs text-gray-600 bg-blue-50 border border-blue-200 rounded p-2">
|
||||
<span className="font-medium">ℹ️ Auto-Match Criteria:</span> Only frontal or tilted faces (not profile) with similarity ≥ {autoAcceptThreshold}% will be auto-accepted. Click the info icon for details.
|
||||
</div>
|
||||
</div>
|
||||
|
||||
{isActive && (
|
||||
|
||||
@ -34,6 +34,8 @@ export default function Identify() {
|
||||
const [busy, setBusy] = useState(false)
|
||||
const [imageLoading, setImageLoading] = useState(false)
|
||||
const [filtersCollapsed, setFiltersCollapsed] = useState(false)
|
||||
const [loadingFaces, setLoadingFaces] = useState(false)
|
||||
const [loadingProgress, setLoadingProgress] = useState({ current: 0, total: 0, message: '' })
|
||||
|
||||
// Store form data per face ID (matching desktop behavior)
|
||||
const [faceFormData, setFaceFormData] = useState<Record<number, {
|
||||
@ -53,26 +55,35 @@ export default function Identify() {
|
||||
}, [personId, firstName, lastName, dob, currentFace])
|
||||
|
||||
const loadFaces = async () => {
|
||||
const res = await facesApi.getUnidentified({
|
||||
page: 1,
|
||||
page_size: pageSize,
|
||||
min_quality: minQuality,
|
||||
date_from: dateFrom || undefined,
|
||||
date_to: dateTo || undefined,
|
||||
sort_by: sortBy,
|
||||
sort_dir: sortDir,
|
||||
})
|
||||
setLoadingFaces(true)
|
||||
setLoadingProgress({ current: 0, total: 0, message: 'Loading faces...' })
|
||||
|
||||
// Apply unique faces filter if enabled
|
||||
if (uniqueFacesOnly) {
|
||||
const filtered = await filterUniqueFaces(res.items)
|
||||
setFaces(filtered)
|
||||
setTotal(filtered.length)
|
||||
} else {
|
||||
setFaces(res.items)
|
||||
setTotal(res.total)
|
||||
try {
|
||||
const res = await facesApi.getUnidentified({
|
||||
page: 1,
|
||||
page_size: pageSize,
|
||||
min_quality: minQuality,
|
||||
date_from: dateFrom || undefined,
|
||||
date_to: dateTo || undefined,
|
||||
sort_by: sortBy,
|
||||
sort_dir: sortDir,
|
||||
})
|
||||
|
||||
// Apply unique faces filter if enabled
|
||||
if (uniqueFacesOnly) {
|
||||
setLoadingProgress({ current: 0, total: res.items.length, message: 'Filtering unique faces...' })
|
||||
const filtered = await filterUniqueFaces(res.items)
|
||||
setFaces(filtered)
|
||||
setTotal(filtered.length)
|
||||
} else {
|
||||
setFaces(res.items)
|
||||
setTotal(res.total)
|
||||
}
|
||||
setCurrentIdx(0)
|
||||
} finally {
|
||||
setLoadingFaces(false)
|
||||
setLoadingProgress({ current: 0, total: 0, message: '' })
|
||||
}
|
||||
setCurrentIdx(0)
|
||||
}
|
||||
|
||||
const filterUniqueFaces = async (faces: FaceItem[]): Promise<FaceItem[]> => {
|
||||
@ -84,9 +95,17 @@ export default function Identify() {
|
||||
// Build similarity graph: for each face, find all similar faces (≥60% confidence) in current list
|
||||
const similarityMap = new Map<number, Set<number>>()
|
||||
|
||||
for (const face of faces) {
|
||||
for (let i = 0; i < faces.length; i++) {
|
||||
const face = faces[i]
|
||||
const similarSet = new Set<number>()
|
||||
|
||||
// Update progress
|
||||
setLoadingProgress({
|
||||
current: i + 1,
|
||||
total: faces.length,
|
||||
message: `Checking face ${i + 1} of ${faces.length}...`
|
||||
})
|
||||
|
||||
try {
|
||||
const similarRes = await facesApi.getSimilar(face.id)
|
||||
for (const similar of similarRes.items) {
|
||||
@ -343,6 +362,33 @@ export default function Identify() {
|
||||
return (
|
||||
<div>
|
||||
<h1 className="text-2xl font-bold text-gray-900 mb-4">Identify</h1>
|
||||
|
||||
{/* Loading Progress Bar */}
|
||||
{loadingFaces && (
|
||||
<div className="bg-white rounded-lg shadow p-4 mb-4">
|
||||
<div className="flex items-center justify-between mb-2">
|
||||
<span className="text-sm font-medium text-gray-700">
|
||||
{loadingProgress.message || 'Loading faces...'}
|
||||
</span>
|
||||
{loadingProgress.total > 0 && (
|
||||
<span className="text-sm text-gray-500">
|
||||
{loadingProgress.current} / {loadingProgress.total}
|
||||
</span>
|
||||
)}
|
||||
</div>
|
||||
<div className="w-full bg-gray-200 rounded-full h-2.5">
|
||||
<div
|
||||
className="bg-blue-600 h-2.5 rounded-full transition-all duration-300"
|
||||
style={{
|
||||
width: loadingProgress.total > 0
|
||||
? `${(loadingProgress.current / loadingProgress.total) * 100}%`
|
||||
: '100%'
|
||||
}}
|
||||
/>
|
||||
</div>
|
||||
</div>
|
||||
)}
|
||||
|
||||
<div className="grid grid-cols-12 gap-4">
|
||||
{/* Left: Controls and current face */}
|
||||
<div className="col-span-4">
|
||||
|
||||
@ -93,6 +93,7 @@ class DatabaseManager:
|
||||
yaw_angle REAL DEFAULT NULL,
|
||||
pitch_angle REAL DEFAULT NULL,
|
||||
roll_angle REAL DEFAULT NULL,
|
||||
landmarks TEXT DEFAULT NULL,
|
||||
FOREIGN KEY (photo_id) REFERENCES photos (id),
|
||||
FOREIGN KEY (person_id) REFERENCES people (id)
|
||||
)
|
||||
@ -263,7 +264,8 @@ class DatabaseManager:
|
||||
pose_mode: str = 'frontal',
|
||||
yaw_angle: Optional[float] = None,
|
||||
pitch_angle: Optional[float] = None,
|
||||
roll_angle: Optional[float] = None) -> int:
|
||||
roll_angle: Optional[float] = None,
|
||||
landmarks: Optional[str] = None) -> int:
|
||||
"""Add a face to the database and return its ID
|
||||
|
||||
Args:
|
||||
@ -281,6 +283,7 @@ class DatabaseManager:
|
||||
yaw_angle: Yaw angle in degrees (left/right rotation)
|
||||
pitch_angle: Pitch angle in degrees (up/down tilt)
|
||||
roll_angle: Roll angle in degrees (rotation around face axis)
|
||||
landmarks: JSON string of facial landmarks (e.g., {'left_eye': [x, y], ...})
|
||||
|
||||
Returns:
|
||||
Face ID
|
||||
@ -290,11 +293,11 @@ class DatabaseManager:
|
||||
cursor.execute('''
|
||||
INSERT INTO faces (photo_id, person_id, encoding, location, confidence,
|
||||
quality_score, detector_backend, model_name, face_confidence,
|
||||
exif_orientation, pose_mode, yaw_angle, pitch_angle, roll_angle)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
exif_orientation, pose_mode, yaw_angle, pitch_angle, roll_angle, landmarks)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
''', (photo_id, person_id, encoding, location, confidence, quality_score,
|
||||
detector_backend, model_name, face_confidence, exif_orientation,
|
||||
pose_mode, yaw_angle, pitch_angle, roll_angle))
|
||||
pose_mode, yaw_angle, pitch_angle, roll_angle, landmarks))
|
||||
return cursor.lastrowid
|
||||
|
||||
def update_face_person(self, face_id: int, person_id: Optional[int]):
|
||||
|
||||
@ -286,12 +286,43 @@ class FaceProcessor:
|
||||
|
||||
# Step 3: Match RetinaFace results with DeepFace results
|
||||
pose_info = self._find_matching_pose_info(facial_area, pose_faces)
|
||||
pose_mode = pose_info.get('pose_mode', 'frontal')
|
||||
yaw_angle = pose_info.get('yaw_angle')
|
||||
pitch_angle = pose_info.get('pitch_angle')
|
||||
roll_angle = pose_info.get('roll_angle')
|
||||
face_width = pose_info.get('face_width') # Extract face width for verification
|
||||
|
||||
# Get landmarks from matched pose_face for storage and pose classification
|
||||
landmarks_json = None
|
||||
matched_pose_face = self._find_matching_pose_face(facial_area, pose_faces)
|
||||
landmarks = matched_pose_face.get('landmarks') if matched_pose_face else None
|
||||
|
||||
# Recalculate pose_mode using updated logic (check face_width even when yaw is available)
|
||||
from src.utils.pose_detection import PoseDetector
|
||||
pose_mode = PoseDetector.classify_pose_mode(
|
||||
yaw_angle, pitch_angle, roll_angle, face_width
|
||||
)
|
||||
if matched_pose_face and matched_pose_face.get('landmarks'):
|
||||
import json
|
||||
landmarks_dict = matched_pose_face.get('landmarks')
|
||||
# Convert tuple coordinates to lists and numpy types to native Python types for JSON serialization
|
||||
landmarks_serializable = {}
|
||||
for key, value in landmarks_dict.items():
|
||||
if isinstance(value, (tuple, list)):
|
||||
# Convert tuple/list and handle numpy types within
|
||||
landmarks_serializable[key] = [
|
||||
float(v) if isinstance(v, np.floating) else int(v) if isinstance(v, np.integer) else v
|
||||
for v in value
|
||||
]
|
||||
elif isinstance(value, np.floating):
|
||||
# Convert numpy float types to native Python float
|
||||
landmarks_serializable[key] = float(value)
|
||||
elif isinstance(value, np.integer):
|
||||
# Convert numpy integer types to native Python int
|
||||
landmarks_serializable[key] = int(value)
|
||||
else:
|
||||
landmarks_serializable[key] = value
|
||||
landmarks_json = json.dumps(landmarks_serializable)
|
||||
|
||||
# Log face width for profile detection verification
|
||||
if self.verbose >= 2 and face_width is not None:
|
||||
profile_status = "PROFILE" if face_width < 25.0 else "FRONTAL"
|
||||
@ -315,7 +346,8 @@ class FaceProcessor:
|
||||
pose_mode=pose_mode,
|
||||
yaw_angle=yaw_angle,
|
||||
pitch_angle=pitch_angle,
|
||||
roll_angle=roll_angle
|
||||
roll_angle=roll_angle,
|
||||
landmarks=landmarks_json
|
||||
)
|
||||
|
||||
if self.verbose >= 3:
|
||||
@ -569,26 +601,19 @@ class FaceProcessor:
|
||||
print(f"⚠️ Error calculating face quality: {e}")
|
||||
return 0.5 # Default medium quality on error
|
||||
|
||||
def _find_matching_pose_info(self, facial_area: Dict,
|
||||
pose_faces: List[Dict]) -> Dict:
|
||||
"""Match DeepFace result with RetinaFace pose detection result
|
||||
def _find_matching_pose_face(self, facial_area: Dict,
|
||||
pose_faces: List[Dict]) -> Optional[Dict]:
|
||||
"""Find the matching pose_face object for a given facial_area
|
||||
|
||||
Args:
|
||||
facial_area: DeepFace facial_area {'x': x, 'y': y, 'w': w, 'h': h}
|
||||
pose_faces: List of RetinaFace detection results with pose info
|
||||
|
||||
Returns:
|
||||
Dictionary with pose information, or defaults
|
||||
Matched pose_face dictionary, or None if no match
|
||||
"""
|
||||
# Match by bounding box overlap
|
||||
# Simple approach: find closest match by center point
|
||||
if not pose_faces:
|
||||
return {
|
||||
'pose_mode': 'frontal',
|
||||
'yaw_angle': None,
|
||||
'pitch_angle': None,
|
||||
'roll_angle': None
|
||||
}
|
||||
return None
|
||||
|
||||
deepface_center_x = facial_area.get('x', 0) + facial_area.get('w', 0) / 2
|
||||
deepface_center_y = facial_area.get('y', 0) + facial_area.get('h', 0) / 2
|
||||
@ -625,14 +650,32 @@ class FaceProcessor:
|
||||
min_distance = distance
|
||||
best_match = pose_face
|
||||
|
||||
# If match is close enough (within 50 pixels), use it
|
||||
# If match is close enough (within 50 pixels), return it
|
||||
if best_match and min_distance < 50:
|
||||
return best_match
|
||||
|
||||
return None
|
||||
|
||||
def _find_matching_pose_info(self, facial_area: Dict,
|
||||
pose_faces: List[Dict]) -> Dict:
|
||||
"""Match DeepFace result with RetinaFace pose detection result
|
||||
|
||||
Args:
|
||||
facial_area: DeepFace facial_area {'x': x, 'y': y, 'w': w, 'h': h}
|
||||
pose_faces: List of RetinaFace detection results with pose info
|
||||
|
||||
Returns:
|
||||
Dictionary with pose information, or defaults
|
||||
"""
|
||||
matched_pose_face = self._find_matching_pose_face(facial_area, pose_faces)
|
||||
|
||||
if matched_pose_face:
|
||||
return {
|
||||
'pose_mode': best_match.get('pose_mode', 'frontal'),
|
||||
'yaw_angle': best_match.get('yaw_angle'),
|
||||
'pitch_angle': best_match.get('pitch_angle'),
|
||||
'roll_angle': best_match.get('roll_angle'),
|
||||
'face_width': best_match.get('face_width') # Extract face width for verification
|
||||
'pose_mode': matched_pose_face.get('pose_mode', 'frontal'),
|
||||
'yaw_angle': matched_pose_face.get('yaw_angle'),
|
||||
'pitch_angle': matched_pose_face.get('pitch_angle'),
|
||||
'roll_angle': matched_pose_face.get('roll_angle'),
|
||||
'face_width': matched_pose_face.get('face_width') # Extract face width for verification
|
||||
}
|
||||
|
||||
return {
|
||||
|
||||
@ -261,9 +261,9 @@ class PoseDetector:
|
||||
yaw: Yaw angle in degrees
|
||||
pitch: Pitch angle in degrees
|
||||
roll: Roll angle in degrees
|
||||
face_width: Face width in pixels (eye distance). Used as fallback indicator
|
||||
only when yaw is unavailable (None) - if face_width < 25px, indicates profile.
|
||||
When yaw is available, it takes precedence over face_width.
|
||||
face_width: Face width in pixels (eye distance). Used as indicator for profile detection.
|
||||
If face_width < 25px, indicates profile view. When yaw is available but < 30°,
|
||||
face_width can override yaw if it suggests profile (face_width < 25px).
|
||||
|
||||
Returns:
|
||||
Pose mode classification string:
|
||||
@ -284,7 +284,7 @@ class PoseDetector:
|
||||
|
||||
# Face width threshold for profile detection (in pixels)
|
||||
# Profile faces have a very small eye distance (below this threshold, in pixels)
|
||||
PROFILE_FACE_WIDTH_THRESHOLD = 25.0
|
||||
PROFILE_FACE_WIDTH_THRESHOLD = 10.0 #25.0
|
||||
|
||||
# Yaw classification - PRIMARY INDICATOR
|
||||
# Use yaw angle as the primary indicator (30° threshold)
|
||||
@ -314,9 +314,18 @@ class PoseDetector:
|
||||
# some extreme profile faces.
|
||||
yaw_mode = "frontal"
|
||||
else:
|
||||
# Yaw is available and < 30° - trust yaw, classify as frontal
|
||||
# Don't override with face_width when yaw is available
|
||||
yaw_mode = "frontal"
|
||||
# Yaw is available and < 30° - but still check face_width
|
||||
# If face_width is very small (< PROFILE_FACE_WIDTH_THRESHOLD), it suggests profile even with small yaw
|
||||
if face_width is not None:
|
||||
if face_width < PROFILE_FACE_WIDTH_THRESHOLD:
|
||||
# Face width suggests profile view - override yaw
|
||||
yaw_mode = "profile_left" # Default direction when yaw is small
|
||||
else:
|
||||
# Face width is normal (>= PROFILE_FACE_WIDTH_THRESHOLD) - trust yaw, classify as frontal
|
||||
yaw_mode = "frontal"
|
||||
else:
|
||||
# No face_width provided - trust yaw, classify as frontal
|
||||
yaw_mode = "frontal"
|
||||
elif yaw <= -30.0:
|
||||
# abs_yaw >= 30.0 and yaw is negative - profile left
|
||||
yaw_mode = "profile_left" # Negative yaw = face turned left = left profile visible
|
||||
|
||||
@ -98,6 +98,7 @@ class Face(Base):
|
||||
yaw_angle = Column(Numeric, nullable=True)
|
||||
pitch_angle = Column(Numeric, nullable=True)
|
||||
roll_angle = Column(Numeric, nullable=True)
|
||||
landmarks = Column(Text, nullable=True) # JSON string of facial landmarks
|
||||
|
||||
photo = relationship("Photo", back_populates="faces")
|
||||
person = relationship("Person", back_populates="faces")
|
||||
|
||||
@ -399,6 +399,9 @@ def process_photo_faces(
|
||||
print(f"[FaceService] Processing {faces_detected} faces from DeepFace for {photo.filename} "
|
||||
f"(image size: {image_width}x{image_height})")
|
||||
|
||||
# Track which pose_faces have been used to prevent duplicate matches
|
||||
used_pose_indices = set()
|
||||
|
||||
for idx, result in enumerate(results):
|
||||
# Debug: Print full result to see what DeepFace returns
|
||||
if idx == 0:
|
||||
@ -465,13 +468,141 @@ def process_photo_faces(
|
||||
quality_score = quality_score_int / 100.0
|
||||
|
||||
# Step 3: Match RetinaFace results with DeepFace results
|
||||
pose_info = _find_matching_pose_info(facial_area, pose_faces)
|
||||
pose_mode = pose_info.get('pose_mode', 'frontal')
|
||||
# Returns (pose_info, matched_pose_face_index) to allow tracking used poses
|
||||
# Pass used_pose_indices to exclude already-matched poses
|
||||
pose_info, matched_index = _find_matching_pose_info(
|
||||
facial_area, pose_faces, used_pose_indices
|
||||
)
|
||||
yaw_angle = pose_info.get('yaw_angle')
|
||||
pitch_angle = pose_info.get('pitch_angle')
|
||||
roll_angle = pose_info.get('roll_angle')
|
||||
face_width = pose_info.get('face_width') # Extract face width for verification
|
||||
|
||||
# Recalculate pose_mode using classify_pose_mode if we have face_width but no yaw
|
||||
# This ensures profile faces are detected even when yaw calculation fails
|
||||
from src.utils.pose_detection import PoseDetector
|
||||
if yaw_angle is None:
|
||||
# Try to get yaw from the matched pose_face if available
|
||||
# This helps determine direction (left vs right) when yaw calculation failed
|
||||
matched_pose_face = None
|
||||
if matched_index is not None and matched_index < len(pose_faces):
|
||||
matched_pose_face = pose_faces[matched_index]
|
||||
elif pose_faces:
|
||||
# Try to find a pose_face by location (best match by center distance)
|
||||
# This handles cases where initial matching failed but we still have pose data
|
||||
face_center_x = facial_area.get('x', 0) + facial_area.get('w', 0) / 2
|
||||
face_center_y = facial_area.get('y', 0) + facial_area.get('h', 0) / 2
|
||||
best_distance = float('inf')
|
||||
|
||||
for pf in pose_faces:
|
||||
if not pf.get('landmarks'):
|
||||
continue
|
||||
|
||||
pose_area = pf.get('facial_area', {})
|
||||
if isinstance(pose_area, dict) and pose_area:
|
||||
pose_center_x = pose_area.get('x', 0) + pose_area.get('w', 0) / 2
|
||||
pose_center_y = pose_area.get('y', 0) + pose_area.get('h', 0) / 2
|
||||
distance = ((face_center_x - pose_center_x) ** 2 +
|
||||
(face_center_y - pose_center_y) ** 2) ** 0.5
|
||||
|
||||
if distance < best_distance:
|
||||
best_distance = distance
|
||||
matched_pose_face = pf
|
||||
|
||||
# If location matching failed, try matching by face_width as fallback
|
||||
if matched_pose_face is None and face_width is not None:
|
||||
for pf in pose_faces:
|
||||
if pf.get('landmarks') and pf.get('face_width') == face_width:
|
||||
matched_pose_face = pf
|
||||
break
|
||||
|
||||
# If we have landmarks, try to calculate yaw to determine direction
|
||||
if matched_pose_face and matched_pose_face.get('landmarks'):
|
||||
landmarks = matched_pose_face.get('landmarks')
|
||||
calculated_yaw = PoseDetector.calculate_yaw_from_landmarks(landmarks)
|
||||
if calculated_yaw is not None:
|
||||
# Use calculated yaw for classification
|
||||
yaw_angle = calculated_yaw
|
||||
# Also get pitch and roll if available
|
||||
if pitch_angle is None:
|
||||
pitch_angle = matched_pose_face.get('pitch_angle')
|
||||
if roll_angle is None:
|
||||
roll_angle = matched_pose_face.get('roll_angle')
|
||||
# Update face_width if we have it from matched pose
|
||||
if face_width is None:
|
||||
face_width = matched_pose_face.get('face_width')
|
||||
pose_mode = PoseDetector.classify_pose_mode(
|
||||
yaw_angle, pitch_angle, roll_angle, face_width
|
||||
)
|
||||
else:
|
||||
# Can't calculate yaw, use face_width
|
||||
pose_mode = PoseDetector.classify_pose_mode(
|
||||
yaw_angle, pitch_angle, roll_angle, face_width
|
||||
)
|
||||
elif face_width is not None:
|
||||
# No landmarks available, use face_width only
|
||||
pose_mode = PoseDetector.classify_pose_mode(
|
||||
yaw_angle, pitch_angle, roll_angle, face_width
|
||||
)
|
||||
else:
|
||||
# No landmarks and no face_width, use default
|
||||
pose_mode = pose_info.get('pose_mode', 'frontal')
|
||||
else:
|
||||
# Use the pose_mode from matching (or default to frontal)
|
||||
pose_mode = pose_info.get('pose_mode', 'frontal')
|
||||
|
||||
# Mark matched pose_face as used to prevent re-matching to other faces
|
||||
if matched_index is not None:
|
||||
used_pose_indices.add(matched_index)
|
||||
|
||||
# Get landmarks from matched pose_face for storage
|
||||
landmarks_json = None
|
||||
matched_pose_face = None
|
||||
if matched_index is not None and matched_index < len(pose_faces):
|
||||
matched_pose_face = pose_faces[matched_index]
|
||||
elif pose_faces:
|
||||
# Try to find a pose_face by location (best match by center distance)
|
||||
face_center_x = facial_area.get('x', 0) + facial_area.get('w', 0) / 2
|
||||
face_center_y = facial_area.get('y', 0) + facial_area.get('h', 0) / 2
|
||||
best_distance = float('inf')
|
||||
|
||||
for pf in pose_faces:
|
||||
if not pf.get('landmarks'):
|
||||
continue
|
||||
|
||||
pose_area = pf.get('facial_area', {})
|
||||
if isinstance(pose_area, dict) and pose_area:
|
||||
pose_center_x = pose_area.get('x', 0) + pose_area.get('w', 0) / 2
|
||||
pose_center_y = pose_area.get('y', 0) + pose_area.get('h', 0) / 2
|
||||
distance = ((face_center_x - pose_center_x) ** 2 +
|
||||
(face_center_y - pose_center_y) ** 2) ** 0.5
|
||||
|
||||
if distance < best_distance:
|
||||
best_distance = distance
|
||||
matched_pose_face = pf
|
||||
|
||||
# Convert landmarks dict to JSON string if available
|
||||
if matched_pose_face and matched_pose_face.get('landmarks'):
|
||||
landmarks_dict = matched_pose_face.get('landmarks')
|
||||
# Convert tuple coordinates to lists and numpy types to native Python types for JSON serialization
|
||||
landmarks_serializable = {}
|
||||
for key, value in landmarks_dict.items():
|
||||
if isinstance(value, (tuple, list)):
|
||||
# Convert tuple/list and handle numpy types within
|
||||
landmarks_serializable[key] = [
|
||||
float(v) if isinstance(v, np.floating) else int(v) if isinstance(v, np.integer) else v
|
||||
for v in value
|
||||
]
|
||||
elif isinstance(value, np.floating):
|
||||
# Convert numpy float types to native Python float
|
||||
landmarks_serializable[key] = float(value)
|
||||
elif isinstance(value, np.integer):
|
||||
# Convert numpy integer types to native Python int
|
||||
landmarks_serializable[key] = int(value)
|
||||
else:
|
||||
landmarks_serializable[key] = value
|
||||
landmarks_json = json.dumps(landmarks_serializable)
|
||||
|
||||
# Log face width for profile detection verification
|
||||
if face_width is not None:
|
||||
profile_status = "PROFILE" if face_width < 25.0 else "FRONTAL"
|
||||
@ -502,6 +633,7 @@ def process_photo_faces(
|
||||
yaw_angle=yaw_angle,
|
||||
pitch_angle=pitch_angle,
|
||||
roll_angle=roll_angle,
|
||||
landmarks=landmarks_json,
|
||||
)
|
||||
|
||||
db.add(face)
|
||||
@ -579,7 +711,11 @@ def _calculate_iou(box1: Dict, box2: Dict) -> float:
|
||||
return inter_area / union_area
|
||||
|
||||
|
||||
def _find_matching_pose_info(facial_area: Dict, pose_faces: List[Dict]) -> Dict:
|
||||
def _find_matching_pose_info(
|
||||
facial_area: Dict,
|
||||
pose_faces: List[Dict],
|
||||
used_pose_indices: Optional[set] = None
|
||||
) -> Tuple[Dict, Optional[int]]:
|
||||
"""Match DeepFace result with RetinaFace pose detection result using IoU.
|
||||
|
||||
Uses Intersection over Union (IoU) for robust bounding box matching, which is
|
||||
@ -589,10 +725,15 @@ def _find_matching_pose_info(facial_area: Dict, pose_faces: List[Dict]) -> Dict:
|
||||
Args:
|
||||
facial_area: DeepFace facial_area {'x': x, 'y': y, 'w': w, 'h': h}
|
||||
pose_faces: List of RetinaFace detection results with pose info
|
||||
used_pose_indices: Set of indices of pose_faces that have already been matched
|
||||
|
||||
Returns:
|
||||
Dictionary with pose information, or defaults
|
||||
Tuple of (pose_info_dict, matched_pose_face_index):
|
||||
- pose_info_dict: Dictionary with pose information, or defaults
|
||||
- matched_pose_face_index: Index of matched pose_face in list, or None if no match
|
||||
"""
|
||||
if used_pose_indices is None:
|
||||
used_pose_indices = set()
|
||||
if not pose_faces:
|
||||
return {
|
||||
'pose_mode': 'frontal',
|
||||
@ -600,10 +741,10 @@ def _find_matching_pose_info(facial_area: Dict, pose_faces: List[Dict]) -> Dict:
|
||||
'pitch_angle': None,
|
||||
'roll_angle': None,
|
||||
'face_width': None
|
||||
}
|
||||
}, None
|
||||
|
||||
# If only one face detected by both systems, use it directly
|
||||
if len(pose_faces) == 1:
|
||||
# If only one face detected by both systems, use it directly (if not already used)
|
||||
if len(pose_faces) == 1 and 0 not in used_pose_indices:
|
||||
pose_face = pose_faces[0]
|
||||
pose_area = pose_face.get('facial_area', {})
|
||||
|
||||
@ -626,13 +767,18 @@ def _find_matching_pose_info(facial_area: Dict, pose_faces: List[Dict]) -> Dict:
|
||||
'pitch_angle': pose_face.get('pitch_angle'),
|
||||
'roll_angle': pose_face.get('roll_angle'),
|
||||
'face_width': pose_face.get('face_width') # Extract face width
|
||||
}
|
||||
}, 0 # Return index 0 since it's the only face
|
||||
|
||||
# Multiple faces: find best match using IoU
|
||||
# Multiple faces: find best match using IoU (excluding already-used poses)
|
||||
best_match = None
|
||||
best_match_index = None
|
||||
best_iou = 0.0
|
||||
|
||||
for pose_face in pose_faces:
|
||||
for idx, pose_face in enumerate(pose_faces):
|
||||
# Skip poses that have already been matched to other faces
|
||||
if idx in used_pose_indices:
|
||||
continue
|
||||
|
||||
pose_area = pose_face.get('facial_area', {})
|
||||
|
||||
# Handle both dict and list formats
|
||||
@ -655,6 +801,7 @@ def _find_matching_pose_info(facial_area: Dict, pose_faces: List[Dict]) -> Dict:
|
||||
if iou > best_iou:
|
||||
best_iou = iou
|
||||
best_match = pose_face
|
||||
best_match_index = idx
|
||||
|
||||
# Use match if IoU is above threshold (0.1 = 10% overlap is very lenient)
|
||||
# Since DeepFace uses RetinaFace as detector_backend, they should detect similar faces
|
||||
@ -666,7 +813,7 @@ def _find_matching_pose_info(facial_area: Dict, pose_faces: List[Dict]) -> Dict:
|
||||
'pitch_angle': best_match.get('pitch_angle'),
|
||||
'roll_angle': best_match.get('roll_angle'),
|
||||
'face_width': best_match.get('face_width') # Extract face width
|
||||
}
|
||||
}, best_match_index
|
||||
|
||||
# Aggressive fallback: if we have pose_faces detected, use the best match
|
||||
# DeepFace and RetinaFace might detect slightly different bounding boxes,
|
||||
@ -703,7 +850,7 @@ def _find_matching_pose_info(facial_area: Dict, pose_faces: List[Dict]) -> Dict:
|
||||
'pitch_angle': best_match.get('pitch_angle'),
|
||||
'roll_angle': best_match.get('roll_angle'),
|
||||
'face_width': best_match.get('face_width') # Extract face width
|
||||
}
|
||||
}, best_match_index
|
||||
|
||||
# Last resort: if we have pose_faces and only one face, use it regardless
|
||||
# This handles cases where DeepFace and RetinaFace detect the same face
|
||||
@ -715,7 +862,86 @@ def _find_matching_pose_info(facial_area: Dict, pose_faces: List[Dict]) -> Dict:
|
||||
'pitch_angle': best_match.get('pitch_angle'),
|
||||
'roll_angle': best_match.get('roll_angle'),
|
||||
'face_width': best_match.get('face_width') # Extract face width
|
||||
}
|
||||
}, best_match_index
|
||||
|
||||
# Final fallback: if we have pose_faces but couldn't match, try to extract
|
||||
# face_width from the best_match first (if available)
|
||||
# This handles cases where RetinaFace detected the face but couldn't calculate
|
||||
# yaw (e.g., missing nose landmark), but still has face_width (both eyes present)
|
||||
# Or cases where we can't match bounding boxes but have pose data available
|
||||
if best_match:
|
||||
# Prefer best_match even if IoU was too low - it's still the closest match
|
||||
# Calculate center distance to ensure it's at least somewhat close
|
||||
deepface_center_x = facial_area.get('x', 0) + facial_area.get('w', 0) / 2
|
||||
deepface_center_y = facial_area.get('y', 0) + facial_area.get('h', 0) / 2
|
||||
|
||||
pose_area = best_match.get('facial_area', {})
|
||||
if isinstance(pose_area, list) and len(pose_area) >= 4:
|
||||
pose_area = {
|
||||
'x': pose_area[0],
|
||||
'y': pose_area[1],
|
||||
'w': pose_area[2],
|
||||
'h': pose_area[3]
|
||||
}
|
||||
|
||||
if isinstance(pose_area, dict) and pose_area:
|
||||
pose_center_x = pose_area.get('x', 0) + pose_area.get('w', 0) / 2
|
||||
pose_center_y = pose_area.get('y', 0) + pose_area.get('h', 0) / 2
|
||||
|
||||
distance = ((deepface_center_x - pose_center_x) ** 2 +
|
||||
(deepface_center_y - pose_center_y) ** 2) ** 0.5
|
||||
|
||||
# Only use best_match if distance is reasonable (within 50% of face size or 200 pixels)
|
||||
face_size = (facial_area.get('w', 0) + facial_area.get('h', 0)) / 2
|
||||
threshold = max(face_size * 0.50, 200.0) # At least 200 pixels, or 50% of face size
|
||||
|
||||
if distance < threshold:
|
||||
face_width = best_match.get('face_width')
|
||||
if face_width is not None:
|
||||
return {
|
||||
'pose_mode': best_match.get('pose_mode', 'frontal'),
|
||||
'yaw_angle': best_match.get('yaw_angle'), # May still be None
|
||||
'pitch_angle': best_match.get('pitch_angle'), # May still be None
|
||||
'roll_angle': best_match.get('roll_angle'), # May still be None
|
||||
'face_width': face_width # At least we have this
|
||||
}, best_match_index
|
||||
|
||||
# Final fallback: if we have pose_faces but couldn't match, try to extract
|
||||
# face_width from any pose_face (even if already used or we can't match it)
|
||||
# This allows us to use face_width for profile detection even when matching fails
|
||||
# We'll recalculate pose_mode using classify_pose_mode with face_width
|
||||
# Note: We check unused poses first, then fall back to used ones, to prefer unused poses
|
||||
if pose_faces:
|
||||
# First try unused poses
|
||||
for idx, pose_face in enumerate(pose_faces):
|
||||
if idx in used_pose_indices:
|
||||
continue
|
||||
face_width = pose_face.get('face_width')
|
||||
if face_width is not None:
|
||||
# Return face_width even if we can't match - pose_mode will be recalculated
|
||||
# using classify_pose_mode with face_width
|
||||
return {
|
||||
'pose_mode': 'frontal', # Will be recalculated using face_width
|
||||
'yaw_angle': None,
|
||||
'pitch_angle': None,
|
||||
'roll_angle': None,
|
||||
'face_width': face_width # At least we have this for classification
|
||||
}, None # Don't mark as used since we didn't match
|
||||
|
||||
# If no unused poses with face_width, try used ones as last resort
|
||||
# This handles cases where all poses were used but we still need face_width
|
||||
for idx, pose_face in enumerate(pose_faces):
|
||||
face_width = pose_face.get('face_width')
|
||||
if face_width is not None:
|
||||
# Return face_width even from used pose - better than nothing
|
||||
# pose_mode will be recalculated using classify_pose_mode with face_width
|
||||
return {
|
||||
'pose_mode': 'frontal', # Will be recalculated using face_width
|
||||
'yaw_angle': None,
|
||||
'pitch_angle': None,
|
||||
'roll_angle': None,
|
||||
'face_width': face_width # At least we have this for classification
|
||||
}, None # Don't mark as used since we didn't match
|
||||
|
||||
return {
|
||||
'pose_mode': 'frontal',
|
||||
@ -723,7 +949,7 @@ def _find_matching_pose_info(facial_area: Dict, pose_faces: List[Dict]) -> Dict:
|
||||
'pitch_angle': None,
|
||||
'roll_angle': None,
|
||||
'face_width': None
|
||||
}
|
||||
}, None
|
||||
|
||||
|
||||
def process_unprocessed_photos(
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user