# punimtag/punimtag.py
#
# 94 lines
# 3.3 KiB
# Python

import os
import sqlite3
import face_recognition
from PIL import Image
import numpy as np
import hdbscan
# Configuration
PHOTOS_DIR = "photos"  # root folder that is walked for image files
DB_FILE = "faces.db"  # SQLite database file opened by the script
def init_db(db_path=None):
    """Open the face database and make sure its schema exists.

    Three tables are created when they are missing:

    * ``images``   - one row per photo file, with a unique ``path``.
    * ``faces``    - one row per detected face: the owning ``image_id``
      (foreign key to ``images``), a ``location`` text, the ``encoding``
      blob and a nullable ``cluster_id``.
    * ``clusters`` - one row per cluster with a text ``label``.

    Args:
        db_path: Database to open. ``None`` (the default) uses the
            module-level ``DB_FILE``; ``":memory:"`` gives a throw-away
            in-memory database.

    Returns:
        An open ``sqlite3.Connection``. The caller must close it.

    Raises:
        sqlite3.Error: If the database cannot be opened or the schema
            cannot be created. The connection is closed before the error
            propagates, so no handle is leaked.
    """
    conn = sqlite3.connect(DB_FILE if db_path is None else db_path)
    try:
        # ``with conn`` commits on success and rolls back on failure.
        with conn:
            conn.execute(
                """CREATE TABLE IF NOT EXISTS images
                   (id INTEGER PRIMARY KEY, path TEXT UNIQUE)"""
            )
            conn.execute(
                """CREATE TABLE IF NOT EXISTS faces
                   (id INTEGER PRIMARY KEY, image_id INTEGER, location TEXT,
                    encoding BLOB, cluster_id INTEGER,
                    FOREIGN KEY(image_id) REFERENCES images(id))"""
            )
            conn.execute(
                """CREATE TABLE IF NOT EXISTS clusters
                   (id INTEGER PRIMARY KEY, label TEXT)"""
            )
    except Exception:
        # Do not hand a half-initialised connection to nobody: release it.
        conn.close()
        raise
    return conn
def process_images(conn, photos_dir=None, extensions=('.jpg', '.jpeg', '.png')):
    """Detect and encode the faces in every image under a folder tree.

    Each matching file is loaded with ``face_recognition``, registered in
    the ``images`` table (``INSERT OR IGNORE``, so an already known path
    keeps its id) and its faces are collected in memory. Rows for the
    ``faces`` table are NOT written here; they are returned so the caller
    can attach cluster ids first.

    Args:
        conn: Open ``sqlite3.Connection`` whose ``images`` table exists.
        photos_dir: Root folder to walk recursively. ``None`` (the default)
            uses the module-level ``PHOTOS_DIR``.
        extensions: File-name suffixes (compared case-insensitively) that
            count as images. ``.jpeg`` is included so that JPEG files
            saved with the long suffix are no longer skipped.

    Returns:
        A ``(encodings, face_data)`` pair of equal length:
        ``encodings`` is ``np.array`` of the per-face encodings, and
        ``face_data`` is a list of ``(image_id, location_str,
        encoding_bytes)`` tuples where ``location_str`` is ``str()`` of the
        location returned by ``face_recognition.face_locations`` and
        ``encoding_bytes`` is ``encoding.tobytes()``.
    """
    if photos_dir is None:
        photos_dir = PHOTOS_DIR
    if isinstance(extensions, str):
        # A lone string would otherwise be iterated character by character.
        extensions = (extensions,)
    suffixes = tuple(ext.lower() for ext in extensions)

    c = conn.cursor()
    encodings = []
    face_data = []
    for root, _, files in os.walk(photos_dir):
        for file in files:
            if not file.lower().endswith(suffixes):
                continue
            path = os.path.join(root, file)
            try:
                image = face_recognition.load_image_file(path)
                locations = face_recognition.face_locations(image)
                face_encodings = face_recognition.face_encodings(image, locations)
                c.execute("INSERT OR IGNORE INTO images (path) VALUES (?)", (path,))
                c.execute("SELECT id FROM images WHERE path = ?", (path,))
                image_id = c.fetchone()[0]
            except Exception as e:
                # Best effort: one unreadable file must not abort the scan.
                print(f"Error processing {path}: {e}")
                continue
            # Both lists are appended together so index i of ``encodings``
            # always describes the same face as index i of ``face_data``.
            for loc, enc in zip(locations, face_encodings):
                encodings.append(enc)
                face_data.append((image_id, str(loc), enc.tobytes()))
    conn.commit()
    return np.array(encodings), face_data
