PunimTag Web Application - Major Feature Release #1

Open
tanyar09 wants to merge 106 commits from dev into master
33 changed files with 171 additions and 2503 deletions
Showing only changes of commit 75a4dc7a4f - Show all commits

View File

@ -12,7 +12,7 @@ module.exports = {
},
ecmaVersion: 'latest',
sourceType: 'module',
project: ['./tsconfig.json'],
project: ['./tsconfig.json', './tsconfig.node.json'],
},
plugins: ['@typescript-eslint', 'react', 'react-hooks'],
extends: [
@ -30,21 +30,37 @@ module.exports = {
'max-len': [
'error',
{
code: 100,
code: 120,
tabWidth: 2,
ignoreUrls: true,
ignoreStrings: true,
ignoreTemplateLiterals: true,
ignoreComments: true,
},
],
'react/react-in-jsx-scope': 'off',
'react/no-unescaped-entities': [
'error',
{
forbid: ['>', '}'],
},
],
'@typescript-eslint/explicit-function-return-type': 'off',
'@typescript-eslint/no-explicit-any': 'warn',
'@typescript-eslint/no-unused-vars': [
'error',
{ argsIgnorePattern: '^_', varsIgnorePattern: '^_' },
],
'react-hooks/exhaustive-deps': 'warn',
},
overrides: [
{
files: ['**/Help.tsx', '**/Dashboard.tsx'],
rules: {
'react/no-unescaped-entities': 'off',
},
},
],
}

View File

@ -159,10 +159,7 @@ export default function ApproveIdentified() {
}
}, [dateFrom, dateTo])
const handleOpenReport = () => {
setShowReport(true)
loadReport()
}
// Removed unused handleOpenReport function
const handleCloseReport = () => {
setShowReport(false)

View File

@ -180,7 +180,6 @@ export default function AutoMatch() {
} finally {
setSettingsLoaded(true)
}
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [])
// Load state from sessionStorage on mount (people, current index, selected faces)

View File

