Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
193 changes: 162 additions & 31 deletions backend/app/routes/face_clusters.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import asyncio
import logging
from binascii import Error as Base64Error
import base64
from typing import Annotated
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Annotated, Dict, Optional
import uuid
import os
from fastapi import APIRouter, HTTPException, Query, status
Expand All @@ -20,8 +23,10 @@
ErrorResponse,
GetClustersResponse,
GetClustersData,
GlobalReclusterResponse,
GlobalReclusterData,
GlobalReclusterStartData,
GlobalReclusterStartResponse,
GlobalReclusterStatusData,
GlobalReclusterStatusResponse,
ClusterMetadata,
GetClusterImagesResponse,
GetClusterImagesData,
Expand All @@ -38,6 +43,88 @@
router = APIRouter()


# Global reclustering runs synchronously over every face embedding in the
# library and can take well past any reasonable HTTP timeout on large
# libraries, so it runs as a background task that the client polls instead
# of blocking the request.
@dataclass
class ReclusterTask:
    """In-memory record of one global reclustering job.

    An entry is created when a job is started, updated by the background
    runner when the job finishes, and read by the status endpoint while the
    client polls.
    """

    # Lifecycle state of the job.
    status: str = "running"  # running | complete | error
    # Result counters; stay None until the job completes successfully.
    clusters_created: Optional[int] = None
    faces_skipped: Optional[int] = None
    # Human-readable outcome (success note or failure reason).
    message: Optional[str] = None
    # Timezone-aware (UTC) creation time of the entry.
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    # Timezone-aware (UTC) time the job stopped; None while it is running.
    finished_at: Optional[datetime] = None
    # Handle of the background asyncio task. Kept out of repr/equality so that
    # logging or comparing an entry reflects only its data, not the (noisy,
    # identity-based) Task object.
    task: Optional[asyncio.Task] = field(default=None, repr=False, compare=False)
Comment thread
coderabbitai[bot] marked this conversation as resolved.


recluster_tasks: Dict[str, ReclusterTask] = {}

# Only one global reclustering job may run at a time: a full pass deletes and
# rebuilds every cluster, so two concurrent runs would race on the same tables.
# Holds the task_id of the in-flight job, or None when idle. Safe without a
# lock because it is only read/written from the (single-threaded) event loop
# with no await between check-and-set.
_active_recluster_task_id: Optional[str] = None
Comment thread
coderabbitai[bot] marked this conversation as resolved.

# How long a finished task's result is retained for polling, measured from when
# it finished (not when it started) so a long-running job's result isn't reaped
# almost immediately after completion. Running tasks are never reaped (they are
# bounded to one by the concurrency guard above).
RECLUSTER_TASK_TTL_MINUTES = 15


async def _run_global_recluster(task_id: str) -> None:
    """Run a forced full reclustering pass and record the outcome on its entry.

    The blocking clustering routine is executed via ``asyncio.to_thread`` so
    the event loop stays responsive. The outcome is written onto
    ``recluster_tasks[task_id]`` for the status endpoint to report.

    Args:
        task_id: Key of the already-registered entry in ``recluster_tasks``.

    Raises:
        KeyError: If ``task_id`` has no entry in ``recluster_tasks``.
        asyncio.CancelledError: Re-raised after the entry has been marked as
            failed, so cancellation still propagates to whoever cancelled.
    """
    global _active_recluster_task_id
    entry = recluster_tasks[task_id]
    try:
        result, total_faces_skipped = await asyncio.to_thread(
            cluster_util_face_clusters_sync, force_full_reclustering=True
        )

        entry.status = "complete"
        # A falsy result is reported as zero clusters created.
        entry.clusters_created = result or 0
        entry.faces_skipped = total_faces_skipped
        entry.message = (
            "No faces found to cluster"
            if not result
            else "Global reclustering completed successfully."
        )
        logger.info("Global reclustering completed successfully (task_id=%s)", task_id)
    except asyncio.CancelledError:
        # CancelledError is not an Exception subclass on current Python, so
        # without this branch a cancelled job would keep status "running"
        # forever: pollers would never see a terminal state and the cleanup
        # loop (which skips running entries) would never reap it.
        # NOTE(review): cancelling the awaiting task does not stop the worker
        # thread started by to_thread; confirm whether the concurrency guard
        # should stay held until that thread actually exits.
        entry.status = "error"
        entry.message = "Global reclustering was cancelled."
        logger.warning("Global reclustering cancelled (task_id=%s)", task_id)
        raise
    except Exception as e:
        # logger.exception keeps the same message but also records the traceback.
        logger.exception("Global reclustering failed: %s", e)
        entry.status = "error"
        entry.message = f"Global reclustering failed: {str(e)}"
    finally:
        # Stamp completion time so cleanup ages the result from when it finished,
        # and release the concurrency guard so a new job can be started, while
        # the finished result stays in recluster_tasks for the client to poll.
        entry.finished_at = datetime.now(timezone.utc)
        if _active_recluster_task_id == task_id:
            _active_recluster_task_id = None