def cluster_encodings(encodings, min_cluster_size=2):
    """Group face encodings into clusters with HDBSCAN.

    Args:
        encodings: Sequence or array with one encoding per face.
        min_cluster_size: Smallest number of faces that may form a cluster;
            forwarded to ``hdbscan.HDBSCAN``. Defaults to 2.

    Returns:
        One integer label per encoding, in input order; ``-1`` marks a face
        that belongs to no cluster (noise). An empty input yields ``[]``.
        When there are fewer encodings than ``min_cluster_size`` no cluster
        can exist, so every face is labelled ``-1`` without invoking
        HDBSCAN (which is not dependable on such tiny inputs, e.g. a single
        face).
    """
    n_samples = len(encodings)
    if n_samples == 0:
        return []
    if n_samples < min_cluster_size:
        return np.full(n_samples, -1, dtype=int)
    clusterer = hdbscan.HDBSCAN(min_cluster_size=min_cluster_size)
    return clusterer.fit_predict(encodings)
def store_clusters(conn, labels, face_data):
    """Persist clusters and the faces assigned to them.

    One ``clusters`` row labelled ``"Cluster <label>"`` is inserted per
    distinct non-noise label, then one ``faces`` row per entry of
    ``face_data`` (in input order). Faces labelled ``-1`` (noise) are
    stored with a NULL ``cluster_id``. All writes happen in a single
    transaction: either everything is committed or nothing is.

    Args:
        conn: Open ``sqlite3.Connection`` with ``clusters`` and ``faces``
            tables.
        labels: One integer label per face (list or NumPy array).
        face_data: ``(image_id, location_str, encoding_bytes)`` tuples,
            index-aligned with ``labels``.

    Raises:
        ValueError: If ``labels`` and ``face_data`` differ in length.
    """
    if len(labels) != len(face_data):
        raise ValueError(
            f"labels ({len(labels)}) and face_data ({len(face_data)}) "
            "must have the same length"
        )

    c = conn.cursor()
    # ``with conn`` commits on success and rolls back if any insert fails.
    with conn:
        # Map each real cluster label to the row id SQLite assigned to it.
        # int() turns NumPy integers into plain ints, which sqlite3 can bind.
        cluster_ids = {}
        for label in sorted({int(label) for label in labels}):
            if label == -1:  # noise has no cluster row
                continue
            c.execute("INSERT INTO clusters (label) VALUES (?)", (f"Cluster {label}",))
            cluster_ids[label] = c.lastrowid

        # Single pass over the faces instead of rescanning them per label.
        c.executemany(
            "INSERT INTO faces (image_id, location, encoding, cluster_id) VALUES (?, ?, ?, ?)",
            [
                (image_id, loc_str, enc_blob, cluster_ids.get(int(label)))
                for label, (image_id, loc_str, enc_blob) in zip(labels, face_data)
            ],
        )
def main():
    """Run the whole pipeline: scan photos, cluster faces, store results.

    Opens the database, collects face encodings from the photo folder,
    clusters them and writes clusters and faces back. The connection is
    closed even when one of the stages raises.
    """
    conn = init_db()
    try:
        encodings, face_data = process_images(conn)
        labels = cluster_encodings(encodings)
        store_clusters(conn, labels, face_data)
    finally:
        conn.close()
    # Report the configured file so the message stays true if DB_FILE changes.
    print(f"Processing complete. Data stored in {DB_FILE}")


if __name__ == "__main__":
    main()