@ -4,7 +4,7 @@ import { photosApi, PhotoSearchResult } from '../api/photos'
import apiClient from '../api/client'
export default function Dashboard() {
const { username } = useAuth()
const { username: _username } = useAuth()
const [samplePhotos, setSamplePhotos] = useState<PhotoSearchResult[]>([])
const [loadingPhotos, setLoadingPhotos] = useState(true)

View File

@ -386,7 +386,7 @@ export default function Identify() {
} finally {
setSettingsLoaded(true)
}
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [photoIds])
// Load state from sessionStorage on mount (faces, current index, similar, form data)
@ -433,7 +433,7 @@ export default function Identify() {
} finally {
setStateRestored(true)
}
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [photoIds])
// Save state to sessionStorage whenever it changes (but only after initial restore)
@ -530,7 +530,7 @@ export default function Identify() {
loadPeople()
loadTags()
}
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [settingsLoaded])
// Reset filters when photoIds is provided (to ensure all faces from those photos are shown)
@ -544,7 +544,7 @@ export default function Identify() {
// Keep uniqueFacesOnly as is (user preference)
// Keep sortBy/sortDir as defaults (quality desc)
}
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [photoIds, settingsLoaded])
// Initial load on mount (after settings and state are loaded)
@ -951,6 +951,7 @@ export default function Identify() {
loadVideos()
loadPeople() // Load people for the dropdown
}
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [activeTab, videosPage, videosPageSize, videosFolderFilter, videosDateFrom, videosDateTo, videosHasPeople, videosPersonName, videosSortBy, videosSortDir])
return (
@ -1290,7 +1291,6 @@ export default function Identify() {
crossOrigin="anonymous"
loading="eager"
onLoad={() => setImageLoading(false)}
onLoadStart={() => setImageLoading(true)}
onError={(e) => {
const target = e.target as HTMLImageElement
target.style.display = 'none'

View File

@ -305,7 +305,7 @@ export default function Modify() {
} finally {
setStateRestored(true)
}
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [])
useEffect(() => {

View File

@ -2,7 +2,7 @@ import { useEffect, useState, useCallback, useRef, useMemo } from 'react'
import { pendingPhotosApi, PendingPhotoResponse, ReviewDecision, CleanupResponse } from '../api/pendingPhotos'
import { apiClient } from '../api/client'
import { useAuth } from '../context/AuthContext'
import { videosApi } from '../api/videos'
// Removed unused videosApi import
type SortKey = 'photo' | 'uploaded_by' | 'file_info' | 'submitted_at' | 'status'
@ -259,7 +259,7 @@ export default function PendingPhotos() {
// Apply to all currently rejected photos
const rejectedPhotoIds = Object.entries(decisions)
.filter(([id, decision]) => decision === 'reject')
.filter(([_id, decision]) => decision === 'reject')
.map(([id]) => parseInt(id))
if (rejectedPhotoIds.length > 0) {

View File

@ -4,15 +4,7 @@ import { useDeveloperMode } from '../context/DeveloperModeContext'
type ViewMode = 'list' | 'icons' | 'compact'
interface PendingTagChange {
photoId: number
tagIds: number[]
}
interface PendingTagRemoval {
photoId: number
tagIds: number[]
}
// Removed unused interfaces PendingTagChange and PendingTagRemoval
interface FolderGroup {
folderPath: string
@ -41,7 +33,7 @@ const loadFolderStatesFromStorage = (): Record<string, boolean> => {
}
export default function Tags() {
const { isDeveloperMode } = useDeveloperMode()
const { isDeveloperMode: _isDeveloperMode } = useDeveloperMode()
const [viewMode, setViewMode] = useState<ViewMode>('list')
const [photos, setPhotos] = useState<PhotoWithTagsItem[]>([])
const [tags, setTags] = useState<TagResponse[]>([])
@ -50,7 +42,7 @@ export default function Tags() {
const [pendingTagChanges, setPendingTagChanges] = useState<Record<number, number[]>>({})
const [pendingTagRemovals, setPendingTagRemovals] = useState<Record<number, number[]>>({})
const [loading, setLoading] = useState(false)
const [saving, setSaving] = useState(false)
const [_saving, setSaving] = useState(false)
const [showManageTags, setShowManageTags] = useState(false)
const [showTagDialog, setShowTagDialog] = useState<number | null>(null)
const [showBulkTagDialog, setShowBulkTagDialog] = useState<string | null>(null)
@ -189,7 +181,7 @@ export default function Tags() {
aVal = a.face_count || 0
bVal = b.face_count || 0
break
case 'identified':
case 'identified': {
// Sort by identified count (identified/total ratio)
const aTotal = a.face_count || 0
const aIdentified = aTotal - (a.unidentified_face_count || 0)
@ -206,13 +198,15 @@ export default function Tags() {
bVal = bIdentified
}
break
case 'tags':
}
case 'tags': {
// Get tags for comparison - use photo.tags directly
const aTags = (a.tags || '').toLowerCase()
const bTags = (b.tags || '').toLowerCase()
aVal = aTags
bVal = bTags
break
}
default:
return 0
}
@ -421,7 +415,7 @@ export default function Tags() {
}
// Save pending changes
const saveChanges = async () => {
const _saveChanges = async () => {
const pendingPhotoIds = new Set([
...Object.keys(pendingTagChanges).map(Number),
...Object.keys(pendingTagRemovals).map(Number),
@ -490,7 +484,7 @@ export default function Tags() {
}
// Get pending changes count
const pendingChangesCount = useMemo(() => {
const _pendingChangesCount = useMemo(() => {
const additions = Object.values(pendingTagChanges).reduce((sum, ids) => sum + ids.length, 0)
const removals = Object.values(pendingTagRemovals).reduce((sum, ids) => sum + ids.length, 0)
return additions + removals
@ -1565,7 +1559,7 @@ function BulkTagDialog({
onRemoveTag,
getPhotoTags,
}: {
folderPath: string
folderPath: string // eslint-disable-line @typescript-eslint/no-unused-vars
folder: FolderGroup | undefined
tags: TagResponse[]
pendingTagChanges: Record<number, number[]>

128
docs/CI_SCRIPTS_MAPPING.md Normal file
View File

@ -0,0 +1,128 @@
# CI Workflow and Package Scripts Mapping
This document maps the Gitea CI workflow jobs to the corresponding npm scripts in package.json.
## CI Workflow Jobs → Package Scripts
### 1. `lint-and-type-check` Job
**CI Workflow:**
- Runs `npm run lint` in admin-frontend
- Runs `npm run type-check` in viewer-frontend
**Package Scripts:**
- `npm run lint:admin` - Lint admin-frontend
- `npm run lint:viewer` - Lint viewer-frontend
- `npm run type-check:viewer` - Type check viewer-frontend
- `npm run lint:all` - Lint both frontends
### 2. `python-lint` Job
**CI Workflow:**
- Installs flake8, black, mypy, pylint
- Runs Python syntax check: `find backend -name "*.py" -exec python -m py_compile {} \;`
- Runs flake8: `flake8 backend --max-line-length=100 --ignore=E501,W503`
**Package Scripts:**
- `npm run lint:python` - Run flake8 on backend
- `npm run lint:python:syntax` - Check Python syntax
### 3. `test-backend` Job
**CI Workflow:**
- Installs dependencies from requirements.txt
- Runs: `python -m pytest tests/ -v`
**Package Scripts:**
- `npm run test:backend` - Run backend tests with pytest
- `npm run test:all` - Run all tests (currently just backend)
### 4. `build` Job
**CI Workflow:**
- Builds admin-frontend: `npm run build`
- Generates Prisma client: `npx prisma generate`
- Builds viewer-frontend: `npm run build`
**Package Scripts:**
- `npm run build:admin` - Build admin-frontend
- `npm run build:viewer` - Build viewer-frontend
- `npm run build:all` - Build both frontends
### 5. Security Scans
**CI Workflow:**
- `secret-scanning` - Gitleaks
- `dependency-scan` - Trivy vulnerability and secret scanning
- `sast-scan` - Semgrep
**Package Scripts:**
- No local scripts (these are CI-only security scans)
## Combined Scripts
### `ci:local` - Run All CI Checks Locally
**Package Script:**
```bash
npm run ci:local
```
This runs:
1. `lint:all` - Lint both frontends
2. `type-check:viewer` - Type check viewer-frontend
3. `lint:python` - Lint Python backend
4. `test:backend` - Run backend tests
5. `build:all` - Build both frontends
**Note:** This is a convenience script to run all CI checks locally before pushing.
## CI-Only Jobs (Not in Package Scripts)
These CI jobs don't have corresponding package scripts (by design):
- `secret-scanning` - Gitleaks (security tool, CI-only)
- `dependency-scan` - Trivy (security tool, CI-only)
- `sast-scan` - Semgrep (security tool, CI-only)
- `workflow-summary` - CI workflow summary generation
## Usage Examples
### Run All CI Checks Locally
```bash
npm run ci:local
```
### Run Individual Checks
```bash
# Frontend linting
npm run lint:all
# Type checking
npm run type-check:viewer
# Python linting
npm run lint:python
# Backend tests
npm run test:backend
# Build everything
npm run build:all
```
### Development
```bash
# Start all services
npm run dev:admin # Terminal 1
npm run dev:viewer # Terminal 2
npm run dev:backend # Terminal 3
```
## Notes
- All CI scripts use `continue-on-error: true` or `|| true` to not fail the build
- Local scripts also use `|| true` for non-critical checks
- The `ci:local` script will stop on first failure (unlike CI which continues)
- Python linting requires flake8: `pip install flake8`
- Backend tests require pytest: `pip install pytest`

View File

@ -1,83 +0,0 @@
#!/usr/bin/env python3
"""
Analyze all faces to see why most don't have angle data.

Reads the web database (data/punimtag.db) and reports:
  * how many faces have any yaw/pitch/roll angle stored,
  * the pose_mode distribution,
  * how many 'frontal' faces have no angle data at all.

Exits with status 1 if the database file is missing.
"""
import sqlite3
import os

db_path = "data/punimtag.db"
if not os.path.exists(db_path):
    print(f"❌ Database not found: {db_path}")
    exit(1)

conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row  # allow column access by name
cursor = conn.cursor()

# Get total faces
cursor.execute("SELECT COUNT(*) FROM faces")
total_faces = cursor.fetchone()[0]

# Guard: an empty faces table would otherwise crash the percentage
# calculations below with ZeroDivisionError.
if total_faces == 0:
    print("No faces found in database.")
    conn.close()
    exit(0)

# Get faces with angle data (any one of the three angles present)
cursor.execute("SELECT COUNT(*) FROM faces WHERE yaw_angle IS NOT NULL OR pitch_angle IS NOT NULL OR roll_angle IS NOT NULL")
faces_with_angles = cursor.fetchone()[0]

# Get faces without any angle data
faces_without_angles = total_faces - faces_with_angles

print("=" * 80)
print("FACE ANGLE DATA ANALYSIS")
print("=" * 80)
print(f"\nTotal faces: {total_faces}")
print(f"Faces WITH angle data: {faces_with_angles}")
print(f"Faces WITHOUT angle data: {faces_without_angles}")
print(f"Percentage with angle data: {(faces_with_angles/total_faces*100):.1f}%")

# Check pose_mode distribution
print("\n" + "=" * 80)
print("POSE_MODE DISTRIBUTION")
print("=" * 80)
cursor.execute("""
    SELECT pose_mode, COUNT(*) as count
    FROM faces
    GROUP BY pose_mode
    ORDER BY count DESC
""")
pose_modes = cursor.fetchall()
for row in pose_modes:
    percentage = (row['count'] / total_faces) * 100
    print(f" {row['pose_mode']:<30} : {row['count']:>4} ({percentage:>5.1f}%)")

# Check faces with pose_mode=frontal but might have high yaw
print("\n" + "=" * 80)
print("FACES WITH POSE_MODE='frontal' BUT NO ANGLE DATA")
print("=" * 80)
print("(These faces might actually be profile faces but weren't analyzed)")
cursor.execute("""
    SELECT COUNT(*)
    FROM faces
    WHERE pose_mode = 'frontal'
    AND yaw_angle IS NULL
    AND pitch_angle IS NULL
    AND roll_angle IS NULL
""")
frontal_no_data = cursor.fetchone()[0]
print(f" Faces with pose_mode='frontal' and no angle data: {frontal_no_data}")

# Check if pose detection is being run for all faces
print("\n" + "=" * 80)
print("ANALYSIS")
print("=" * 80)
print(f"Only {faces_with_angles} out of {total_faces} faces have angle data stored.")
print("This suggests that pose detection is NOT being run for all faces.")
print("\nPossible reasons:")
print(" 1. Pose detection may have been disabled or failed for most faces")
print(" 2. Only faces processed recently have pose data")
print(" 3. Pose detection might only run when RetinaFace is available")
conn.close()

View File

@ -1,156 +0,0 @@
#!/usr/bin/env python3
"""
Analyze why only 6 faces have yaw angle data - investigate the matching process.

Reports pose-data coverage overall, per pose_mode, per photo, and per
date-added, then lists photos where only some faces have pose data
(which would indicate the DeepFace/RetinaFace matching step is failing).

Exits with status 1 if the database file is missing.
"""
import sqlite3
import os
import json  # NOTE(review): unused here; kept to avoid touching the import surface

db_path = "data/punimtag.db"
if not os.path.exists(db_path):
    print(f"❌ Database not found: {db_path}")
    exit(1)

conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row  # allow column access by name
cursor = conn.cursor()

# Get total faces
cursor.execute("SELECT COUNT(*) FROM faces")
total_faces = cursor.fetchone()[0]

# Guard: the coverage percentage below divides by total_faces and would
# raise ZeroDivisionError on an empty faces table.
if total_faces == 0:
    print("No faces found in database.")
    conn.close()
    exit(0)

# Get faces with angle data
cursor.execute("SELECT COUNT(*) FROM faces WHERE yaw_angle IS NOT NULL")
faces_with_yaw = cursor.fetchone()[0]

# Get faces without angle data
cursor.execute("SELECT COUNT(*) FROM faces WHERE yaw_angle IS NULL AND pitch_angle IS NULL AND roll_angle IS NULL")
faces_without_angles = cursor.fetchone()[0]

print("=" * 80)
print("POSE DATA COVERAGE ANALYSIS")
print("=" * 80)
print(f"\nTotal faces: {total_faces}")
print(f"Faces WITH yaw angle: {faces_with_yaw}")
print(f"Faces WITHOUT any angle data: {faces_without_angles}")
print(f"Coverage: {(faces_with_yaw/total_faces*100):.1f}%")

# Check pose_mode distribution (with per-angle counts per mode)
print("\n" + "=" * 80)
print("POSE_MODE DISTRIBUTION")
print("=" * 80)
cursor.execute("""
    SELECT pose_mode, COUNT(*) as count,
        SUM(CASE WHEN yaw_angle IS NOT NULL THEN 1 ELSE 0 END) as with_yaw,
        SUM(CASE WHEN pitch_angle IS NOT NULL THEN 1 ELSE 0 END) as with_pitch,
        SUM(CASE WHEN roll_angle IS NOT NULL THEN 1 ELSE 0 END) as with_roll
    FROM faces
    GROUP BY pose_mode
    ORDER BY count DESC
""")
pose_modes = cursor.fetchall()
for row in pose_modes:
    print(f"\n{row['pose_mode']}:")
    print(f" Total: {row['count']}")
    print(f" With yaw: {row['with_yaw']}")
    print(f" With pitch: {row['with_pitch']}")
    print(f" With roll: {row['with_roll']}")

# Check photos and see if some photos have pose data while others don't
print("\n" + "=" * 80)
print("POSE DATA BY PHOTO")
print("=" * 80)
cursor.execute("""
    SELECT
        p.id as photo_id,
        p.filename,
        COUNT(f.id) as total_faces,
        SUM(CASE WHEN f.yaw_angle IS NOT NULL THEN 1 ELSE 0 END) as faces_with_yaw,
        SUM(CASE WHEN f.pitch_angle IS NOT NULL THEN 1 ELSE 0 END) as faces_with_pitch,
        SUM(CASE WHEN f.roll_angle IS NOT NULL THEN 1 ELSE 0 END) as faces_with_roll
    FROM photos p
    LEFT JOIN faces f ON f.photo_id = p.id
    GROUP BY p.id, p.filename
    HAVING COUNT(f.id) > 0
    ORDER BY faces_with_yaw DESC, total_faces DESC
    LIMIT 20
""")
photos = cursor.fetchall()
print(f"\n{'Photo ID':<10} {'Filename':<40} {'Total':<8} {'Yaw':<6} {'Pitch':<7} {'Roll':<6}")
print("-" * 80)
for row in photos:
    print(f"{row['photo_id']:<10} {row['filename'][:38]:<40} {row['total_faces']:<8} "
          f"{row['faces_with_yaw']:<6} {row['faces_with_pitch']:<7} {row['faces_with_roll']:<6}")

# Check if there's a pattern - maybe older photos don't have pose data
print("\n" + "=" * 80)
print("ANALYSIS")
print("=" * 80)
# Check date added vs pose data
cursor.execute("""
    SELECT
        DATE(p.date_added) as date_added,
        COUNT(f.id) as total_faces,
        SUM(CASE WHEN f.yaw_angle IS NOT NULL THEN 1 ELSE 0 END) as faces_with_yaw
    FROM photos p
    LEFT JOIN faces f ON f.photo_id = p.id
    GROUP BY DATE(p.date_added)
    ORDER BY date_added DESC
""")
dates = cursor.fetchall()
print("\nFaces by date added:")
print(f"{'Date':<15} {'Total':<8} {'With Yaw':<10} {'Coverage':<10}")
print("-" * 50)
for row in dates:
    coverage = (row['faces_with_yaw'] / row['total_faces'] * 100) if row['total_faces'] > 0 else 0
    # SUM() yields NULL (None) for dates whose photos have no faces; coerce
    # to 0 so the format spec below doesn't raise TypeError.
    print(f"{row['date_added'] or 'NULL':<15} {row['total_faces']:<8} {row['faces_with_yaw'] or 0:<10} {coverage:.1f}%")

# Check if pose detection might be failing for some photos
print("\n" + "=" * 80)
print("POSSIBLE REASONS FOR LOW COVERAGE")
print("=" * 80)
print("\n1. Pose detection might not be running for all photos")
print("2. Matching between DeepFace and RetinaFace might be failing (IoU threshold too strict?)")
print("3. RetinaFace might not be detecting faces in some photos")
print("4. Photos might have been processed before pose detection was fully implemented")

# Check if there are photos with multiple faces where some have pose data and some don't
cursor.execute("""
    SELECT
        p.id as photo_id,
        p.filename,
        COUNT(f.id) as total_faces,
        SUM(CASE WHEN f.yaw_angle IS NOT NULL THEN 1 ELSE 0 END) as faces_with_yaw,
        SUM(CASE WHEN f.yaw_angle IS NULL THEN 1 ELSE 0 END) as faces_without_yaw
    FROM photos p
    JOIN faces f ON f.photo_id = p.id
    GROUP BY p.id, p.filename
    HAVING COUNT(f.id) > 1
    AND SUM(CASE WHEN f.yaw_angle IS NOT NULL THEN 1 ELSE 0 END) > 0
    AND SUM(CASE WHEN f.yaw_angle IS NULL THEN 1 ELSE 0 END) > 0
    ORDER BY total_faces DESC
    LIMIT 10
""")
mixed_photos = cursor.fetchall()
if mixed_photos:
    print("\n" + "=" * 80)
    print("PHOTOS WITH MIXED POSE DATA (some faces have it, some don't)")
    print("=" * 80)
    print(f"\n{'Photo ID':<10} {'Filename':<40} {'Total':<8} {'With Yaw':<10} {'Without Yaw':<12}")
    print("-" * 80)
    for row in mixed_photos:
        print(f"{row['photo_id']:<10} {row['filename'][:38]:<40} {row['total_faces']:<8} "
              f"{row['faces_with_yaw']:<10} {row['faces_without_yaw']:<12}")
    print("\n⚠️ This suggests matching is failing for some faces even when pose detection runs")
else:
    print("\n✅ No photos found with mixed pose data (all or nothing per photo)")
conn.close()

View File

@ -1,192 +0,0 @@
#!/usr/bin/env python3
"""
Analyze pose_mode values in the faces table
"""
import sqlite3
import sys
import os
from collections import Counter
from typing import Dict, List, Tuple
# Default database path
DEFAULT_DB_PATH = "data/photos.db"
def analyze_poses(db_path: str) -> None:
    """Analyze pose_mode values in faces table.

    Prints (to stdout) the pose_mode distribution, min/max/avg statistics
    for the yaw/pitch/roll angle columns, and up to 3 sample faces for
    each of the 10 most common pose modes. Returns early (with a message)
    if the database file does not exist or the faces table is empty.

    Args:
        db_path: Path to an SQLite database containing a `faces` table.
    """
    if not os.path.exists(db_path):
        print(f"❌ Database not found: {db_path}")
        return
    print(f"📊 Analyzing poses in database: {db_path}\n")
    try:
        conn = sqlite3.connect(db_path)
        conn.row_factory = sqlite3.Row  # column access by name
        cursor = conn.cursor()
        # Get total number of faces
        cursor.execute("SELECT COUNT(*) FROM faces")
        total_faces = cursor.fetchone()[0]
        print(f"Total faces in database: {total_faces}\n")
        # Early exit also protects the percentage division below.
        if total_faces == 0:
            print("No faces found in database.")
            conn.close()
            return
        # Get pose_mode distribution
        cursor.execute("""
            SELECT pose_mode, COUNT(*) as count
            FROM faces
            GROUP BY pose_mode
            ORDER BY count DESC
        """)
        pose_modes = cursor.fetchall()
        print("=" * 60)
        print("POSE_MODE DISTRIBUTION")
        print("=" * 60)
        for row in pose_modes:
            # NULL pose_mode is displayed as the literal string 'NULL'
            pose_mode = row['pose_mode'] or 'NULL'
            count = row['count']
            percentage = (count / total_faces) * 100
            print(f" {pose_mode:30s} : {count:6d} ({percentage:5.1f}%)")
        print("\n" + "=" * 60)
        print("ANGLE STATISTICS")
        print("=" * 60)
        # Yaw angle statistics (WHERE clause restricts to rows with data,
        # so 'total' and 'with_yaw' are equal by construction)
        cursor.execute("""
            SELECT
                COUNT(*) as total,
                COUNT(yaw_angle) as with_yaw,
                MIN(yaw_angle) as min_yaw,
                MAX(yaw_angle) as max_yaw,
                AVG(yaw_angle) as avg_yaw
            FROM faces
            WHERE yaw_angle IS NOT NULL
        """)
        yaw_stats = cursor.fetchone()
        # Pitch angle statistics
        cursor.execute("""
            SELECT
                COUNT(*) as total,
                COUNT(pitch_angle) as with_pitch,
                MIN(pitch_angle) as min_pitch,
                MAX(pitch_angle) as max_pitch,
                AVG(pitch_angle) as avg_pitch
            FROM faces
            WHERE pitch_angle IS NOT NULL
        """)
        pitch_stats = cursor.fetchone()
        # Roll angle statistics
        cursor.execute("""
            SELECT
                COUNT(*) as total,
                COUNT(roll_angle) as with_roll,
                MIN(roll_angle) as min_roll,
                MAX(roll_angle) as max_roll,
                AVG(roll_angle) as avg_roll
            FROM faces
            WHERE roll_angle IS NOT NULL
        """)
        roll_stats = cursor.fetchone()
        print(f"\nYaw Angle:")
        print(f" Faces with yaw data: {yaw_stats['with_yaw']}")
        if yaw_stats['with_yaw'] > 0:
            print(f" Min: {yaw_stats['min_yaw']:.1f}°")
            print(f" Max: {yaw_stats['max_yaw']:.1f}°")
            print(f" Avg: {yaw_stats['avg_yaw']:.1f}°")
        print(f"\nPitch Angle:")
        print(f" Faces with pitch data: {pitch_stats['with_pitch']}")
        if pitch_stats['with_pitch'] > 0:
            print(f" Min: {pitch_stats['min_pitch']:.1f}°")
            print(f" Max: {pitch_stats['max_pitch']:.1f}°")
            print(f" Avg: {pitch_stats['avg_pitch']:.1f}°")
        print(f"\nRoll Angle:")
        print(f" Faces with roll data: {roll_stats['with_roll']}")
        if roll_stats['with_roll'] > 0:
            print(f" Min: {roll_stats['min_roll']:.1f}°")
            print(f" Max: {roll_stats['max_roll']:.1f}°")
            print(f" Avg: {roll_stats['avg_roll']:.1f}°")
        # Sample faces with different poses
        print("\n" + "=" * 60)
        print("SAMPLE FACES BY POSE")
        print("=" * 60)
        for row in pose_modes[:10]:  # Top 10 pose modes
            # NOTE(review): if pose_mode is NULL, `= ?` matches no rows;
            # the header still prints with an empty sample list.
            pose_mode = row['pose_mode']
            cursor.execute("""
                SELECT id, photo_id, pose_mode, yaw_angle, pitch_angle, roll_angle
                FROM faces
                WHERE pose_mode = ?
                LIMIT 3
            """, (pose_mode,))
            samples = cursor.fetchall()
            print(f"\n{pose_mode}:")
            for sample in samples:
                # Format each angle, falling back to "N/A" when unset
                yaw_str = f"{sample['yaw_angle']:.1f}°" if sample['yaw_angle'] is not None else "N/A"
                pitch_str = f"{sample['pitch_angle']:.1f}°" if sample['pitch_angle'] is not None else "N/A"
                roll_str = f"{sample['roll_angle']:.1f}°" if sample['roll_angle'] is not None else "N/A"
                print(f" Face ID {sample['id']}: "
                      f"yaw={yaw_str} "
                      f"pitch={pitch_str} "
                      f"roll={roll_str}")
        conn.close()
    except sqlite3.Error as e:
        print(f"❌ Database error: {e}")
    except Exception as e:
        print(f"❌ Error: {e}")
def check_web_database() -> None:
    """Check if web database exists and analyze it.

    Probes a fixed list of known web-database locations and runs
    analyze_poses() on the first one that exists; silently does
    nothing when none are present.
    """
    # Common web database locations, most likely first.
    candidate_paths = (
        "data/punimtag.db",  # Default web database
        "data/web_photos.db",
        "data/photos_web.db",
        "web_photos.db",
    )
    found = next((p for p in candidate_paths if os.path.exists(p)), None)
    if found is None:
        return
    print(f"\n{'='*60}")
    print(f"WEB DATABASE: {found}")
    print(f"{'='*60}\n")
    analyze_poses(found)
if __name__ == "__main__":
    # Entry point: analyze the desktop database (if present), then any
    # known web database; if the desktop DB is missing, list every
    # database-like file found under data/ to help the user locate one.
    # Check desktop database
    desktop_db = DEFAULT_DB_PATH
    if os.path.exists(desktop_db):
        analyze_poses(desktop_db)
    # Check web database
    check_web_database()
    # If no database found, list what we tried
    if not os.path.exists(desktop_db):
        print(f"❌ Desktop database not found: {desktop_db}")
        print("\nTrying to find database files...")
        for root, dirs, files in os.walk("data"):
            for file in files:
                if file.endswith(('.db', '.sqlite', '.sqlite3')):
                    print(f" Found: {os.path.join(root, file)}")

View File

@ -1,135 +0,0 @@
#!/usr/bin/env python3
"""
Check what tables exist in the punimtag main database and their record counts.
"""
from __future__ import annotations
import sys
from pathlib import Path
# Add project root to path
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from sqlalchemy import create_engine, inspect, text
from backend.db.session import get_database_url
def check_database_tables() -> None:
    """Check all tables in the database and their record counts.

    Connects using the project's configured database URL, lists every
    table with its row count and whether it is one of the tables the
    models declare, then prints a summary (totals, missing expected
    tables, unexpected extra tables). All output goes to stdout; errors
    are printed rather than raised.
    """
    database_url = get_database_url()
    print("=" * 80)
    print("PUNIMTAG MAIN DATABASE - TABLE INFORMATION")
    print("=" * 80)
    # Mask credentials in the displayed URL (best-effort).
    print(f"\nDatabase URL: {database_url.replace('://', '://****') if '://' in database_url else database_url}\n")
    # Create engine
    connect_args = {}
    if database_url.startswith("sqlite"):
        # SQLite connections are thread-bound by default; relax for scripting.
        connect_args = {"check_same_thread": False}
    engine = create_engine(database_url, connect_args=connect_args)
    try:
        # Get inspector to list tables
        inspector = inspect(engine)
        all_tables = inspector.get_table_names()
        if not all_tables:
            print("❌ No tables found in database.")
            return
        print(f"Found {len(all_tables)} tables:\n")
        # Expected tables from models
        expected_tables = {
            "photos",
            "people",
            "faces",
            "person_encodings",
            "tags",
            "phototaglinkage",
            "photo_favorites",
            "users",
            "photo_person_linkage",
            "role_permissions",
        }
        # Connect and query each table
        with engine.connect() as conn:
            print(f"{'Table Name':<30} {'Record Count':<15} {'Status'}")
            print("-" * 80)
            for table_name in sorted(all_tables):
                # Skip SQLite system tables
                if table_name.startswith("sqlite_"):
                    continue
                try:
                    # Get record count (SQLite dislikes the double-quoted form)
                    if database_url.startswith("sqlite"):
                        result = conn.execute(text(f"SELECT COUNT(*) FROM {table_name}"))
                    else:
                        result = conn.execute(text(f'SELECT COUNT(*) FROM "{table_name}"'))
                    count = result.scalar()
                    # Check if it's an expected table
                    status = "✅ Expected" if table_name in expected_tables else "⚠️ Unexpected"
                    print(f"{table_name:<30} {count:<15} {status}")
                except Exception as e:
                    print(f"{table_name:<30} {'ERROR':<15}{str(e)[:50]}")
            print("-" * 80)
        # Summary (re-counts each table on a fresh connection)
        print("\n📊 Summary:")
        with engine.connect() as conn:
            total_records = 0
            tables_with_data = 0
            for table_name in sorted(all_tables):
                if table_name.startswith("sqlite_"):
                    continue
                try:
                    if database_url.startswith("sqlite"):
                        result = conn.execute(text(f"SELECT COUNT(*) FROM {table_name}"))
                    else:
                        result = conn.execute(text(f'SELECT COUNT(*) FROM "{table_name}"'))
                    count = result.scalar()
                    total_records += count
                    if count > 0:
                        tables_with_data += 1
                except Exception:
                    # Was a bare `except:`, which also swallows SystemExit and
                    # KeyboardInterrupt; Exception is sufficient for per-table
                    # query failures, which were already reported above.
                    pass
        print(f" Total tables: {len([t for t in all_tables if not t.startswith('sqlite_')])}")
        print(f" Tables with records: {tables_with_data}")
        print(f" Total records across all tables: {total_records:,}")
        # Check for missing expected tables
        missing_tables = expected_tables - set(all_tables)
        if missing_tables:
            print(f"\n⚠️ Missing expected tables: {', '.join(sorted(missing_tables))}")
        # Check for unexpected tables (alembic_version is migration bookkeeping)
        unexpected_tables = set(all_tables) - expected_tables - {"alembic_version"}
        unexpected_tables = {t for t in unexpected_tables if not t.startswith("sqlite_")}
        if unexpected_tables:
            print(f"\n Additional tables found: {', '.join(sorted(unexpected_tables))}")
    except Exception as e:
        print(f"❌ Error connecting to database: {e}")
        import traceback
        traceback.print_exc()
        return
    print("\n" + "=" * 80)
if __name__ == "__main__":
    # Entry point: print the table report when run as a script.
    check_database_tables()

View File

@ -1,99 +0,0 @@
#!/usr/bin/env python3
"""Check all identified faces for pose information (web database)"""
import sys
import os
# Add project root to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from backend.db.models import Face, Person, Photo
from backend.db.session import get_database_url
def check_identified_faces():
    """Check all identified faces for pose information.

    Queries every face linked to a person (via SQLAlchemy), prints a
    per-person summary of pose modes (frontal / profile / other), then a
    detailed listing of each face's pose angles, confidence, quality and
    location. All output goes to stdout; the session is always closed.
    """
    db_url = get_database_url()
    print(f"Connecting to database: {db_url}")
    engine = create_engine(db_url)
    Session = sessionmaker(bind=engine)
    session = Session()
    try:
        # Get all identified faces with pose information
        faces = (
            session.query(Face, Person, Photo)
            .join(Person, Face.person_id == Person.id)
            .join(Photo, Face.photo_id == Photo.id)
            .filter(Face.person_id.isnot(None))
            .order_by(Person.id, Face.id)
            .all()
        )
        if not faces:
            print("No identified faces found.")
            return
        print(f"\n{'='*80}")
        print(f"Found {len(faces)} identified faces")
        print(f"{'='*80}\n")
        # Group by person
        by_person = {}
        for face, person, photo in faces:
            person_id = person.id
            if person_id not in by_person:
                by_person[person_id] = []
            by_person[person_id].append((face, person, photo))
        # Print summary
        print("SUMMARY BY PERSON:")
        print("-" * 80)
        for person_id, person_faces in by_person.items():
            person = person_faces[0][1]
            person_name = f"{person.first_name} {person.last_name}"
            pose_modes = [f[0].pose_mode for f in person_faces]
            frontal_count = sum(1 for p in pose_modes if p == 'frontal')
            # Guard against NULL pose_mode: `'profile' in None` raises
            # TypeError, and NULL pose modes do occur in this schema.
            profile_count = sum(1 for p in pose_modes if p and 'profile' in p)
            other_count = len(pose_modes) - frontal_count - profile_count
            print(f"\nPerson {person_id}: {person_name}")
            print(f" Total faces: {len(person_faces)}")
            print(f" Frontal: {frontal_count}")
            print(f" Profile: {profile_count}")
            print(f" Other: {other_count}")
            print(f" Pose modes: {set(pose_modes)}")
        # Print detailed information
        print(f"\n{'='*80}")
        print("DETAILED FACE INFORMATION:")
        print(f"{'='*80}\n")
        for face, person, photo in faces:
            person_name = f"{person.first_name} {person.last_name}"
            print(f"Face ID: {face.id}")
            print(f" Person: {person_name} (ID: {face.person_id})")
            print(f" Photo: {photo.filename}")
            print(f" Pose Mode: {face.pose_mode}")
            # Angles may be NULL; print "None" rather than crash the format spec
            print(f" Yaw: {face.yaw_angle:.2f}°" if face.yaw_angle is not None else " Yaw: None")
            print(f" Pitch: {face.pitch_angle:.2f}°" if face.pitch_angle is not None else " Pitch: None")
            print(f" Roll: {face.roll_angle:.2f}°" if face.roll_angle is not None else " Roll: None")
            # NOTE(review): assumes face_confidence / quality_score are never
            # NULL — confirm against the schema before relying on this.
            print(f" Confidence: {face.face_confidence:.3f}")
            print(f" Quality: {face.quality_score:.3f}")
            print(f" Location: {face.location}")
            print()
    finally:
        session.close()
if __name__ == "__main__":
    # Entry point: run the report, print any failure with a traceback,
    # and exit non-zero so callers/CI can detect the error.
    try:
        check_identified_faces()
    except Exception as e:
        print(f"❌ Error: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)

View File

@ -1,188 +0,0 @@
#!/usr/bin/env python3
"""Check two identified faces and analyze why their pose modes are wrong"""
import sys
import os
import json
# Add project root to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from backend.db.models import Face, Person, Photo
from backend.db.session import get_database_url
from src.utils.pose_detection import PoseDetector
def check_two_faces(face_id1: int = None, face_id2: int = None):
    """Check two identified faces and analyze why their pose modes are wrong.

    Args:
        face_id1: Optional first face ID to inspect (None = take any).
        face_id2: Optional second face ID to inspect (None = take any).

    Loads up to two identified faces, prints their stored pose data, then
    recalculates the pose mode with PoseDetector and reports any mismatch
    together with an explanation derived from yaw/landmarks/face width.
    """
    db_url = get_database_url()
    print(f"Connecting to database: {db_url}")
    engine = create_engine(db_url)
    Session = sessionmaker(bind=engine)
    session = Session()
    try:
        # Base query: only faces already assigned to a person.
        query = (
            session.query(Face, Person, Photo)
            .join(Person, Face.person_id == Person.id)
            .join(Photo, Face.photo_id == Photo.id)
            .filter(Face.person_id.isnot(None))
            .order_by(Face.id)
        )
        # BUG FIX: the original `if face_id1: ... elif face_id2: ...` ignored
        # face_id2 whenever face_id1 was given, so two specific faces could
        # never be selected together. Filter with IN over all provided IDs.
        requested_ids = [fid for fid in (face_id1, face_id2) if fid]
        if requested_ids:
            query = query.filter(Face.id.in_(requested_ids))
        faces = query.limit(2).all()
        if len(faces) < 2:
            print(f"Found {len(faces)} identified face(s). Need 2 faces to compare.")
            if len(faces) == 0:
                print("No identified faces found.")
                return
            # Help the operator pick valid IDs before bailing out.
            print("\nShowing available identified faces:")
            all_faces = (
                session.query(Face, Person, Photo)
                .join(Person, Face.person_id == Person.id)
                .join(Photo, Face.photo_id == Photo.id)
                .filter(Face.person_id.isnot(None))
                .order_by(Face.id)
                .limit(10)
                .all()
            )
            for face, person, photo in all_faces:
                print(f" Face ID: {face.id}, Person: {person.first_name} {person.last_name}, Photo: {photo.filename}, Pose: {face.pose_mode}")
            return
        print(f"\n{'='*80}")
        print("ANALYZING TWO IDENTIFIED FACES")
        print(f"{'='*80}\n")
        for idx, (face, person, photo) in enumerate(faces, 1):
            person_name = f"{person.first_name} {person.last_name}"
            print(f"{'='*80}")
            print(f"FACE {idx}: ID {face.id}")
            print(f"{'='*80}")
            print(f"Person: {person_name} (ID: {face.person_id})")
            print(f"Photo: {photo.filename}")
            print(f"Current Pose Mode: {face.pose_mode}")
            print(f"Yaw: {face.yaw_angle:.2f}°" if face.yaw_angle is not None else "Yaw: None")
            print(f"Pitch: {face.pitch_angle:.2f}°" if face.pitch_angle is not None else "Pitch: None")
            print(f"Roll: {face.roll_angle:.2f}°" if face.roll_angle is not None else "Roll: None")
            print(f"Face Width: {face.face_width if hasattr(face, 'face_width') else 'N/A'}")
            print(f"Confidence: {face.face_confidence:.3f}")
            print(f"Quality: {face.quality_score:.3f}")
            print(f"Location: {face.location}")
            # Parse landmarks if available (stored as a JSON string).
            landmarks = None
            if face.landmarks:
                try:
                    landmarks = json.loads(face.landmarks)
                    print(f"\nLandmarks:")
                    for key, value in landmarks.items():
                        print(f" {key}: {value}")
                except json.JSONDecodeError:
                    print(f"\nLandmarks: (invalid JSON)")
            # Recalculate pose mode using current logic.
            # BUG FIX: the separator was written as f"{''*80}" (empty string
            # repeated 80 times == empty line); use '-'*80 as clearly intended.
            print(f"\n{'-'*80}")
            print("RECALCULATING POSE MODE:")
            print(f"{'-'*80}")
            # Calculate face width from landmarks if available
            face_width = None
            if landmarks:
                face_width = PoseDetector.calculate_face_width_from_landmarks(landmarks)
                print(f"Calculated face_width from landmarks: {face_width}")
            # Recalculate pose mode
            recalculated_pose = PoseDetector.classify_pose_mode(
                face.yaw_angle,
                face.pitch_angle,
                face.roll_angle,
                face_width,
                landmarks
            )
            print(f"Recalculated Pose Mode: {recalculated_pose}")
            if recalculated_pose != face.pose_mode:
                print(f"⚠️ MISMATCH! Current: '{face.pose_mode}' vs Recalculated: '{recalculated_pose}'")
                # Explain the mismatch from the same inputs the classifier saw.
                print(f"\nAnalysis:")
                if face.yaw_angle is None:
                    print(f" - Yaw is None")
                    if landmarks:
                        left_eye = landmarks.get('left_eye')
                        right_eye = landmarks.get('right_eye')
                        nose = landmarks.get('nose')
                        missing = []
                        if not left_eye:
                            missing.append('left_eye')
                        if not right_eye:
                            missing.append('right_eye')
                        if not nose:
                            missing.append('nose')
                        if missing:
                            print(f" - Missing landmarks: {', '.join(missing)}")
                            print(f" - Should be classified as profile (missing landmarks)")
                        else:
                            print(f" - All landmarks present")
                        if face_width:
                            print(f" - Face width: {face_width}px")
                            if face_width < 25.0:
                                print(f" - Face width < 25px, should be profile")
                            else:
                                print(f" - Face width >= 25px, should be frontal")
                    else:
                        print(f" - No landmarks available")
                else:
                    abs_yaw = abs(face.yaw_angle)
                    print(f" - Yaw angle: {face.yaw_angle:.2f}° (abs: {abs_yaw:.2f}°)")
                    if abs_yaw >= 30.0:
                        expected = "profile_left" if face.yaw_angle < 0 else "profile_right"
                        print(f" - |yaw| >= 30°, should be '{expected}'")
                    else:
                        print(f" - |yaw| < 30°, should be 'frontal'")
            else:
                print(f"✓ Pose mode matches recalculated value")
            print()
    finally:
        session.close()
if __name__ == "__main__":
    # Parse up to two optional face IDs from the command line.
    parsed_ids = [None, None]
    for pos in (1, 2):
        if len(sys.argv) > pos:
            try:
                parsed_ids[pos - 1] = int(sys.argv[pos])
            except ValueError:
                print(f"Invalid face ID: {sys.argv[pos]}")
                sys.exit(1)
    try:
        check_two_faces(parsed_ids[0], parsed_ids[1])
    except Exception as e:
        import traceback

        print(f"❌ Error: {e}")
        traceback.print_exc()
        sys.exit(1)

View File

@ -1,80 +0,0 @@
#!/usr/bin/env python3
"""
Check yaw angles in database to see why profile faces aren't being detected
"""
import sqlite3
import os

db_path = "data/punimtag.db"

if not os.path.exists(db_path):
    print(f"❌ Database not found: {db_path}")
    exit(1)

conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()

# Get all faces with yaw data, widest turns first.
cursor.execute("""
    SELECT id, pose_mode, yaw_angle, pitch_angle, roll_angle
    FROM faces
    WHERE yaw_angle IS NOT NULL
    ORDER BY ABS(yaw_angle) DESC
""")
faces = cursor.fetchall()
print(f"Found {len(faces)} faces with yaw data\n")
print("=" * 80)
print("YAW ANGLE ANALYSIS")
print("=" * 80)
print(f"\n{'Face ID':<10} {'Pose Mode':<25} {'Yaw':<10} {'Should be Profile?'}")
print("-" * 80)

PROFILE_THRESHOLD = 30.0  # From pose_detection.py

profile_count = 0
for face in faces:
    yaw = face['yaw_angle']
    pose_mode = face['pose_mode']
    is_profile = abs(yaw) >= PROFILE_THRESHOLD
    should_be_profile = "YES" if is_profile else "NO"
    if is_profile:
        profile_count += 1
    print(f"{face['id']:<10} {pose_mode:<25} {yaw:>8.2f}° {should_be_profile}")

print("\n" + "=" * 80)
print(f"Total faces with yaw data: {len(faces)}")
print(f"Faces with |yaw| >= {PROFILE_THRESHOLD}° (should be profile): {profile_count}")
# BUG FIX: the LIKE pattern was written with double quotes ("profile%").
# SQLite resolves double quotes as an identifier first and only falls back
# to a string literal as a documented misfeature; use a proper single-quoted
# SQL string literal, and hoist the query out of the f-string for clarity.
current_profile_total = cursor.execute(
    "SELECT COUNT(*) FROM faces WHERE pose_mode LIKE 'profile%'"
).fetchone()[0]
print(f"Faces currently classified as profile: {current_profile_total}")
print("=" * 80)

# Check yaw distribution
print("\n" + "=" * 80)
print("YAW ANGLE DISTRIBUTION")
print("=" * 80)
cursor.execute("""
    SELECT
        CASE
            WHEN ABS(yaw_angle) < 30 THEN 'frontal (< 30°)'
            WHEN ABS(yaw_angle) >= 30 AND ABS(yaw_angle) < 60 THEN 'profile (30-60°)'
            WHEN ABS(yaw_angle) >= 60 THEN 'extreme profile (>= 60°)'
            ELSE 'unknown'
        END as category,
        COUNT(*) as count
    FROM faces
    WHERE yaw_angle IS NOT NULL
    GROUP BY category
    ORDER BY count DESC
""")
distribution = cursor.fetchall()
for row in distribution:
    print(f" {row['category']}: {row['count']} faces")

conn.close()

View File

@ -1,253 +0,0 @@
#!/usr/bin/env python3
"""Debug pose classification for identified faces
This script helps identify why poses might be incorrectly classified.
It shows detailed pose information and can recalculate poses from photos.
"""
import sys
import os
import json
from typing import Optional, List, Tuple
# Add project root to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from backend.db.models import Face, Person, Photo
from backend.db.session import get_database_url
from src.utils.pose_detection import PoseDetector
def analyze_pose_classification(
    face_id: Optional[int] = None,
    person_id: Optional[int] = None,
    recalculate: bool = False,
) -> None:
    """Analyze pose classification for identified faces.

    Prints the stored pose data for each identified face, compares the stored
    pose_mode against what the yaw thresholds imply, and (optionally) re-runs
    pose detection on the photo to cross-check the stored angles.

    Args:
        face_id: Specific face ID to check (None = all identified faces)
        person_id: Specific person ID to check (None = all persons)
        recalculate: If True, recalculate pose from photo to verify classification
    """
    db_url = get_database_url()
    print(f"Connecting to database: {db_url}")
    engine = create_engine(db_url)
    Session = sessionmaker(bind=engine)
    session = Session()
    try:
        # Build query: identified faces only, optionally narrowed by ID.
        query = (
            session.query(Face, Person, Photo)
            .join(Person, Face.person_id == Person.id)
            .join(Photo, Face.photo_id == Photo.id)
            .filter(Face.person_id.isnot(None))
        )
        if face_id:
            query = query.filter(Face.id == face_id)
        if person_id:
            query = query.filter(Person.id == person_id)
        faces = query.order_by(Person.id, Face.id).all()
        if not faces:
            print("No identified faces found matching criteria.")
            return
        print(f"\n{'='*80}")
        print(f"Found {len(faces)} identified face(s)")
        print(f"{'='*80}\n")
        # Detector init can fail (model download, missing deps); degrade to
        # analysis-only mode instead of aborting.
        pose_detector = None
        if recalculate:
            try:
                pose_detector = PoseDetector()
                print("Pose detector initialized for recalculation\n")
            except Exception as e:
                print(f"Warning: Could not initialize pose detector: {e}")
                print("Skipping recalculation\n")
                recalculate = False
        for face, person, photo in faces:
            person_name = f"{person.first_name} {person.last_name}"
            print(f"{'='*80}")
            print(f"Face ID: {face.id}")
            print(f"Person: {person_name} (ID: {person.id})")
            print(f"Photo: {photo.filename}")
            print(f"Photo Path: {photo.path}")
            print(f"{'-'*80}")
            # Current stored pose information
            print("STORED POSE INFORMATION:")
            print(f" Pose Mode: {face.pose_mode}")
            print(f" Yaw Angle: {face.yaw_angle:.2f}°" if face.yaw_angle is not None else " Yaw Angle: None")
            print(f" Pitch Angle: {face.pitch_angle:.2f}°" if face.pitch_angle is not None else " Pitch Angle: None")
            print(f" Roll Angle: {face.roll_angle:.2f}°" if face.roll_angle is not None else " Roll Angle: None")
            print(f" Face Confidence: {face.face_confidence:.3f}")
            print(f" Quality Score: {face.quality_score:.3f}")
            # Parse location (stored either as a JSON string or a dict).
            try:
                location = json.loads(face.location) if isinstance(face.location, str) else face.location
                print(f" Location: {location}")
            except:
                print(f" Location: {face.location}")
            # Analyze classification against the 30° yaw thresholds used by
            # the classifier (frontal / profile_left / profile_right).
            print(f"\nPOSE CLASSIFICATION ANALYSIS:")
            yaw = face.yaw_angle
            pitch = face.pitch_angle
            roll = face.roll_angle
            if yaw is not None:
                abs_yaw = abs(yaw)
                print(f" Yaw: {yaw:.2f}° (absolute: {abs_yaw:.2f}°)")
                if abs_yaw < 30.0:
                    expected_mode = "frontal"
                    print(f" → Expected: {expected_mode} (yaw < 30°)")
                elif yaw <= -30.0:
                    expected_mode = "profile_left"
                    print(f" → Expected: {expected_mode} (yaw <= -30°, face turned left)")
                elif yaw >= 30.0:
                    expected_mode = "profile_right"
                    print(f" → Expected: {expected_mode} (yaw >= 30°, face turned right)")
                else:
                    expected_mode = "unknown"
                    print(f" → Expected: {expected_mode} (edge case)")
                if face.pose_mode != expected_mode:
                    print(f" ⚠️ MISMATCH: Stored pose_mode='{face.pose_mode}' but expected '{expected_mode}'")
                else:
                    print(f" ✓ Classification matches expected mode")
            else:
                print(f" Yaw: None (cannot determine pose from yaw)")
                print(f" ⚠️ Warning: Yaw angle is missing, pose classification may be unreliable")
            # Recalculate if requested and the photo file is still on disk.
            if recalculate and pose_detector and photo.path and os.path.exists(photo.path):
                print(f"\nRECALCULATING POSE FROM PHOTO:")
                try:
                    pose_faces = pose_detector.detect_pose_faces(photo.path)
                    if not pose_faces:
                        print(" No faces detected in photo")
                    else:
                        # Try to match face by location: nearest bounding-box
                        # center among the freshly detected faces.
                        # NOTE(review): relies on `location` bound in the
                        # earlier try/except — confirm it cannot be unbound
                        # if that parse raised.
                        face_location = location if isinstance(location, dict) else json.loads(face.location) if isinstance(face.location, str) else {}
                        face_x = face_location.get('x', 0)
                        face_y = face_location.get('y', 0)
                        face_w = face_location.get('w', 0)
                        face_h = face_location.get('h', 0)
                        face_center_x = face_x + face_w / 2
                        face_center_y = face_y + face_h / 2
                        best_match = None
                        best_distance = float('inf')
                        for pose_face in pose_faces:
                            pose_area = pose_face.get('facial_area', {})
                            if isinstance(pose_area, dict):
                                pose_x = pose_area.get('x', 0)
                                pose_y = pose_area.get('y', 0)
                                pose_w = pose_area.get('w', 0)
                                pose_h = pose_area.get('h', 0)
                                pose_center_x = pose_x + pose_w / 2
                                pose_center_y = pose_y + pose_h / 2
                                # Calculate distance between centers
                                distance = ((face_center_x - pose_center_x) ** 2 +
                                            (face_center_y - pose_center_y) ** 2) ** 0.5
                                if distance < best_distance:
                                    best_distance = distance
                                    best_match = pose_face
                        if best_match:
                            recalc_yaw = best_match.get('yaw_angle')
                            recalc_pitch = best_match.get('pitch_angle')
                            recalc_roll = best_match.get('roll_angle')
                            recalc_face_width = best_match.get('face_width')
                            recalc_pose_mode = best_match.get('pose_mode')
                            print(f" Recalculated Yaw: {recalc_yaw:.2f}°" if recalc_yaw is not None else " Recalculated Yaw: None")
                            print(f" Recalculated Pitch: {recalc_pitch:.2f}°" if recalc_pitch is not None else " Recalculated Pitch: None")
                            print(f" Recalculated Roll: {recalc_roll:.2f}°" if recalc_roll is not None else " Recalculated Roll: None")
                            print(f" Face Width: {recalc_face_width:.2f}px" if recalc_face_width is not None else " Face Width: None")
                            print(f" Recalculated Pose Mode: {recalc_pose_mode}")
                            # Compare stored vs recalculated classification.
                            if recalc_pose_mode != face.pose_mode:
                                print(f" ⚠️ MISMATCH: Stored='{face.pose_mode}' vs Recalculated='{recalc_pose_mode}'")
                                if recalc_yaw is not None and face.yaw_angle is not None:
                                    # Convert Decimal to float for comparison
                                    stored_yaw = float(face.yaw_angle)
                                    yaw_diff = abs(recalc_yaw - stored_yaw)
                                    if yaw_diff > 1.0:  # More than 1 degree difference
                                        print(f" ⚠️ Yaw difference: {yaw_diff:.2f}°")
                        else:
                            print(" Could not match face location to detected faces")
                except Exception as e:
                    print(f" Error recalculating: {e}")
                    import traceback
                    traceback.print_exc()
            print()
        print(f"{'='*80}")
        print("Analysis complete")
        print(f"{'='*80}\n")
    finally:
        session.close()
def main():
    """Main entry point"""
    import argparse

    # CLI surface: optional face/person filters plus a recalculation flag.
    arg_parser = argparse.ArgumentParser(
        description="Debug pose classification for identified faces"
    )
    arg_parser.add_argument("--face-id", type=int, help="Specific face ID to check")
    arg_parser.add_argument("--person-id", type=int, help="Specific person ID to check")
    arg_parser.add_argument(
        "--recalculate",
        action="store_true",
        help="Recalculate pose from photo to verify classification",
    )
    options = arg_parser.parse_args()
    try:
        analyze_pose_classification(
            face_id=options.face_id,
            person_id=options.person_id,
            recalculate=options.recalculate,
        )
    except Exception as e:
        import traceback

        print(f"❌ Error: {e}")
        traceback.print_exc()
        sys.exit(1)


if __name__ == "__main__":
    main()

View File

@ -1,160 +0,0 @@
#!/usr/bin/env python3
"""
Diagnose frontend issues:
1. Check if backend API is running and accessible
2. Check database connection
3. Test search endpoint
"""
import os
import sys
import requests
from pathlib import Path
# Add project root to path
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from backend.db.session import get_database_url, engine
from sqlalchemy import text
def check_backend_api():
    """Check if backend API is running."""
    banner = "=" * 80
    print(banner)
    print("BACKEND API CHECK")
    print(banner)
    # Probe the docs endpoint first: it needs no auth, so a connection
    # failure here means the server itself is down.
    try:
        docs_response = requests.get("http://127.0.0.1:8000/docs", timeout=5)
        if docs_response.status_code == 200:
            print("✅ Backend API is running (docs accessible)")
        else:
            print(f"⚠️ Backend API returned status {docs_response.status_code}")
    except requests.exceptions.ConnectionError:
        print("❌ Backend API is NOT running or not accessible")
        print(" Start it with: cd backend && uvicorn app:app --reload")
        return False
    except Exception as e:
        print(f"❌ Error checking backend API: {e}")
        return False
    # The search endpoint may require authentication; report what we see,
    # but consider the backend "up" either way.
    try:
        search_response = requests.get(
            "http://127.0.0.1:8000/api/v1/photos",
            params={"search_type": "processed", "page": 1, "page_size": 1},
            timeout=5
        )
        if search_response.status_code == 200:
            print("✅ Search endpoint is accessible (no auth required for this query)")
        elif search_response.status_code == 401:
            print("⚠️ Search endpoint requires authentication")
            print(" User needs to log in through admin frontend")
        else:
            print(f"⚠️ Search endpoint returned status {search_response.status_code}")
    except Exception as e:
        print(f"⚠️ Error checking search endpoint: {e}")
    return True
def check_database():
    """Check database connection and photo count."""
    print("\n" + "=" * 80)
    print("DATABASE CHECK")
    print("=" * 80)
    db_url = get_database_url()
    # Mask the credentials portion of the URL before printing.
    masked_url = db_url.replace('://', '://****') if '://' in db_url else db_url
    print(f"Database URL: {masked_url}")
    try:
        with engine.connect() as conn:
            # A simple count doubles as a connectivity probe.
            processed_total = conn.execute(
                text("SELECT COUNT(*) FROM photos WHERE processed = 1")
            ).scalar()
            print(f"✅ Database connection successful")
            print(f" Processed photos: {processed_total}")
            if processed_total == 0:
                print("⚠️ No processed photos found in database")
                print(" This explains why viewer frontend shows 0 photos")
            else:
                print(f" Database has {processed_total} processed photos")
    except Exception as e:
        print(f"❌ Database connection error: {e}")
        return False
    return True
def check_viewer_frontend_config():
    """Check viewer frontend configuration."""
    print("\n" + "=" * 80)
    print("VIEWER FRONTEND CONFIGURATION")
    print("=" * 80)
    env_file = project_root / "viewer-frontend" / ".env"
    if not env_file.exists():
        print("❌ viewer-frontend/.env file not found")
        return False
    with open(env_file) as handle:
        env_text = handle.read()
    if "DATABASE_URL" not in env_text:
        print("⚠️ DATABASE_URL not found in viewer-frontend/.env")
        return True
    # Inspect every DATABASE_URL= line and compare its driver against the
    # database the backend actually uses.
    for env_line in env_text.split("\n"):
        if not env_line.startswith("DATABASE_URL="):
            continue
        configured_url = env_line.split("=", 1)[1].strip().strip('"')
        masked = configured_url.replace('://', '://****') if '://' in configured_url else configured_url
        print(f"Viewer frontend DATABASE_URL: {masked}")
        actual_db = get_database_url()
        if "postgresql" in configured_url and "sqlite" in actual_db:
            print("❌ MISMATCH: Viewer frontend configured for PostgreSQL")
            print(" but actual database is SQLite")
            print("\n SOLUTION OPTIONS:")
            print(" 1. Change viewer-frontend/.env DATABASE_URL to SQLite:")
            print(f' DATABASE_URL="file:../data/punimtag.db"')
            print(" 2. Update Prisma schema to use SQLite provider")
            print(" 3. Migrate database to PostgreSQL")
            return False
        elif "sqlite" in configured_url and "sqlite" in actual_db:
            print("✅ Viewer frontend configured for SQLite (matches actual database)")
        else:
            print("⚠️ Database type mismatch or unclear")
    return True
def main():
    """Run all three diagnostics and print a pass/fail summary."""
    print("\n🔍 DIAGNOSING FRONTEND ISSUES\n")
    backend_ok = check_backend_api()
    db_ok = check_database()
    viewer_config_ok = check_viewer_frontend_config()
    separator = "=" * 80
    print("\n" + separator)
    print("SUMMARY")
    print(separator)
    # (result, message-if-ok, message-if-failed) — printed in fixed order.
    outcomes = [
        (backend_ok,
         "✅ Backend API is running",
         "❌ Backend API is not running - admin frontend search will fail"),
        (db_ok,
         "✅ Database connection OK",
         "❌ Database connection issue"),
        (viewer_config_ok,
         "✅ Viewer frontend configuration OK",
         "❌ Viewer frontend configuration issue - needs to be fixed"),
    ]
    for passed, ok_msg, fail_msg in outcomes:
        print(ok_msg if passed else fail_msg)
    print("\n" + separator)


if __name__ == "__main__":
    main()

View File

@ -1,78 +0,0 @@
import sqlite3
import sys
import os
def drop_all_tables(db_path: str) -> None:
    """Drop every user table and view in the SQLite database at db_path."""
    if not os.path.exists(db_path):
        print(f"Database not found: {db_path}")
        return
    connection = sqlite3.connect(db_path)
    try:
        connection.isolation_level = None  # autocommit mode for DDL
        cursor = connection.cursor()
        # Disable FK enforcement so objects can be dropped in any order.
        cursor.execute("PRAGMA foreign_keys = OFF;")
        cursor.execute("SELECT name, type FROM sqlite_master WHERE type IN ('table','view') AND name NOT LIKE 'sqlite_%';")
        found = cursor.fetchall()
        print(f"DB: {db_path}")
        if not found:
            print("No user tables or views found.")
            return
        # Views first (they may reference tables), then tables.
        view_names = [name for name, kind in found if kind == 'view']
        table_names = [name for name, kind in found if kind == 'table']
        print(f"Found {len(table_names)} tables and {len(view_names)} views.")
        for view_name in view_names:
            print(f"Dropping view: {view_name}")
            cursor.execute(f"DROP VIEW IF EXISTS \"{view_name}\";")
        for table_name in table_names:
            print(f"Dropping table: {table_name}")
            cursor.execute(f"DROP TABLE IF EXISTS \"{table_name}\";")
        # Reclaim the freed pages.
        cursor.execute("VACUUM;")
        print("Done.")
    finally:
        connection.close()
def list_tables(db_path: str) -> None:
    """Print every user table and view in the SQLite database at db_path."""
    if not os.path.exists(db_path):
        print(f"Database not found: {db_path}")
        return
    connection = sqlite3.connect(db_path)
    try:
        cursor = connection.cursor()
        cursor.execute("SELECT name, type FROM sqlite_master WHERE type IN ('table','view') AND name NOT LIKE 'sqlite_%' ORDER BY type, name;")
        rows = cursor.fetchall()
        print(f"DB: {db_path}")
        if not rows:
            print("No user tables or views found.")
            return
        for obj_name, obj_type in rows:
            print(f"- {obj_type}: {obj_name}")
    finally:
        connection.close()
if __name__ == "__main__":
    # Usage: python drop_all_tables.py <db1> [<db2> ...]
    db_paths = sys.argv[1:]
    if not db_paths:
        # No args: default to the two known database locations in the repo.
        repo_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
        db_paths = [
            os.path.join(repo_root, 'photos.db'),
            os.path.join(repo_root, 'data', 'photos.db'),
        ]
    # Show contents, drop everything, then show again to confirm.
    for stage in (list_tables, drop_all_tables, list_tables):
        for db_file in db_paths:
            stage(db_file)

View File

@ -1,59 +0,0 @@
#!/usr/bin/env python3
"""Drop all tables from the web database to start fresh."""
import sys
import os
# Add project root to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from sqlalchemy import inspect
from backend.db.session import engine, get_database_url
from backend.db.models import Base
# Ordered list ensures foreign-key dependents drop first
# Drop order matters: FK-bearing dependents are listed before their parents.
TARGET_TABLES = [
    "photo_favorites",
    "phototaglinkage",
    "person_encodings",
    "faces",
    "tags",
    "photos",
    "people",
]


def drop_all_tables():
    """Drop the selected tables from the database, skipping unknown/missing ones."""
    db_url = get_database_url()
    print(f"Connecting to database: {db_url}")
    inspector = inspect(engine)
    existing_tables = set(inspector.get_table_names())
    print("\nDropping selected tables...")
    for table_name in TARGET_TABLES:
        metadata_table = Base.metadata.tables.get(table_name)
        if metadata_table is None:
            print(f" ⚠️ Table '{table_name}' not found in metadata, skipping.")
            continue
        if table_name not in existing_tables:
            print(f" Table '{table_name}' does not exist in database, skipping.")
            continue
        print(f" 🗑️ Dropping '{table_name}'...")
        metadata_table.drop(bind=engine, checkfirst=True)
    print("✅ Selected tables dropped successfully!")
    print("\nYou can now recreate tables using:")
    print(" python scripts/recreate_tables_web.py")


if __name__ == "__main__":
    try:
        drop_all_tables()
    except Exception as e:
        import traceback

        print(f"❌ Error dropping tables: {e}")
        traceback.print_exc()
        sys.exit(1)

View File

@ -14,3 +14,4 @@ else
fi

View File

@ -1,53 +0,0 @@
#!/usr/bin/env python3
"""Fix admin user password in database."""
import sys
import os
# Add project root to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from backend.db.session import get_db
from backend.db.models import User
from backend.utils.password import hash_password, verify_password
def fix_admin_password():
    """Set admin user password to 'admin'.

    Returns True when the password was updated and verifies correctly,
    False on any failure (missing user, DB error, verification failure).
    """
    session = next(get_db())
    try:
        admin_user = session.query(User).filter(User.username == 'admin').first()
        if not admin_user:
            print("❌ Admin user not found in database")
            return False
        # Reset the credential and force the account active + admin.
        new_hash = hash_password('admin')
        admin_user.password_hash = new_hash
        admin_user.is_active = True
        admin_user.is_admin = True
        session.commit()
        # Sanity-check the stored hash round-trips.
        if not verify_password('admin', new_hash):
            print("❌ Password verification failed after update")
            return False
        print("✅ Admin password updated successfully")
        print(" Username: admin")
        print(" Password: admin")
        return True
    except Exception as e:
        import traceback

        print(f"❌ Error: {e}")
        traceback.print_exc()
        session.rollback()
        return False
    finally:
        session.close()


if __name__ == "__main__":
    success = fix_admin_password()
    sys.exit(0 if success else 1)

View File

@ -1,115 +0,0 @@
#!/usr/bin/env python3
"""Grant DELETE permission on auth database users table.
This script grants DELETE permission to the database user specified in DATABASE_URL_AUTH.
It requires superuser access (postgres user) to grant permissions.
"""
from __future__ import annotations
import os
import sys
from pathlib import Path
from urllib.parse import urlparse
from dotenv import load_dotenv
from sqlalchemy import create_engine, text
# Load environment variables
env_path = Path(__file__).parent.parent.parent / ".env"
load_dotenv(dotenv_path=env_path)
def parse_database_url(db_url: str) -> dict:
    """Split a PostgreSQL connection URL into user/password/host/port/database.

    Normalizes SQLAlchemy-style 'postgresql+psycopg2://' URLs to plain
    'postgresql://' before parsing; defaults host to 'localhost' and port
    to 5432 when absent.
    """
    normalized = db_url
    if normalized.startswith("postgresql+psycopg2://"):
        normalized = normalized.replace("postgresql+psycopg2://", "postgresql://")
    parts = urlparse(normalized)
    return {
        "user": parts.username,
        "password": parts.password,
        "host": parts.hostname or "localhost",
        "port": parts.port or 5432,
        "database": parts.path.lstrip("/"),
    }
def grant_delete_permission() -> None:
    """Grant DELETE permission on users and pending_photos tables in auth database.

    Tries to connect as the 'postgres' superuser first; if that fails,
    retries with the configured application user (which may itself hold
    GRANT privileges). Exits the process on unrecoverable failure.
    """
    auth_db_url = os.getenv("DATABASE_URL_AUTH")
    if not auth_db_url:
        print("❌ Error: DATABASE_URL_AUTH environment variable not set")
        sys.exit(1)
    # Only PostgreSQL has a GRANT model; SQLite etc. need nothing.
    if not auth_db_url.startswith("postgresql"):
        print(" Auth database is not PostgreSQL. No permissions to grant.")
        return
    db_info = parse_database_url(auth_db_url)
    db_user = db_info["user"]
    db_name = db_info["database"]
    print(f"📋 Granting DELETE permission on auth database tables...")
    print(f" Database: {db_name}")
    print(f" User: {db_user}")
    # Tables that need DELETE permission
    tables = ["users", "pending_photos", "pending_identifications", "inappropriate_photo_reports"]
    # NOTE(review): table and user names are interpolated into SQL via
    # f-strings. They come from a fixed list and the env-configured URL,
    # not untrusted input, but quoting the identifiers would be safer.
    # Connect as postgres superuser to grant permissions
    # Try to connect as postgres user (superuser)
    try:
        # Try to get postgres password from environment or use peer authentication
        postgres_url = f"postgresql://postgres@{db_info['host']}:{db_info['port']}/{db_name}"
        engine = create_engine(postgres_url)
        with engine.connect() as conn:
            for table in tables:
                try:
                    # Grant DELETE permission
                    conn.execute(text(f"""
                        GRANT DELETE ON TABLE {table} TO {db_user}
                    """))
                    print(f" ✅ Granted DELETE on {table}")
                except Exception as e:
                    # Table might not exist, skip it
                    print(f" ⚠️ Could not grant DELETE on {table}: {e}")
            conn.commit()
        print(f"✅ Successfully granted DELETE permissions to user '{db_user}'")
        return
    except Exception as e:
        # If connecting as postgres fails, try with the same user (might have grant privileges)
        print(f"⚠️ Could not connect as postgres user: {e}")
        print(f" Trying with current database user...")
        try:
            engine = create_engine(auth_db_url)
            with engine.connect() as conn:
                for table in tables:
                    try:
                        # Try to grant permission
                        conn.execute(text(f"""
                            GRANT DELETE ON TABLE {table} TO {db_user}
                        """))
                        print(f" ✅ Granted DELETE on {table}")
                    except Exception as e2:
                        print(f" ⚠️ Could not grant DELETE on {table}: {e2}")
                conn.commit()
            print(f"✅ Successfully granted DELETE permissions to user '{db_user}'")
            return
        except Exception as e2:
            # Both connection strategies failed: show manual commands and bail.
            print(f"❌ Failed to grant permission: {e2}")
            print(f"\n💡 To grant permission manually, run as postgres superuser:")
            for table in tables:
                print(f" sudo -u postgres psql -d {db_name} -c \"GRANT DELETE ON TABLE {table} TO {db_user};\"")
            sys.exit(1)


if __name__ == "__main__":
    grant_delete_permission()

View File

@ -1,264 +0,0 @@
#!/usr/bin/env python3
"""
Migrate data from SQLite to PostgreSQL database.
This script:
1. Creates PostgreSQL databases if they don't exist
2. Creates all tables in PostgreSQL
3. Migrates all data from SQLite to PostgreSQL
"""
from __future__ import annotations
import sys
import os
from pathlib import Path
# Add project root to path
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from sqlalchemy import create_engine, inspect, text
from sqlalchemy.orm import sessionmaker
from backend.db.models import Base
from backend.db.session import get_database_url
import sqlite3
def create_postgresql_databases():
    """Create PostgreSQL databases if they don't exist.

    Reads DATABASE_URL / DATABASE_URL_AUTH (with local-dev defaults),
    connects to the administrative 'postgres' database, and issues
    CREATE DATABASE for each target that is missing. Errors are reported
    but not raised, so the caller can continue.
    """
    from urllib.parse import urlparse
    # Get database URLs from environment
    db_url = os.getenv("DATABASE_URL", "postgresql+psycopg2://punimtag:punimtag_password@localhost:5432/punimtag")
    auth_db_url = os.getenv("DATABASE_URL_AUTH", "postgresql://punimtag:punimtag_password@localhost:5432/punimtag_auth")
    # Parse URLs (normalize the SQLAlchemy driver prefix for urlparse)
    main_parsed = urlparse(db_url.replace("postgresql+psycopg2://", "postgresql://"))
    auth_parsed = urlparse(auth_db_url.replace("postgresql+psycopg2://", "postgresql://"))
    main_db_name = main_parsed.path.lstrip("/")
    auth_db_name = auth_parsed.path.lstrip("/")
    # Connect to the 'postgres' maintenance database to create the others
    postgres_url = f"postgresql://{main_parsed.username}:{main_parsed.password}@{main_parsed.hostname}:{main_parsed.port or 5432}/postgres"
    try:
        engine = create_engine(postgres_url)
        with engine.connect() as conn:
            # Check if databases exist before creating them
            result = conn.execute(text("SELECT 1 FROM pg_database WHERE datname = :name"), {"name": main_db_name})
            if not result.fetchone():
                # CREATE DATABASE cannot run inside a transaction block;
                # the explicit COMMIT ends the implicit one first.
                conn.execute(text("COMMIT"))  # End any transaction
                conn.execute(text(f'CREATE DATABASE "{main_db_name}"'))
                print(f"✅ Created database: {main_db_name}")
            else:
                print(f"✅ Database already exists: {main_db_name}")
            result = conn.execute(text("SELECT 1 FROM pg_database WHERE datname = :name"), {"name": auth_db_name})
            if not result.fetchone():
                conn.execute(text("COMMIT"))
                conn.execute(text(f'CREATE DATABASE "{auth_db_name}"'))
                print(f"✅ Created database: {auth_db_name}")
            else:
                print(f"✅ Database already exists: {auth_db_name}")
    except Exception as e:
        print(f"⚠️ Error creating databases: {e}")
        print(" Make sure PostgreSQL is running and credentials are correct")
def migrate_data():
    """Migrate data from SQLite to PostgreSQL.

    Creates the PostgreSQL schema from the ORM models, then copies rows
    table by table in FK-safe order, converting SQLite integer booleans
    to real booleans and filling required columns missing from old SQLite
    schemas with defaults.

    Returns:
        True on completion, False if DATABASE_URL is not PostgreSQL.
    """
    print("=" * 80)
    print("MIGRATING DATA FROM SQLITE TO POSTGRESQL")
    print("=" * 80)
    # Get database URLs; the SQLite source path is fixed.
    sqlite_url = "sqlite:///data/punimtag.db"
    postgres_url = os.getenv("DATABASE_URL", "postgresql+psycopg2://punimtag:punimtag_password@localhost:5432/punimtag")
    if not postgres_url.startswith("postgresql"):
        print("❌ DATABASE_URL is not set to PostgreSQL")
        print(" Set DATABASE_URL in .env file to PostgreSQL connection string")
        return False
    # Connect to both databases
    sqlite_engine = create_engine(sqlite_url)
    postgres_engine = create_engine(postgres_url)
    # Create tables in PostgreSQL from the ORM metadata
    print("\n📋 Creating tables in PostgreSQL...")
    Base.metadata.create_all(bind=postgres_engine)
    print("✅ Tables created")
    # Get table names from the SQLite source
    inspector = inspect(sqlite_engine)
    all_tables = inspector.get_table_names()
    # Exclude system tables
    all_tables = [t for t in all_tables if not t.startswith("sqlite_")]
    # Define migration order (respecting foreign key constraints)
    # Tables with no dependencies first, then dependent tables
    migration_order = [
        "alembic_version",  # Migration tracking (optional)
        "photos",  # Base table
        "people",  # Base table
        "tags",  # Base table
        "users",  # Base table
        "faces",  # Depends on photos, people, users
        "person_encodings",  # Depends on people, faces
        "phototaglinkage",  # Depends on photos, tags
        "photo_favorites",  # Depends on photos
        "photo_person_linkage",  # Depends on photos, people, users
        "role_permissions",  # Base table
    ]
    # Filter to only tables that exist
    tables = [t for t in migration_order if t in all_tables]
    # Add any remaining tables not in the order list
    for t in all_tables:
        if t not in tables:
            tables.append(t)
    print(f"\n📊 Found {len(tables)} tables to migrate: {', '.join(tables)}")
    # Boolean columns mapping (SQLite stores as integer, PostgreSQL needs boolean)
    boolean_columns = {
        "photos": ["processed"],
        "faces": ["is_primary_encoding", "excluded"],
        "users": ["is_active", "is_admin", "password_change_required"],
        "role_permissions": ["allowed"],
    }
    # Columns that might be missing in SQLite but required in PostgreSQL
    # Map: table_name -> {column: default_value}
    default_values = {
        "photos": {"file_hash": "migrated"},  # file_hash might be missing in old SQLite
    }
    # Migrate each table
    with sqlite_engine.connect() as sqlite_conn, postgres_engine.connect() as postgres_conn:
        for table in tables:
            print(f"\n🔄 Migrating table: {table}")
            # Get row count
            count_result = sqlite_conn.execute(text(f"SELECT COUNT(*) FROM {table}"))
            row_count = count_result.scalar()
            if row_count == 0:
                print(f" ⏭️ Table is empty, skipping")
                continue
            print(f" 📦 {row_count} rows to migrate")
            # Check if table already has data in PostgreSQL; if so, wipe it
            # so re-running the migration does not duplicate rows.
            try:
                pg_count_result = postgres_conn.execute(text(f'SELECT COUNT(*) FROM "{table}"'))
                pg_count = pg_count_result.scalar()
                if pg_count > 0:
                    print(f" ⚠️ Table already has {pg_count} rows in PostgreSQL")
                    # Auto-truncate for non-interactive mode, or ask in interactive
                    print(f" 🗑️ Truncating existing data...")
                    postgres_conn.execute(text(f'TRUNCATE TABLE "{table}" CASCADE'))
                    postgres_conn.commit()
            except Exception as e:
                # Table might not exist yet, that's OK
                pass
            # Get column names and types from SQLite
            columns_result = sqlite_conn.execute(text(f"PRAGMA table_info({table})"))
            column_info = columns_result.fetchall()
            sqlite_columns = [row[1] for row in column_info]
            # Get PostgreSQL column names
            pg_inspector = inspect(postgres_engine)
            pg_columns_info = pg_inspector.get_columns(table)
            pg_columns = [col['name'] for col in pg_columns_info]
            # Use PostgreSQL columns (they're the source of truth)
            columns = pg_columns
            # Get boolean columns for this table
            table_bool_cols = boolean_columns.get(table, [])
            # Get default values for missing columns
            table_defaults = default_values.get(table, {})
            # Build SELECT statement for SQLite (only select columns that exist)
            select_cols = [col for col in columns if col in sqlite_columns]
            select_sql = f"SELECT {', '.join(select_cols)} FROM {table}"
            # Fetch all data
            data_result = sqlite_conn.execute(text(select_sql))
            rows = data_result.fetchall()
            # Insert into PostgreSQL row by row, committing every 100 rows.
            # On the first failed insert the whole table is rolled back and
            # skipped (break), so a partial table is not left committed
            # beyond the last successful batch.
            inserted = 0
            for row in rows:
                try:
                    # Build insert statement with boolean conversion
                    values = {}
                    for i, col in enumerate(select_cols):
                        val = row[i]
                        # Convert integer booleans to Python booleans for PostgreSQL
                        if col in table_bool_cols:
                            val = bool(val) if val is not None else None
                        values[col] = val
                    # Add default values for missing columns
                    for col, default_val in table_defaults.items():
                        if col not in values and col in columns:
                            values[col] = default_val
                    # Only insert columns we have values for (that exist in PostgreSQL)
                    insert_cols = [col for col in columns if col in values]
                    cols_str = ', '.join([f'"{c}"' for c in insert_cols])
                    placeholders = ', '.join([f':{c}' for c in insert_cols])
                    insert_sql = f'INSERT INTO "{table}" ({cols_str}) VALUES ({placeholders})'
                    postgres_conn.execute(text(insert_sql), values)
                    inserted += 1
                    if inserted % 100 == 0:
                        postgres_conn.commit()
                        print(f" ✅ Inserted {inserted}/{row_count} rows...", end='\r')
                except Exception as e:
                    print(f"\n ❌ Error inserting row: {e}")
                    print(f" Row data: {dict(zip(columns, row))}")
                    postgres_conn.rollback()
                    break
            postgres_conn.commit()
            print(f" ✅ Migrated {inserted}/{row_count} rows from {table}")
    print("\n" + "=" * 80)
    print("✅ MIGRATION COMPLETE")
    print("=" * 80)
    print("\nNext steps:")
    print("1. Update .env file to use PostgreSQL:")
    print(" DATABASE_URL=postgresql+psycopg2://punimtag:punimtag_password@localhost:5432/punimtag")
    print("2. Restart the backend API")
    print("3. Restart the viewer frontend")
    print("4. Verify data in viewer frontend")
    return True
if __name__ == "__main__":
    print("🔧 SQLite to PostgreSQL Migration Tool\n")
    # The SQLite source must exist before anything else happens.
    source_db = project_root / "data" / "punimtag.db"
    if not source_db.exists():
        print(f"❌ SQLite database not found: {source_db}")
        sys.exit(1)
    print(f"✅ Found SQLite database: {source_db}")
    # Ensure the target PostgreSQL databases exist, then copy the data.
    print("\n📦 Creating PostgreSQL databases...")
    create_postgresql_databases()
    print("\n")
    migrate_data()

View File

@ -1,35 +0,0 @@
#!/usr/bin/env python3
"""Recreate all tables from models (fresh start)."""
import sys
import os
# Add project root to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from backend.db.models import Base
from backend.db.session import engine, get_database_url
def recreate_tables():
    """Create every table declared on the models' shared metadata.

    Prints the target database URL first so the operator can confirm
    which database is being touched before any DDL runs.
    """
    target_url = get_database_url()
    print(f"Connecting to database: {target_url}")

    print("\nCreating all tables from models...")
    Base.metadata.create_all(bind=engine)

    print("✅ All tables created successfully!")
    print("✅ Database is now fresh and ready to use!")
if __name__ == "__main__":
    # CLI entry point: run the recreation and exit non-zero on failure,
    # printing the full traceback to aid debugging.
    try:
        recreate_tables()
    except Exception as err:
        print(f"❌ Error recreating tables: {err}")
        import traceback

        traceback.print_exc()
        sys.exit(1)

View File

@ -1,129 +0,0 @@
#!/usr/bin/env python3
"""Show all tables and their structures in the database."""
import sys
import os
# Add project root to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from sqlalchemy import inspect, text
from backend.db.session import engine, get_database_url
from backend.db.models import Base
def show_table_structure(table_name: str, inspector):
    """Print a human-readable dump of one table: columns, indexes, FKs.

    `inspector` must provide get_columns/get_indexes/get_foreign_keys
    returning SQLAlchemy-inspector-style dicts; returns None.
    """
    banner = '=' * 80
    print(f"\n{banner}")
    print(f"Table: {table_name}")
    print(f"{banner}")

    # Column listing in a fixed-width layout.
    print("\nColumns:")
    print(f"{'Name':<30} {'Type':<25} {'Nullable':<10} {'Primary Key':<12} {'Default'}")
    print("-" * 100)
    for column in inspector.get_columns(table_name):
        type_text = str(column['type'])
        nullable_text = "Yes" if column['nullable'] else "No"
        pk_text = "Yes" if column.get('primary_key', False) else "No"
        # Truncate long server defaults so the row stays on one line.
        default_text = str(column.get('default', ''))[:30] if column.get('default') else ''
        print(f"{column['name']:<30} {type_text:<25} {nullable_text:<10} {pk_text:<12} {default_text}")

    # Indexes (section printed only when the table has any).
    index_list = inspector.get_indexes(table_name)
    if index_list:
        print("\nIndexes:")
        for index in index_list:
            unique_text = "UNIQUE" if index.get('unique', False) else ""
            joined_cols = ", ".join(index['column_names'])
            print(f"  {index['name']}: {joined_cols} {unique_text}")

    # Foreign keys (section printed only when the table has any).
    fk_list = inspector.get_foreign_keys(table_name)
    if fk_list:
        print("\nForeign Keys:")
        for fk in fk_list:
            local_cols = ", ".join(fk['constrained_columns'])
            remote_cols = ", ".join(fk['referred_columns'])
            print(f"  {local_cols} -> {fk['referred_table']}({remote_cols})")
def show_all_tables():
    """Show all tables and their structures.

    Inspects the live database first; if it contains no tables yet (they
    are created on web-app startup), falls back to printing the schema
    declared on the SQLAlchemy models instead.
    """
    db_url = get_database_url()
    print(f"Database: {db_url}")
    print(f"\n{'='*80}")
    # Create inspector
    inspector = inspect(engine)
    # Get all table names
    table_names = inspector.get_table_names()
    if not table_names:
        print("No tables found in database.")
        print("\nTables should be created on web app startup.")
        print("\nHere are the table structures from models:")
        # Show from models instead (imported lazily so this module loads
        # even when only the live-database path is exercised)
        from backend.db.models import Photo, Person, Face, PersonEmbedding, Tag, PhotoTag
        models = [
            ("photos", Photo),
            ("people", Person),
            ("faces", Face),
            ("person_embeddings", PersonEmbedding),
            ("tags", Tag),
            ("photo_tags", PhotoTag),
        ]
        for table_name, model in models:
            print(f"\n{'='*80}")
            print(f"Table: {table_name}")
            print(f"{'='*80}")
            print("\nColumns:")
            for col in model.__table__.columns:
                nullable = "Yes" if col.nullable else "No"
                primary_key = "Yes" if col.primary_key else "No"
                default = str(col.default) if col.default else ''
                print(f" {col.name:<30} {col.type!s:<25} Nullable: {nullable:<10} PK: {primary_key:<12} Default: {default}")
            # Show indexes
            indexes = model.__table__.indexes
            if indexes:
                print("\nIndexes:")
                for idx in indexes:
                    unique = "UNIQUE" if idx.unique else ""
                    cols = ", ".join([c.name for c in idx.columns])
                    print(f" {idx.name}: {cols} {unique}")
            # Show foreign keys
            fks = [fk for fk in model.__table__.foreign_keys]
            if fks:
                print("\nForeign Keys:")
                for fk in fks:
                    print(f" {fk.parent.name} -> {fk.column.table.name}({fk.column.name})")
        # Nothing else to do when working from models only.
        return
    print(f"\nFound {len(table_names)} table(s):")
    for table_name in sorted(table_names):
        print(f" - {table_name}")
    # Show structure for each table (delegates to show_table_structure)
    for table_name in sorted(table_names):
        show_table_structure(table_name, inspector)
if __name__ == "__main__":
    # Dump the schema; on failure show the traceback and exit non-zero.
    try:
        show_all_tables()
    except Exception as err:
        print(f"❌ Error showing tables: {err}")
        import traceback

        traceback.print_exc()
        sys.exit(1)

View File

@ -1,115 +0,0 @@
#!/usr/bin/env python3
"""
Test if RetinaFace provides both eyes for profile faces or if one eye is missing
"""
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
try:
    from src.utils.pose_detection import PoseDetector, RETINAFACE_AVAILABLE
    from pathlib import Path

    # Bail out early when the optional RetinaFace backend is not installed.
    if not RETINAFACE_AVAILABLE:
        print("❌ RetinaFace not available")
        exit(1)
    detector = PoseDetector()
    # Find test images: the first image file found in these folders wins.
    test_image_paths = ["demo_photos", "data/uploads"]
    test_image = None
    for path in test_image_paths:
        if os.path.exists(path):
            for ext in ['.jpg', '.jpeg', '.png']:
                for img_file in Path(path).glob(f'*{ext}'):
                    test_image = str(img_file)
                    break
        if test_image:
            break
    if not test_image:
        print("❌ No test image found")
        exit(1)
    print(f"Testing with: {test_image}\n")
    print("=" * 80)
    print("EYE VISIBILITY ANALYSIS")
    print("=" * 80)
    faces = detector.detect_faces_with_landmarks(test_image)
    if not faces:
        print("❌ No faces detected")
        exit(1)
    print(f"Found {len(faces)} face(s)\n")
    for face_key, face_data in faces.items():
        landmarks = face_data.get('landmarks', {})
        print(f"{face_key}:")
        print(f" Landmarks available: {list(landmarks.keys())}")
        left_eye = landmarks.get('left_eye')
        right_eye = landmarks.get('right_eye')
        nose = landmarks.get('nose')
        print(f" Left eye: {left_eye}")
        print(f" Right eye: {right_eye}")
        print(f" Nose: {nose}")
        # Check if both eyes are present (None means landmark was absent)
        both_eyes_present = left_eye is not None and right_eye is not None
        only_left_eye = left_eye is not None and right_eye is None
        only_right_eye = left_eye is None and right_eye is not None
        no_eyes = left_eye is None and right_eye is None
        print(f"\n Eye visibility:")
        print(f" Both eyes present: {both_eyes_present}")
        print(f" Only left eye: {only_left_eye}")
        print(f" Only right eye: {only_right_eye}")
        print(f" No eyes: {no_eyes}")
        # Calculate yaw if possible (needs both eyes per the message below)
        yaw = detector.calculate_yaw_from_landmarks(landmarks)
        print(f" Yaw angle: {yaw:.2f}°" if yaw is not None else " Yaw angle: None (requires both eyes)")
        # Calculate face width if both eyes present
        if both_eyes_present:
            # Horizontal eye distance as a rough proxy for face width
            face_width = abs(right_eye[0] - left_eye[0])
            print(f" Face width (eye distance): {face_width:.2f} pixels")
            # If face width is very small, it might be a profile view
            if face_width < 20:
                print(f" ⚠️ Very small face width - likely extreme profile view")
        # Classify pose from the three recomputed angles
        pitch = detector.calculate_pitch_from_landmarks(landmarks)
        roll = detector.calculate_roll_from_landmarks(landmarks)
        pose_mode = detector.classify_pose_mode(yaw, pitch, roll)
        print(f" Pose mode: {pose_mode}")
        print()
    print("\n" + "=" * 80)
    print("CONCLUSION")
    print("=" * 80)
    print("""
If RetinaFace provides both eyes even for profile faces:
- We can use eye distance (face width) as an indicator
- Small face width (< 20-30 pixels) suggests extreme profile
- But we can't directly use 'missing eye' as a signal
If RetinaFace sometimes only provides one eye for profile faces:
- We can check if left_eye or right_eye is None
- If only one eye is present, it's likely a profile view
- This would be a strong indicator for profile detection
""")
# Optional dependency missing: degrade to a friendly hint, not a traceback
except ImportError as e:
    print(f"❌ Import error: {e}")
    print("Make sure you're in the project directory and dependencies are installed")

View File

@ -1,161 +0,0 @@
#!/usr/bin/env python3
"""
Test pitch and roll angle calculations to investigate issues
"""
import sys
import os
# Add src to path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
try:
from src.utils.pose_detection import PoseDetector, RETINAFACE_AVAILABLE
import sqlite3
from pathlib import Path
def test_retinaface_landmarks():
    """Test what landmarks RetinaFace actually provides.

    Detects faces in the first image found under a few known folders,
    prints every landmark RetinaFace returned, then recomputes
    yaw/pitch/roll from those landmarks so the math can be inspected.
    """
    if not RETINAFACE_AVAILABLE:
        print("❌ RetinaFace not available")
        return
    print("=" * 60)
    print("TESTING RETINAFACE LANDMARKS")
    print("=" * 60)
    # Try to find a test image
    test_image_paths = [
        "demo_photos",
        "data/uploads",
        "data"
    ]
    detector = PoseDetector()
    test_image = None
    # First image file found in any candidate folder wins.
    for path in test_image_paths:
        if os.path.exists(path):
            for ext in ['.jpg', '.jpeg', '.png']:
                for img_file in Path(path).glob(f'*{ext}'):
                    test_image = str(img_file)
                    break
        if test_image:
            break
    if not test_image:
        print("❌ No test image found")
        return
    print(f"Using test image: {test_image}")
    # Detect faces
    faces = detector.detect_faces_with_landmarks(test_image)
    if not faces:
        print("❌ No faces detected")
        return
    print(f"\n✅ Found {len(faces)} face(s)")
    for face_key, face_data in faces.items():
        print(f"\n{face_key}:")
        landmarks = face_data.get('landmarks', {})
        print(f" Landmarks keys: {list(landmarks.keys())}")
        for landmark_name, position in landmarks.items():
            print(f" {landmark_name}: {position}")
        # Test calculations
        yaw = detector.calculate_yaw_from_landmarks(landmarks)
        pitch = detector.calculate_pitch_from_landmarks(landmarks)
        roll = detector.calculate_roll_from_landmarks(landmarks)
        print(f"\n Calculated angles:")
        print(f" Yaw: {yaw:.2f}°" if yaw is not None else " Yaw: None")
        print(f" Pitch: {pitch:.2f}°" if pitch is not None else " Pitch: None")
        print(f" Roll: {roll:.2f}°" if roll is not None else " Roll: None")
        # Check which landmarks are missing for pitch
        required_for_pitch = ['left_eye', 'right_eye', 'left_mouth', 'right_mouth', 'nose']
        missing = [lm for lm in required_for_pitch if lm not in landmarks]
        if missing:
            print(f" ⚠️ Missing landmarks for pitch: {missing}")
        # Check roll calculation
        if roll is not None:
            left_eye = landmarks.get('left_eye')
            right_eye = landmarks.get('right_eye')
            if left_eye and right_eye:
                # Roll comes from the slope of the line between the eyes.
                dx = right_eye[0] - left_eye[0]
                dy = right_eye[1] - left_eye[1]
                print(f" Roll calculation details:")
                print(f" dx (right_eye[0] - left_eye[0]): {dx:.2f}")
                print(f" dy (right_eye[1] - left_eye[1]): {dy:.2f}")
                print(f" atan2(dy, dx) = {roll:.2f}°")
                # Normalize to [-90, 90] range
                normalized_roll = roll
                if normalized_roll > 90:
                    normalized_roll = normalized_roll - 180
                elif normalized_roll < -90:
                    normalized_roll = normalized_roll + 180
                print(f" Normalized to [-90, 90]: {normalized_roll:.2f}°")
        pose_mode = detector.classify_pose_mode(yaw, pitch, roll)
        print(f" Pose mode: {pose_mode}")
def analyze_database_angles():
    """Print the pose angles stored in the local SQLite database.

    Reads up to 20 rows from `faces` that have at least one of
    yaw/pitch/roll recorded and prints each angle plus the roll value
    normalized into the [-90, 90] range, to spot suspicious patterns.

    Returns None; prints a message and returns early when the database
    file is absent.
    """
    db_path = "data/punimtag.db"
    if not db_path or not os.path.exists(db_path):
        print(f"❌ Database not found: {db_path}")
        return

    print("\n" + "=" * 60)
    print("ANALYZING DATABASE ANGLES")
    print("=" * 60)

    conn = sqlite3.connect(db_path)
    try:
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()
        # Get faces with angle data
        cursor.execute("""
            SELECT id, pose_mode, yaw_angle, pitch_angle, roll_angle
            FROM faces
            WHERE yaw_angle IS NOT NULL OR pitch_angle IS NOT NULL OR roll_angle IS NOT NULL
            LIMIT 20
        """)
        faces = cursor.fetchall()
        print(f"\nFound {len(faces)} faces with angle data\n")
        for face in faces:
            print(f"Face ID {face['id']}: {face['pose_mode']}")
            # BUG FIX: compare against None explicitly. A stored angle of
            # exactly 0.0 is falsy, so the old truthiness checks reported
            # it as "None" even though a real value was present.
            print(f"  Yaw: {face['yaw_angle']:.2f}°" if face['yaw_angle'] is not None else "  Yaw: None")
            print(f"  Pitch: {face['pitch_angle']:.2f}°" if face['pitch_angle'] is not None else "  Pitch: None")
            print(f"  Roll: {face['roll_angle']:.2f}°" if face['roll_angle'] is not None else "  Roll: None")
            # Check roll normalization into [-90, 90]
            if face['roll_angle'] is not None:
                roll = face['roll_angle']
                normalized = roll
                if normalized > 90:
                    normalized = normalized - 180
                elif normalized < -90:
                    normalized = normalized + 180
                print(f"  Roll normalized: {normalized:.2f}°")
            print()
    finally:
        # Close the connection even if a query raises (resource-leak fix).
        conn.close()
    # Run both diagnostics in sequence when invoked directly as a script.
    if __name__ == "__main__":
        test_retinaface_landmarks()
        analyze_database_angles()
# The pose-detection stack is optional; print a hint instead of a traceback.
except ImportError as e:
    print(f"❌ Import error: {e}")
    print("Make sure you're in the project directory and dependencies are installed")

View File

@ -1,116 +0,0 @@
#!/usr/bin/env python3
"""Update status of a reported photo in the auth database."""
import sys
from pathlib import Path
# Add project root to path
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from sqlalchemy import text
from backend.db.session import get_auth_database_url, AuthSessionLocal
def update_reported_photo_status(report_id: int, new_status: str):
    """Update the status of a reported photo.

    Looks the report up first so the previous status can be echoed,
    skips the write when the status already matches, commits on success
    and rolls back (then re-raises) on error.

    Args:
        report_id: Primary key in inappropriate_photo_reports.
        new_status: Status string to store (e.g. "dismissed").

    Raises:
        ValueError: when the auth database is not configured.
    """
    if AuthSessionLocal is None:
        raise ValueError("Auth database not configured. Set DATABASE_URL_AUTH environment variable.")
    db = AuthSessionLocal()
    try:
        # First check if the report exists and get its current status
        check_result = db.execute(text("""
            SELECT id, status, review_notes
            FROM inappropriate_photo_reports
            WHERE id = :report_id
        """), {"report_id": report_id})
        row = check_result.fetchone()
        if not row:
            print(f"❌ Reported photo {report_id} not found in database.")
            return
        current_status = row.status
        review_notes = row.review_notes
        print(f"📋 Current status: '{current_status}'")
        if review_notes:
            print(f"📝 Review notes: '{review_notes}'")
        # No-op when nothing would change
        if current_status == new_status:
            print(f" Status is already '{new_status}'. No update needed.")
            return
        # Update the status
        result = db.execute(text("""
            UPDATE inappropriate_photo_reports
            SET status = :new_status
            WHERE id = :report_id
        """), {
            "new_status": new_status,
            "report_id": report_id
        })
        db.commit()
        if result.rowcount > 0:
            print(f"✅ Successfully updated reported photo {report_id} status from '{current_status}' to '{new_status}'")
        else:
            print(f"⚠️ No rows updated.")
    except Exception as e:
        # Undo the partial transaction, report, and let the caller decide.
        db.rollback()
        print(f"❌ Error updating reported photo status: {str(e)}")
        raise
    finally:
        db.close()
def find_reported_photo_by_note(search_note: str):
    """Find reported photos by review notes.

    Runs a LIKE '%search_note%' match against review_notes, prints each
    hit, and returns the matched rows (empty list when none).

    Raises:
        ValueError: when the auth database is not configured.
    """
    if AuthSessionLocal is None:
        raise ValueError("Auth database not configured. Set DATABASE_URL_AUTH environment variable.")
    db = AuthSessionLocal()
    try:
        result = db.execute(text("""
            SELECT id, photo_id, status, review_notes, reported_at
            FROM inappropriate_photo_reports
            WHERE review_notes LIKE :search_pattern
            ORDER BY id DESC
        """), {"search_pattern": f"%{search_note}%"})
        rows = result.fetchall()
        if not rows:
            print(f"❌ No reported photos found with note containing '{search_note}'")
            return []
        print(f"📋 Found {len(rows)} reported photo(s) with note containing '{search_note}':\n")
        for row in rows:
            print(f" ID: {row.id}, Photo ID: {row.photo_id}, Status: {row.status}")
            print(f" Notes: {row.review_notes}")
            print(f" Reported at: {row.reported_at}\n")
        return rows
    except Exception as e:
        # Read-only query: nothing to roll back, just report and re-raise.
        print(f"❌ Error searching for reported photos: {str(e)}")
        raise
    finally:
        db.close()
if __name__ == "__main__":
    # CLI: either update one report's status, or search reports by note text.
    if len(sys.argv) < 3:
        print("Usage: python scripts/update_reported_photo_status.py <report_id> <new_status>")
        print(" OR: python scripts/update_reported_photo_status.py search <search_text>")
        print("Example: python scripts/update_reported_photo_status.py 57 dismissed")
        print("Example: python scripts/update_reported_photo_status.py search 'agree. removed'")
        sys.exit(1)
    if sys.argv[1] == "search":
        search_text = sys.argv[2]
        find_reported_photo_by_note(search_text)
    else:
        # Robustness fix: a non-numeric report id previously crashed with a
        # raw ValueError traceback; treat it as a usage error instead.
        try:
            report_id = int(sys.argv[1])
        except ValueError:
            print(f"❌ Invalid report id: '{sys.argv[1]}' (must be an integer)")
            sys.exit(1)
        new_status = sys.argv[2]
        update_reported_photo_status(report_id, new_status)

View File

@ -7,6 +7,7 @@
"build": "./scripts/with-sharp-libpath.sh next build",
"start": "./scripts/with-sharp-libpath.sh next start",
"lint": "next lint",
"type-check": "tsc --noEmit",
"prisma:generate": "prisma generate",
"prisma:generate:auth": "prisma generate --schema=prisma/schema-auth.prisma",
"prisma:generate:all": "prisma generate && prisma generate --schema=prisma/schema-auth.prisma",

View File

@ -204,3 +204,4 @@ echo "2. Run 'npm run dev' to start the development server"
echo "3. Run 'npm run check:permissions' to verify database access"
echo ""

View File

@ -145,3 +145,4 @@ testQueries()
prisma.$disconnect();
});

View File

@ -15,3 +15,4 @@ else
exec "$@"
fi