async def _cleanup_stale_recluster_tasks(interval_seconds: float = 300) -> None:
    """Periodically drop finished reclustering results once they age out.

    Running tasks are left untouched (a legitimate recluster can run for a
    long time, and the concurrency guard already bounds them to one).

    Loops until cancelled; intended to be run as a background asyncio task.

    Args:
        interval_seconds: Delay between sweeps. Defaults to 300 (5 minutes).
    """
    while True:
        await asyncio.sleep(interval_seconds)
        now = datetime.now(timezone.utc)
        ttl_seconds = RECLUSTER_TASK_TTL_MINUTES * 60
        # Collect ids first, then remove: the dict is not mutated while it is
        # being iterated.
        stale = [
            tid
            for tid, entry in recluster_tasks.items()
            if entry.status != "running"
            and entry.finished_at is not None
            and (now - entry.finished_at).total_seconds() > ttl_seconds
        ]
        for tid in stale:
            recluster_tasks.pop(tid, None)


@router.put(
"/{cluster_id}",
response_model=RenameClusterResponse,
Expand Down Expand Up @@ -308,51 +395,95 @@ def face_tagging(

@router.post(
"/global-recluster",
response_model=GlobalReclusterResponse,
status_code=status.HTTP_202_ACCEPTED,
response_model=GlobalReclusterStartResponse,
responses={code: {"model": ErrorResponse} for code in [500]},
)
def trigger_global_reclustering():
async def trigger_global_reclustering():
"""
Manually trigger global face reclustering.
Start a global face reclustering job in the background.
This forces full reclustering regardless of the 24-hour rule.

Returns immediately with a task_id; poll
GET /face-clusters/global-recluster/{task_id} for the result, since
reclustering runs over every face embedding and can take a long time
on large libraries.

If a reclustering job is already running, its task_id is returned instead
of starting a second (concurrent runs would race on the cluster tables).
"""
try:
logger.info("Starting manual global face reclustering...")
global _active_recluster_task_id

result, total_faces_skipped = cluster_util_face_clusters_sync(
force_full_reclustering=True
if _active_recluster_task_id is not None:
logger.info(
"Global reclustering already in progress (task_id=%s); reusing it",
_active_recluster_task_id,
)
return GlobalReclusterStartResponse(
success=True,
message="Global reclustering already in progress.",
data=GlobalReclusterStartData(task_id=_active_recluster_task_id),
)

if result == 0:
return GlobalReclusterResponse(
success=True,
message="No faces found to cluster",
data=GlobalReclusterData(
clusters_created=0, faces_skipped=total_faces_skipped
),
)
task_id = str(uuid.uuid4())
entry = ReclusterTask()
recluster_tasks[task_id] = entry
_active_recluster_task_id = task_id
entry.task = asyncio.create_task(_run_global_recluster(task_id))

logger.info("Global reclustering completed successfully")
logger.info("Started manual global face reclustering (task_id=%s)", task_id)

return GlobalReclusterResponse(
success=True,
message="Global reclustering completed successfully.",
data=GlobalReclusterData(
clusters_created=result, faces_skipped=total_faces_skipped
),
)
return GlobalReclusterStartResponse(
success=True,
message="Global reclustering started.",
data=GlobalReclusterStartData(task_id=task_id),
)

except Exception as e:
logger.error(f"Global reclustering failed: {str(e)}")

@router.get(
"/global-recluster/{task_id}",
response_model=GlobalReclusterStatusResponse,
responses={code: {"model": ErrorResponse} for code in [404]},
)
async def get_global_recluster_status(task_id: str):
"""Poll the status of a previously started global reclustering job."""
entry = recluster_tasks.get(task_id)
if entry is None:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
status_code=status.HTTP_404_NOT_FOUND,
detail=ErrorResponse(
success=False,
error="Internal server error",
message=f"Global reclustering failed: {str(e)}",
error="Task Not Found",
message=f"Recluster task '{task_id}' not found or already consumed.",
).model_dump(),
)

if entry.status == "running":
return GlobalReclusterStatusResponse(
success=True,
data=GlobalReclusterStatusData(status="running"),
)

# Terminal state: leave the entry in place so repeated polls (multiple
# tabs, retries) return the same result; _cleanup_stale_recluster_tasks
# reaps it once it ages out.
if entry.status == "error":
return GlobalReclusterStatusResponse(
success=False,
message=entry.message,
data=GlobalReclusterStatusData(status="error"),
)

return GlobalReclusterStatusResponse(
success=True,
message=entry.message,
data=GlobalReclusterStatusData(
status="complete",
clusters_created=entry.clusters_created,
faces_skipped=entry.faces_skipped,
),
)


@router.post(
"/multi-search",
Expand Down
24 changes: 20 additions & 4 deletions backend/app/schemas/face_clusters.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from pydantic import BaseModel
from typing import List, Optional, Dict, Union, Any
from typing import List, Literal, Optional, Dict, Union, Any


# Request Models
Expand Down Expand Up @@ -74,16 +74,32 @@ class GetClusterImagesResponse(BaseModel):
data: Optional[GetClusterImagesData] = None


class GlobalReclusterData(BaseModel):
# Payload of the "job started" response for global reclustering.
class GlobalReclusterStartData(BaseModel):
    # Identifier of the reclustering job; used as the path parameter when
    # polling GET /global-recluster/{task_id}.
    task_id: str


class GlobalReclusterStartResponse(BaseModel):
    """Returned immediately when a global reclustering job is started."""

    # Whether the request was accepted.
    success: bool
    # Optional human-readable note (e.g. started vs. already in progress).
    message: Optional[str] = None
    # Optional error label.
    error: Optional[str] = None
    # Carries the task_id to poll with.
    data: Optional[GlobalReclusterStartData] = None


# Payload of the status-poll response for a global reclustering job.
class GlobalReclusterStatusData(BaseModel):
    # Current lifecycle state of the job.
    status: Literal["running", "complete", "error"]
    # Result counters; left as None unless the caller supplies them (the
    # status endpoint fills them in for a completed job).
    clusters_created: Optional[int] = None
    faces_skipped: Optional[int] = None


class GlobalReclusterResponse(BaseModel):
class GlobalReclusterStatusResponse(BaseModel):
"""Polled to check on the progress of a global reclustering job."""

success: bool
message: Optional[str] = None
error: Optional[str] = None
data: Optional[GlobalReclusterData] = None
data: Optional[GlobalReclusterStatusData] = None


class MultiPersonSearchRequest(BaseModel):
Expand Down
16 changes: 14 additions & 2 deletions backend/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@
from app.routes.folders import router as folders_router
from app.routes.albums import router as albums_router
from app.routes.images import router as images_router
from app.routes.face_clusters import router as face_clusters_router
from app.routes.face_clusters import (
router as face_clusters_router,
_cleanup_stale_recluster_tasks,
)
from app.routes.user_preferences import router as user_preferences_router
from app.routes.memories import router as memories_router
from app.routes.shutdown import router as shutdown_router
Expand Down Expand Up @@ -62,16 +65,25 @@ async def lifespan(app: FastAPI):
db_create_album_images_table()
db_create_metadata_table()
# Create ProcessPoolExecutor and attach it to app.state
# NOTE: model-download and global-reclustering job tracking is in-memory and
# per-worker, so the server is launched with a single worker (see run.sh /
# run-server.ps1). Keep it that way unless that state is moved to a shared
# store.
app.state.executor = ProcessPoolExecutor(max_workers=1)

# Start the SSE model download cleanup task
cleanup_task = asyncio.create_task(_cleanup_stale_tasks())
# Start the global-reclustering finished-task cleanup loop
recluster_cleanup_task = asyncio.create_task(_cleanup_stale_recluster_tasks())

try:
yield
finally:
cleanup_task.cancel()
await asyncio.gather(cleanup_task, return_exceptions=True)
recluster_cleanup_task.cancel()
await asyncio.gather(
cleanup_task, recluster_cleanup_task, return_exceptions=True
)
app.state.executor.shutdown(wait=True)


Expand Down
9 changes: 5 additions & 4 deletions backend/run-server.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,14 @@ if ($args[0] -eq "--test") {
Start-Sleep -Seconds 3
}
} else {
# Get the WORKERS environment variable or use a default value
$workers = if ($env:WORKERS) { $env:WORKERS } else { "1" }
Write-Host "WORKERS: $workers"
# Model-download and global-reclustering job tracking is in-memory and
# per-worker, so the server must run with a single worker; a job started in
# one worker would be invisible to others (missed status polls, duplicate
# jobs). Do not raise the worker count above 1.

# Start the Hypercorn server
$process = Start-Process -FilePath "hypercorn" `
-ArgumentList "main:app --workers $workers --bind 0.0.0.0:8000" `
-ArgumentList "main:app --workers 1 --bind 0.0.0.0:8000" `
-PassThru

# Wait for process termination or Ctrl+C
Expand Down
8 changes: 5 additions & 3 deletions backend/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ if [[ $1 == "--test" ]]; then
sleep 3
done
else
# print the value of the WORKERS environment variable
echo "WORKERS: ${WORKERS:-1}"
hypercorn main:app --workers ${WORKERS:-1} --bind 0.0.0.0:8000
# Model-download and global-reclustering job tracking is in-memory and
# per-worker, so the server must run with a single worker; a job started in
# one worker would be invisible to others (missed status polls, duplicate
# jobs). Do not raise the worker count above 1.
hypercorn main:app --workers 1 --bind 0.0.0.0:8000
fi
Loading
Loading