From 2f1554f88db58258b950f9ba21584227de8a15ff Mon Sep 17 00:00:00 2001 From: Vanshaj Poonia Date: Sun, 28 Jun 2026 16:24:13 +0530 Subject: [PATCH 1/6] fix(face-clusters): run global reclustering as async job; remove 10s axios cap (#1345) The shared axios client applied a hard 10s timeout to every backend request with no per-call override, so synchronous endpoints that scale with library size (notably global face reclustering) aborted in the UI after ~10s while the backend kept running to completion, leaving the UI and DB inconsistent. Backend - POST /face-clusters/global-recluster now starts the full DBSCAN pass as a background task and returns a task_id immediately (202 Accepted), running the blocking work via asyncio.to_thread so the event loop is not blocked. - New GET /face-clusters/global-recluster/{task_id} to poll the job status (running | complete | error). - Concurrency guard: a second trigger while one is running rejoins the in-flight task instead of starting a second pass that would race on the cluster tables. - Cleanup loop reaps finished task results after a TTL (registered in the app lifespan); running tasks are never reaped. Terminal results are not deleted on first poll, so repeated polls (multiple tabs/retries) stay consistent. Frontend - Raise the default axios timeout 10s -> 30s and add LONG_REQUEST_TIMEOUT_MS (120s) applied per-call to the face-search / multi-person-search endpoints. - Replace the one-shot reclustering mutation with a useGlobalRecluster polling hook wired into the Settings recluster button (same loader/toast UX). Scoped to the face-clustering side of the issue: memories endpoints are left untouched (handled separately), and a live progress bar / SSE is deferred to a follow-up. --- backend/app/routes/face_clusters.py | 188 +++++++++++++++--- backend/app/schemas/face_clusters.py | 24 ++- backend/main.py | 12 +- .../src/api/api-functions/face_clusters.ts | 35 +++- frontend/src/api/apiEndpoints.ts | 2 + frontend/src/api/axiosConfig.ts | 14 +- frontend/src/hooks/useGlobalRecluster.tsx | 116 +++++++++++ .../components/ApplicationControlsCard.tsx | 38 ++-- 8 files changed, 364 insertions(+), 65 deletions(-) create mode 100644 frontend/src/hooks/useGlobalRecluster.tsx diff --git a/backend/app/routes/face_clusters.py b/backend/app/routes/face_clusters.py index 4951ac960..4fbc4fa0c 100644 --- a/backend/app/routes/face_clusters.py +++ b/backend/app/routes/face_clusters.py @@ -1,7 +1,10 @@ +import asyncio import logging from binascii import Error as Base64Error import base64 -from typing import Annotated +from dataclasses import dataclass, field +from datetime import datetime, timezone +from typing import Annotated, Dict, Optional import uuid import os from fastapi import APIRouter, HTTPException, Query, status @@ -20,8 +23,10 @@ ErrorResponse, GetClustersResponse, GetClustersData, - GlobalReclusterResponse, - GlobalReclusterData, + GlobalReclusterStartData, + GlobalReclusterStartResponse, + GlobalReclusterStatusData, + GlobalReclusterStatusResponse, ClusterMetadata, GetClusterImagesResponse, GetClusterImagesData, @@ -38,6 +43,83 @@ router = APIRouter() +# Global reclustering runs synchronously over every face embedding in the +# library and can take well past any reasonable HTTP timeout on large +# libraries, so it runs as a background task that the client polls instead +# of blocking the request. +@dataclass +class ReclusterTask: + status: str = "running" # running | complete | error + clusters_created: Optional[int] = None + faces_skipped: Optional[int] = None + message: Optional[str] = None + created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) + task: Optional[asyncio.Task] = None + + +recluster_tasks: Dict[str, ReclusterTask] = {} + +# Only one global reclustering job may run at a time: a full pass deletes and +# rebuilds every cluster, so two concurrent runs would race on the same tables. +# Holds the task_id of the in-flight job, or None when idle. Safe without a +# lock because it is only read/written from the (single-threaded) event loop +# with no await between check-and-set. +_active_recluster_task_id: Optional[str] = None + +# How long a finished task's result is retained for polling before the cleanup +# loop reaps it. Running tasks are never reaped (they are bounded to one by the +# concurrency guard above). +RECLUSTER_TASK_TTL_MINUTES = 15 + + +async def _run_global_recluster(task_id: str): + global _active_recluster_task_id + entry = recluster_tasks[task_id] + try: + result, total_faces_skipped = await asyncio.to_thread( + cluster_util_face_clusters_sync, force_full_reclustering=True + ) + + entry.status = "complete" + entry.clusters_created = result or 0 + entry.faces_skipped = total_faces_skipped + entry.message = ( + "No faces found to cluster" + if not result + else "Global reclustering completed successfully." + ) + logger.info("Global reclustering completed successfully (task_id=%s)", task_id) + except Exception as e: + logger.error(f"Global reclustering failed: {str(e)}") + entry.status = "error" + entry.message = f"Global reclustering failed: {str(e)}" + finally: + # Release the concurrency guard so a new job can be started, while the + # finished result stays in recluster_tasks for the client to poll. + if _active_recluster_task_id == task_id: + _active_recluster_task_id = None + + +async def _cleanup_stale_recluster_tasks(): + """Periodically drop finished reclustering results once they age out. + + Running tasks are left untouched (a legitimate recluster can run for a + long time, and the concurrency guard already bounds them to one). + """ + while True: + await asyncio.sleep(300) # run every 5 minutes + now = datetime.now(timezone.utc) + stale = [ + tid + for tid, entry in recluster_tasks.items() + if entry.status != "running" + and (now - entry.created_at).total_seconds() + > RECLUSTER_TASK_TTL_MINUTES * 60 + ] + for tid in stale: + recluster_tasks.pop(tid, None) + + @router.put( "/{cluster_id}", response_model=RenameClusterResponse, @@ -308,51 +390,95 @@ def face_tagging( @router.post( "/global-recluster", - response_model=GlobalReclusterResponse, + status_code=status.HTTP_202_ACCEPTED, + response_model=GlobalReclusterStartResponse, responses={code: {"model": ErrorResponse} for code in [500]}, ) -def trigger_global_reclustering(): +async def trigger_global_reclustering(): """ - Manually trigger global face reclustering. + Start a global face reclustering job in the background. This forces full reclustering regardless of the 24-hour rule. + + Returns immediately with a task_id; poll + GET /face-clusters/global-recluster/{task_id} for the result, since + reclustering runs over every face embedding and can take a long time + on large libraries. + + If a reclustering job is already running, its task_id is returned instead + of starting a second (concurrent runs would race on the cluster tables). """ - try: - logger.info("Starting manual global face reclustering...") + global _active_recluster_task_id - result, total_faces_skipped = cluster_util_face_clusters_sync( - force_full_reclustering=True + if _active_recluster_task_id is not None: + logger.info( + "Global reclustering already in progress (task_id=%s); reusing it", + _active_recluster_task_id, + ) + return GlobalReclusterStartResponse( + success=True, + message="Global reclustering already in progress.", + data=GlobalReclusterStartData(task_id=_active_recluster_task_id), ) - if result == 0: - return GlobalReclusterResponse( - success=True, - message="No faces found to cluster", - data=GlobalReclusterData( - clusters_created=0, faces_skipped=total_faces_skipped - ), - ) + task_id = str(uuid.uuid4()) + entry = ReclusterTask() + recluster_tasks[task_id] = entry + _active_recluster_task_id = task_id + entry.task = asyncio.create_task(_run_global_recluster(task_id)) - logger.info("Global reclustering completed successfully") + logger.info("Started manual global face reclustering (task_id=%s)", task_id) - return GlobalReclusterResponse( - success=True, - message="Global reclustering completed successfully.", - data=GlobalReclusterData( - clusters_created=result, faces_skipped=total_faces_skipped - ), - ) + return GlobalReclusterStartResponse( + success=True, + message="Global reclustering started.", + data=GlobalReclusterStartData(task_id=task_id), + ) - except Exception as e: - logger.error(f"Global reclustering failed: {str(e)}") + +@router.get( + "/global-recluster/{task_id}", + response_model=GlobalReclusterStatusResponse, + responses={code: {"model": ErrorResponse} for code in [404]}, +) +async def get_global_recluster_status(task_id: str): + """Poll the status of a previously started global reclustering job.""" + entry = recluster_tasks.get(task_id) + if entry is None: raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + status_code=status.HTTP_404_NOT_FOUND, detail=ErrorResponse( success=False, - error="Internal server error", - message=f"Global reclustering failed: {str(e)}", + error="Task Not Found", + message=f"Recluster task '{task_id}' not found or already consumed.", ).model_dump(), ) + if entry.status == "running": + return GlobalReclusterStatusResponse( + success=True, + data=GlobalReclusterStatusData(status="running"), + ) + + # Terminal state: leave the entry in place so repeated polls (multiple + # tabs, retries) return the same result; _cleanup_stale_recluster_tasks + # reaps it once it ages out. + if entry.status == "error": + return GlobalReclusterStatusResponse( + success=False, + message=entry.message, + data=GlobalReclusterStatusData(status="error"), + ) + + return GlobalReclusterStatusResponse( + success=True, + message=entry.message, + data=GlobalReclusterStatusData( + status="complete", + clusters_created=entry.clusters_created, + faces_skipped=entry.faces_skipped, + ), + ) + @router.post( "/multi-search", diff --git a/backend/app/schemas/face_clusters.py b/backend/app/schemas/face_clusters.py index ff1186108..50978de1a 100644 --- a/backend/app/schemas/face_clusters.py +++ b/backend/app/schemas/face_clusters.py @@ -1,5 +1,5 @@ from pydantic import BaseModel -from typing import List, Optional, Dict, Union, Any +from typing import List, Literal, Optional, Dict, Union, Any # Request Models @@ -74,16 +74,32 @@ class GetClusterImagesResponse(BaseModel): data: Optional[GetClusterImagesData] = None -class GlobalReclusterData(BaseModel): +class GlobalReclusterStartData(BaseModel): + task_id: str + + +class GlobalReclusterStartResponse(BaseModel): + """Returned immediately when a global reclustering job is started.""" + + success: bool + message: Optional[str] = None + error: Optional[str] = None + data: Optional[GlobalReclusterStartData] = None + + +class GlobalReclusterStatusData(BaseModel): + status: Literal["running", "complete", "error"] clusters_created: Optional[int] = None faces_skipped: Optional[int] = None -class GlobalReclusterResponse(BaseModel): +class GlobalReclusterStatusResponse(BaseModel): + """Polled to check on the progress of a global reclustering job.""" + success: bool message: Optional[str] = None error: Optional[str] = None - data: Optional[GlobalReclusterData] = None + data: Optional[GlobalReclusterStatusData] = None class MultiPersonSearchRequest(BaseModel): diff --git a/backend/main.py b/backend/main.py index af10bba49..c8d890dbf 100644 --- a/backend/main.py +++ b/backend/main.py @@ -25,7 +25,10 @@ from app.routes.folders import router as folders_router from app.routes.albums import router as albums_router from app.routes.images import router as images_router -from app.routes.face_clusters import router as face_clusters_router +from app.routes.face_clusters import ( + router as face_clusters_router, + _cleanup_stale_recluster_tasks, +) from app.routes.user_preferences import router as user_preferences_router from app.routes.memories import router as memories_router from app.routes.shutdown import router as shutdown_router @@ -66,12 +69,17 @@ async def lifespan(app: FastAPI): # Start the SSE model download cleanup task cleanup_task = asyncio.create_task(_cleanup_stale_tasks()) + # Start the global-reclustering finished-task cleanup loop + recluster_cleanup_task = asyncio.create_task(_cleanup_stale_recluster_tasks()) try: yield finally: cleanup_task.cancel() - await asyncio.gather(cleanup_task, return_exceptions=True) + recluster_cleanup_task.cancel() + await asyncio.gather( + cleanup_task, recluster_cleanup_task, return_exceptions=True + ) app.state.executor.shutdown(wait=True) diff --git a/frontend/src/api/api-functions/face_clusters.ts b/frontend/src/api/api-functions/face_clusters.ts index 42d2bea32..737a6a759 100644 --- a/frontend/src/api/api-functions/face_clusters.ts +++ b/frontend/src/api/api-functions/face_clusters.ts @@ -1,5 +1,5 @@ import { faceClustersEndpoints } from '../apiEndpoints'; -import { apiClient } from '../axiosConfig'; +import { apiClient, LONG_REQUEST_TIMEOUT_MS } from '../axiosConfig'; import { APIResponse } from '@/types/API'; import { BackendRes } from '@/hooks/useQueryExtension'; import type { Image } from '@/types/Media'; @@ -53,6 +53,7 @@ export const fetchSearchedFaces = async ( const response = await apiClient.post( faceClustersEndpoints.searchForFaces, request, + { timeout: LONG_REQUEST_TIMEOUT_MS }, ); return response.data; }; @@ -63,6 +64,7 @@ export const fetchSearchedFacesBase64 = async ( const response = await apiClient.post>( faceClustersEndpoints.searchForFacesBase64, request, + { timeout: LONG_REQUEST_TIMEOUT_MS }, ); return response.data; }; @@ -72,15 +74,39 @@ export interface GlobalReclusterData { faces_skipped: number | null; } -export const triggerGlobalReclustering = async (): Promise< - BackendRes +export interface GlobalReclusterStartData { + task_id: string; +} + +export interface GlobalReclusterStatusData { + status: 'running' | 'complete' | 'error'; + clusters_created: number | null; + faces_skipped: number | null; +} + +// Global reclustering runs over every face embedding in the library and can +// take well past any reasonable HTTP timeout, so the backend runs it as a +// background job: this kicks it off and returns a task_id immediately. +export const startGlobalReclustering = async (): Promise< + BackendRes > => { - const response = await apiClient.post>( + const response = await apiClient.post>( faceClustersEndpoints.globalRecluster, ); return response.data; }; +// Poll this with the task_id returned by startGlobalReclustering until +// status is 'complete' or 'error'. +export const getGlobalReclusterStatus = async ( + taskId: string, +): Promise> => { + const response = await apiClient.get>( + faceClustersEndpoints.globalReclusterStatus(taskId), + ); + return response.data; +}; + export interface MultiPersonSearchRequest { cluster_ids: string[]; match_mode: 'match_any' | 'match_all'; @@ -92,6 +118,7 @@ export const fetchMultiPersonSearch = async ( const response = await apiClient.post( faceClustersEndpoints.multiPersonSearch, request, + { timeout: LONG_REQUEST_TIMEOUT_MS }, ); return response.data; }; diff --git a/frontend/src/api/apiEndpoints.ts b/frontend/src/api/apiEndpoints.ts index 272479ac3..d957b5c7b 100644 --- a/frontend/src/api/apiEndpoints.ts +++ b/frontend/src/api/apiEndpoints.ts @@ -10,6 +10,8 @@ export const faceClustersEndpoints = { renameCluster: (clusterId: string) => `/face-clusters/${clusterId}`, getClusterImages: (clusterId: string) => `/face-clusters/${clusterId}/images`, globalRecluster: '/face-clusters/global-recluster', + globalReclusterStatus: (taskId: string) => + `/face-clusters/global-recluster/${taskId}`, multiPersonSearch: '/face-clusters/multi-search', }; diff --git a/frontend/src/api/axiosConfig.ts b/frontend/src/api/axiosConfig.ts index bc7fc31fb..8df1a6fd4 100644 --- a/frontend/src/api/axiosConfig.ts +++ b/frontend/src/api/axiosConfig.ts @@ -1,10 +1,20 @@ import axios from 'axios'; import { BACKEND_URL, SYNC_MICROSERVICE_URL } from '@/config/Backend'; +// Default timeout for ordinary requests. Endpoints whose runtime scales with +// library size pass a longer per-call `timeout` override instead of relying +// on this default. +const DEFAULT_TIMEOUT_MS = 30000; + +// For endpoints that run a full scan over the library (face search, +// multi-person search), pass `{ timeout: LONG_REQUEST_TIMEOUT_MS }` per call +// instead of relying on the default above. +export const LONG_REQUEST_TIMEOUT_MS = 120000; + // Create simple axios instance with basic configuration export const apiClient = axios.create({ baseURL: BACKEND_URL, - timeout: 10000, // 10 seconds timeout + timeout: DEFAULT_TIMEOUT_MS, headers: { 'Content-Type': 'application/json', Accept: 'application/json', @@ -12,7 +22,7 @@ export const apiClient = axios.create({ }); export const syncApiClient = axios.create({ baseURL: SYNC_MICROSERVICE_URL, - timeout: 10000, // 10 seconds timeout + timeout: DEFAULT_TIMEOUT_MS, headers: { 'Content-Type': 'application/json', Accept: 'application/json', diff --git a/frontend/src/hooks/useGlobalRecluster.tsx b/frontend/src/hooks/useGlobalRecluster.tsx new file mode 100644 index 000000000..55b4c6e99 --- /dev/null +++ b/frontend/src/hooks/useGlobalRecluster.tsx @@ -0,0 +1,116 @@ +import { useCallback, useEffect, useRef, useState } from 'react'; +import { useQueryClient } from '@tanstack/react-query'; +import { + startGlobalReclustering, + getGlobalReclusterStatus, + GlobalReclusterStatusData, +} from '@/api/api-functions/face_clusters'; +import { getErrorMessage } from '@/lib/utils'; + +const POLL_INTERVAL_MS = 2000; + +interface ReclusterState { + isPending: boolean; + isSuccess: boolean; + isError: boolean; + error: Error | undefined; + successData: GlobalReclusterStatusData | undefined; + successMessage: string | undefined; + errorMessage: string | undefined; +} + +const idleState: ReclusterState = { + isPending: false, + isSuccess: false, + isError: false, + error: undefined, + successData: undefined, + successMessage: undefined, + errorMessage: undefined, +}; + +/** + * Triggers global face reclustering and polls for completion. + * + * Reclustering runs over every face embedding in the library, so the + * backend starts it as a background job and returns a task_id immediately + * instead of blocking the HTTP request. This hook polls the job's status + * endpoint until it reaches a terminal state. + */ +export function useGlobalRecluster() { + const queryClient = useQueryClient(); + const pollHandleRef = useRef | null>(null); + const [state, setState] = useState(idleState); + + const stopPolling = useCallback(() => { + if (pollHandleRef.current) { + clearInterval(pollHandleRef.current); + pollHandleRef.current = null; + } + }, []); + + useEffect(() => stopPolling, [stopPolling]); + + const trigger = useCallback(() => { + stopPolling(); + setState({ ...idleState, isPending: true }); + + startGlobalReclustering() + .then((startRes) => { + const taskId = startRes.data?.task_id; + if (!taskId) { + throw new Error('Backend did not return a task_id for reclustering.'); + } + + pollHandleRef.current = setInterval(async () => { + try { + const statusRes = await getGlobalReclusterStatus(taskId); + + if (statusRes.data?.status === 'running') { + return; + } + + stopPolling(); + queryClient.invalidateQueries({ queryKey: ['clusters'] }); + + if (!statusRes.success || statusRes.data?.status === 'error') { + setState({ + ...idleState, + isError: true, + error: new Error( + statusRes.message || 'Global reclustering failed.', + ), + errorMessage: statusRes.message, + }); + return; + } + + setState({ + ...idleState, + isSuccess: true, + successData: statusRes.data, + successMessage: statusRes.message, + }); + } catch (err) { + stopPolling(); + setState({ + ...idleState, + isError: true, + error: err as Error, + errorMessage: getErrorMessage(err), + }); + } + }, POLL_INTERVAL_MS); + }) + .catch((err) => { + setState({ + ...idleState, + isError: true, + error: err, + errorMessage: getErrorMessage(err), + }); + }); + }, [stopPolling, queryClient]); + + return { trigger, ...state }; +} diff --git a/frontend/src/pages/SettingsPage/components/ApplicationControlsCard.tsx b/frontend/src/pages/SettingsPage/components/ApplicationControlsCard.tsx index dc81ab7d7..525a68801 100644 --- a/frontend/src/pages/SettingsPage/components/ApplicationControlsCard.tsx +++ b/frontend/src/pages/SettingsPage/components/ApplicationControlsCard.tsx @@ -1,4 +1,4 @@ -import React, { useState } from 'react'; +import React, { useEffect, useState } from 'react'; import { Settings as SettingsIcon, RefreshCw, Users } from 'lucide-react'; import { Button } from '@/components/ui/button'; @@ -9,8 +9,7 @@ import { useUpdater } from '@/hooks/useUpdater'; import { useDispatch } from 'react-redux'; import { showLoader, hideLoader } from '@/features/loaderSlice'; import { showInfoDialog } from '@/features/infoDialogSlice'; -import { triggerGlobalReclustering } from '@/api/api-functions/face_clusters'; -import { usePictoMutation } from '@/hooks/useQueryExtension'; +import { useGlobalRecluster } from '@/hooks/useGlobalRecluster'; import { useMutationFeedback } from '@/hooks/useMutationFeedback'; import { showGlobalAlert } from '@/features/globalAlertSlice'; @@ -32,11 +31,11 @@ const ApplicationControlsCard: React.FC = () => { const [updateDialogOpen, setUpdateDialogOpen] = useState(false); - const reclusterMutation = usePictoMutation({ - mutationFn: triggerGlobalReclustering, - autoInvalidateTags: ['clusters'], - onSuccess: (data) => { - const facesSkipped = data.data?.faces_skipped; + const reclusterJob = useGlobalRecluster(); + + useEffect(() => { + if (reclusterJob.isSuccess) { + const facesSkipped = reclusterJob.successData?.faces_skipped; if (facesSkipped != null && facesSkipped > 0) { dispatch( @@ -46,34 +45,29 @@ const ApplicationControlsCard: React.FC = () => { }), ); } - }, - }); + } + }, [reclusterJob.isSuccess, reclusterJob.successData, dispatch]); const feedbackOptions = React.useMemo( () => ({ - loadingMessage: 'Starting global face reclustering...', + loadingMessage: + 'Reclustering faces... this can take a while on large libraries.', successTitle: 'Reclustering Completed', successMessage: - reclusterMutation.successMessage || + reclusterJob.successMessage || 'Global face reclustering completed successfully.', errorTitle: 'Reclustering Failed', errorMessage: - reclusterMutation.errorMessage || + reclusterJob.errorMessage || 'Failed to complete global face reclustering.', - // You can use reclusterMutation.successData?.clusters_created to show the number of clusters created if needed - // Example: `Clusters created: ${reclusterMutation.successData?.clusters_created}` }), - [ - reclusterMutation.successMessage, - reclusterMutation.errorMessage, - reclusterMutation.successData, - ], + [reclusterJob.successMessage, reclusterJob.errorMessage], ); - useMutationFeedback(reclusterMutation, feedbackOptions); + useMutationFeedback(reclusterJob, feedbackOptions); const onGlobalReclusterClick = () => { - reclusterMutation.mutate(undefined); + reclusterJob.trigger(); }; const onCheckUpdatesClick = () => { From 6aa1fd87668792fb1895553cccb4a7031d912900 Mon Sep 17 00:00:00 2001 From: Vanshaj Poonia Date: Sun, 28 Jun 2026 16:44:31 +0530 Subject: [PATCH 2/6] fix(face-clusters): age recluster results from completion; harden poll loop MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses review feedback on the async reclustering flow. - ReclusterTask now records finished_at, and the cleanup loop ages terminal results from completion time instead of creation time. Previously a job that ran close to the TTL could be reaped almost immediately after finishing, making polling clients see a 404 instead of the result. - useGlobalRecluster now polls with a self-scheduling setTimeout (the next tick is queued only after the current request resolves, so status requests can't stack/overlap) and guards every async callback with a monotonically increasing run id. A newer trigger() — or unmount — invalidates older runs so they cannot keep polling or overwrite state, fixing the orphaned-interval leak on repeated triggers. --- backend/app/routes/face_clusters.py | 17 ++++++---- frontend/src/hooks/useGlobalRecluster.tsx | 38 ++++++++++++++++++----- 2 files changed, 42 insertions(+), 13 deletions(-) diff --git a/backend/app/routes/face_clusters.py b/backend/app/routes/face_clusters.py index 4fbc4fa0c..70e05d6b9 100644 --- a/backend/app/routes/face_clusters.py +++ b/backend/app/routes/face_clusters.py @@ -54,6 +54,7 @@ class ReclusterTask: faces_skipped: Optional[int] = None message: Optional[str] = None created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) + finished_at: Optional[datetime] = None task: Optional[asyncio.Task] = None @@ -66,9 +67,10 @@ class ReclusterTask: # with no await between check-and-set. _active_recluster_task_id: Optional[str] = None -# How long a finished task's result is retained for polling before the cleanup -# loop reaps it. Running tasks are never reaped (they are bounded to one by the -# concurrency guard above). +# How long a finished task's result is retained for polling, measured from when +# it finished (not when it started) so a long-running job's result isn't reaped +# almost immediately after completion. Running tasks are never reaped (they are +# bounded to one by the concurrency guard above). RECLUSTER_TASK_TTL_MINUTES = 15 @@ -94,8 +96,10 @@ async def _run_global_recluster(task_id: str): entry.status = "error" entry.message = f"Global reclustering failed: {str(e)}" finally: - # Release the concurrency guard so a new job can be started, while the - # finished result stays in recluster_tasks for the client to poll. + # Stamp completion time so cleanup ages the result from when it finished, + # and release the concurrency guard so a new job can be started, while + # the finished result stays in recluster_tasks for the client to poll. + entry.finished_at = datetime.now(timezone.utc) if _active_recluster_task_id == task_id: _active_recluster_task_id = None @@ -113,7 +117,8 @@ async def _cleanup_stale_recluster_tasks(): tid for tid, entry in recluster_tasks.items() if entry.status != "running" - and (now - entry.created_at).total_seconds() + and entry.finished_at is not None + and (now - entry.finished_at).total_seconds() > RECLUSTER_TASK_TTL_MINUTES * 60 ] for tid in stale: diff --git a/frontend/src/hooks/useGlobalRecluster.tsx b/frontend/src/hooks/useGlobalRecluster.tsx index 55b4c6e99..e4d92e55f 100644 --- a/frontend/src/hooks/useGlobalRecluster.tsx +++ b/frontend/src/hooks/useGlobalRecluster.tsx @@ -39,34 +39,54 @@ const idleState: ReclusterState = { */ export function useGlobalRecluster() { const queryClient = useQueryClient(); - const pollHandleRef = useRef | null>(null); + const pollTimeoutRef = useRef | null>(null); + // Identifies the latest trigger() call. Every async callback captures the id + // it started with and bails if a newer trigger (or unmount) has since bumped + // it, so stale runs can't update state or keep polling. + const runIdRef = useRef(0); const [state, setState] = useState(idleState); const stopPolling = useCallback(() => { - if (pollHandleRef.current) { - clearInterval(pollHandleRef.current); - pollHandleRef.current = null; + if (pollTimeoutRef.current) { + clearTimeout(pollTimeoutRef.current); + pollTimeoutRef.current = null; } }, []); - useEffect(() => stopPolling, [stopPolling]); + // On unmount, invalidate any in-flight run and stop polling. + useEffect(() => { + return () => { + runIdRef.current += 1; + stopPolling(); + }; + }, [stopPolling]); const trigger = useCallback(() => { + const runId = runIdRef.current + 1; + runIdRef.current = runId; + const isActive = () => runId === runIdRef.current; + stopPolling(); setState({ ...idleState, isPending: true }); startGlobalReclustering() .then((startRes) => { + if (!isActive()) return; + const taskId = startRes.data?.task_id; if (!taskId) { throw new Error('Backend did not return a task_id for reclustering.'); } - pollHandleRef.current = setInterval(async () => { + // Self-scheduling poll: the next tick is only queued after the current + // request resolves, so requests can't stack up or overlap. + const poll = async () => { try { const statusRes = await getGlobalReclusterStatus(taskId); + if (!isActive()) return; if (statusRes.data?.status === 'running') { + pollTimeoutRef.current = setTimeout(poll, POLL_INTERVAL_MS); return; } @@ -92,6 +112,7 @@ export function useGlobalRecluster() { successMessage: statusRes.message, }); } catch (err) { + if (!isActive()) return; stopPolling(); setState({ ...idleState, @@ -100,9 +121,12 @@ export function useGlobalRecluster() { errorMessage: getErrorMessage(err), }); } - }, POLL_INTERVAL_MS); + }; + + poll(); }) .catch((err) => { + if (!isActive()) return; setState({ ...idleState, isError: true, From 680372f30ed5d1ec919362534d66524dec5f8b26 Mon Sep 17 00:00:00 2001 From: Vanshaj Poonia Date: Sun, 28 Jun 2026 16:52:43 +0530 Subject: [PATCH 3/6] test(face-clusters): add useGlobalRecluster tests; warn on multi-worker setup - Add unit tests for the useGlobalRecluster polling hook covering the success lifecycle, error from start, error reported by the status poll, polling cleanup on unmount, and that a rapid second trigger does not leave a second poll loop running. - Warn at startup when WORKERS > 1, since model-download and global-reclustering job tracking is in-memory and per-worker; a job started in one worker is invisible to others, so deployments should keep a single worker. This is a non-breaking safeguard (no behaviour change to the default single-worker run). --- backend/main.py | 18 ++ .../__tests__/useGlobalRecluster.test.tsx | 164 ++++++++++++++++++ 2 files changed, 182 insertions(+) create mode 100644 frontend/src/hooks/__tests__/useGlobalRecluster.test.tsx diff --git a/backend/main.py b/backend/main.py index c8d890dbf..cbb98a7f6 100644 --- a/backend/main.py +++ b/backend/main.py @@ -46,6 +46,8 @@ # Configure Uvicorn logging to use our custom formatter configure_uvicorn_logging("backend") +logger = get_logger("backend") + path = os.path.dirname(DATABASE_PATH) os.makedirs(path, exist_ok=True) @@ -64,6 +66,22 @@ async def lifespan(app: FastAPI): db_create_albums_table() db_create_album_images_table() db_create_metadata_table() + # Model-download and global-reclustering job tracking is in-memory and + # per-worker. With more than one worker, a job started in one worker is + # invisible to the others, so status polls can miss it and duplicate jobs + # can start. Warn loudly so deployments keep a single worker (WORKERS=1). + try: + worker_count = int(os.environ.get("WORKERS", "1")) + except ValueError: + worker_count = 1 + if worker_count > 1: + logger.warning( + "WORKERS=%s: model-download and global-reclustering job tracking is " + "in-memory and per-worker. Run with a single worker (WORKERS=1) to " + "avoid duplicate jobs and missed status polls.", + worker_count, + ) + # Create ProcessPoolExecutor and attach it to app.state app.state.executor = ProcessPoolExecutor(max_workers=1) diff --git a/frontend/src/hooks/__tests__/useGlobalRecluster.test.tsx b/frontend/src/hooks/__tests__/useGlobalRecluster.test.tsx new file mode 100644 index 000000000..73cf8bd2c --- /dev/null +++ b/frontend/src/hooks/__tests__/useGlobalRecluster.test.tsx @@ -0,0 +1,164 @@ +import React from 'react'; +import { renderHook, act } from '@testing-library/react'; +import { QueryClient, QueryClientProvider } from '@tanstack/react-query'; +import { useGlobalRecluster } from '@/hooks/useGlobalRecluster'; +import { + startGlobalReclustering, + getGlobalReclusterStatus, +} from '@/api/api-functions/face_clusters'; + +jest.mock('@/api/api-functions/face_clusters', () => ({ + startGlobalReclustering: jest.fn(), + getGlobalReclusterStatus: jest.fn(), +})); + +const mockStart = startGlobalReclustering as jest.MockedFunction< + typeof startGlobalReclustering +>; +const mockStatus = getGlobalReclusterStatus as jest.MockedFunction< + typeof getGlobalReclusterStatus +>; + +const POLL_INTERVAL_MS = 2000; + +const startOk = { success: true, message: 'started', data: { task_id: 'abc' } }; +const running = { + success: true, + data: { + status: 'running' as const, + clusters_created: null, + faces_skipped: null, + }, +}; +const complete = { + success: true, + message: 'done', + data: { status: 'complete' as const, clusters_created: 3, faces_skipped: 1 }, +}; +const errored = { + success: false, + message: 'reclustering failed', + data: { + status: 'error' as const, + clusters_created: null, + faces_skipped: null, + }, +}; + +function setup() { + const queryClient = new QueryClient({ + defaultOptions: { queries: { retry: false } }, + }); + const invalidateSpy = jest.spyOn(queryClient, 'invalidateQueries'); + const wrapper = ({ children }: { children: React.ReactNode }) => ( + {children} + ); + const utils = renderHook(() => useGlobalRecluster(), { wrapper }); + return { ...utils, invalidateSpy }; +} + +// Flush pending promises and advance fake timers together. +const flush = async (ms = 0) => { + await act(async () => { + await Promise.resolve(); + await jest.advanceTimersByTimeAsync(ms); + }); +}; + +beforeEach(() => { + jest.useFakeTimers(); + mockStart.mockReset(); + mockStatus.mockReset(); +}); + +afterEach(() => { + jest.useRealTimers(); +}); + +describe('useGlobalRecluster', () => { + test('polls until the job completes successfully', async () => { + mockStart.mockResolvedValue(startOk); + mockStatus.mockResolvedValueOnce(running).mockResolvedValueOnce(complete); + + const { result, invalidateSpy } = setup(); + + act(() => { + result.current.trigger(); + }); + expect(result.current.isPending).toBe(true); + + await flush(); // start resolves, first poll -> running + expect(mockStatus).toHaveBeenCalledTimes(1); + expect(result.current.isPending).toBe(true); + + await flush(POLL_INTERVAL_MS); // scheduled poll -> complete + expect(result.current.isSuccess).toBe(true); + expect(result.current.successData).toEqual(complete.data); + expect(result.current.successMessage).toBe('done'); + expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['clusters'] }); + }); + + test('sets an error when the job reports failure', async () => { + mockStart.mockResolvedValue(startOk); + mockStatus.mockResolvedValue(errored); + + const { result, invalidateSpy } = setup(); + act(() => { + result.current.trigger(); + }); + await flush(); + + expect(result.current.isError).toBe(true); + expect(result.current.errorMessage).toBe('reclustering failed'); + expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['clusters'] }); + }); + + test('sets an error when starting the job fails', async () => { + mockStart.mockRejectedValue(new Error('network down')); + + const { result } = setup(); + act(() => { + result.current.trigger(); + }); + await flush(); + + expect(result.current.isError).toBe(true); + expect(mockStatus).not.toHaveBeenCalled(); + }); + + test('stops polling after unmount', async () => { + mockStart.mockResolvedValue(startOk); + mockStatus.mockResolvedValue(running); + + const { result, unmount } = setup(); + act(() => { + result.current.trigger(); + }); + await flush(); // first poll + expect(mockStatus).toHaveBeenCalledTimes(1); + + unmount(); + await flush(POLL_INTERVAL_MS * 3); + // No further polls once unmounted. + expect(mockStatus).toHaveBeenCalledTimes(1); + }); + + test('a rapid second trigger does not leave a second poll loop running', async () => { + mockStart.mockResolvedValue(startOk); + mockStatus.mockResolvedValue(running); + + const { result } = setup(); + // Both triggers fire before startGlobalReclustering resolves. + act(() => { + result.current.trigger(); + result.current.trigger(); + }); + + await flush(); // both starts resolve; only the latest run may poll + expect(mockStatus).toHaveBeenCalledTimes(1); + + const callsBefore = mockStatus.mock.calls.length; + await flush(POLL_INTERVAL_MS); // exactly one loop advances + expect(mockStatus.mock.calls.length).toBe(callsBefore + 1); + }); +}); From ffec8bf57f144ea22df067e5306c6b1b8dea85f7 Mon Sep 17 00:00:00 2001 From: Vanshaj Poonia Date: Sun, 28 Jun 2026 16:56:23 +0530 Subject: [PATCH 4/6] chore(face-clusters): drop redundant logger and unused interface - main.py already defines a module-level logger; remove the duplicate one added for the worker warning (the lifespan resolves the existing logger at runtime). - Remove the now-unused GlobalReclusterData interface from the frontend API module (superseded by the start/status data interfaces). --- backend/main.py | 2 -- frontend/src/api/api-functions/face_clusters.ts | 5 ----- 2 files changed, 7 deletions(-) diff --git a/backend/main.py b/backend/main.py index cbb98a7f6..0ffd9109a 100644 --- a/backend/main.py +++ b/backend/main.py @@ -46,8 +46,6 @@ # Configure Uvicorn logging to use our custom formatter configure_uvicorn_logging("backend") -logger = get_logger("backend") - path = os.path.dirname(DATABASE_PATH) os.makedirs(path, exist_ok=True) diff --git a/frontend/src/api/api-functions/face_clusters.ts b/frontend/src/api/api-functions/face_clusters.ts index 737a6a759..006d137ee 100644 --- a/frontend/src/api/api-functions/face_clusters.ts +++ b/frontend/src/api/api-functions/face_clusters.ts @@ -69,11 +69,6 @@ export const fetchSearchedFacesBase64 = async ( return response.data; }; -export interface GlobalReclusterData { - clusters_created: number | null; - faces_skipped: number | null; -} - export interface GlobalReclusterStartData { task_id: string; } From a422cc285eba344f1853c4330bff165b5b45d4a8 Mon Sep 17 00:00:00 2001 From: Vanshaj Poonia Date: Sun, 28 Jun 2026 17:35:16 +0530 Subject: [PATCH 5/6] fix(backend): enforce single-worker deployment for in-memory job tracking Model-download and global-reclustering job state lives in per-worker memory, so running multiple workers would let a job start in one worker while status polls hit another (404s, duplicate jobs). Hardcode the server to a single worker in run.sh and run-server.ps1 (removing the WORKERS override) and document the constraint at the in-memory state. Replaces the previous best-effort startup warning, which could not observe the real worker count. --- backend/main.py | 20 ++++---------------- backend/run-server.ps1 | 9 +++++---- backend/run.sh | 8 +++++--- 3 files changed, 14 insertions(+), 23 deletions(-) diff --git a/backend/main.py b/backend/main.py index 0ffd9109a..9f8eb6501 100644 --- a/backend/main.py +++ b/backend/main.py @@ -64,23 +64,11 @@ async def lifespan(app: FastAPI): db_create_albums_table() db_create_album_images_table() db_create_metadata_table() - # Model-download and global-reclustering job tracking is in-memory and - # per-worker. With more than one worker, a job started in one worker is - # invisible to the others, so status polls can miss it and duplicate jobs - # can start. Warn loudly so deployments keep a single worker (WORKERS=1). - try: - worker_count = int(os.environ.get("WORKERS", "1")) - except ValueError: - worker_count = 1 - if worker_count > 1: - logger.warning( - "WORKERS=%s: model-download and global-reclustering job tracking is " - "in-memory and per-worker. Run with a single worker (WORKERS=1) to " - "avoid duplicate jobs and missed status polls.", - worker_count, - ) - # Create ProcessPoolExecutor and attach it to app.state + # NOTE: model-download and global-reclustering job tracking is in-memory and + # per-worker, so the server is launched with a single worker (see run.sh / + # run-server.ps1). Keep it that way unless that state is moved to a shared + # store. app.state.executor = ProcessPoolExecutor(max_workers=1) # Start the SSE model download cleanup task diff --git a/backend/run-server.ps1 b/backend/run-server.ps1 index d21d635b6..335297ea5 100644 --- a/backend/run-server.ps1 +++ b/backend/run-server.ps1 @@ -44,13 +44,14 @@ if ($args[0] -eq "--test") { Start-Sleep -Seconds 3 } } else { - # Get the WORKERS environment variable or use a default value - $workers = if ($env:WORKERS) { $env:WORKERS } else { "1" } - Write-Host "WORKERS: $workers" + # Model-download and global-reclustering job tracking is in-memory and + # per-worker, so the server must run with a single worker; a job started in + # one worker would be invisible to others (missed status polls, duplicate + # jobs). Do not raise the worker count above 1. # Start the Hypercorn server $process = Start-Process -FilePath "hypercorn" ` - -ArgumentList "main:app --workers $workers --bind 0.0.0.0:8000" ` + -ArgumentList "main:app --workers 1 --bind 0.0.0.0:8000" ` -PassThru # Wait for process termination or Ctrl+C diff --git a/backend/run.sh b/backend/run.sh index dd3e0283d..18e4c9ecd 100644 --- a/backend/run.sh +++ b/backend/run.sh @@ -21,7 +21,9 @@ if [[ $1 == "--test" ]]; then sleep 3 done else - # print the value of the WORKERS environment variable - echo "WORKERS: ${WORKERS:-1}" - hypercorn main:app --workers ${WORKERS:-1} --bind 0.0.0.0:8000 + # Model-download and global-reclustering job tracking is in-memory and + # per-worker, so the server must run with a single worker; a job started in + # one worker would be invisible to others (missed status polls, duplicate + # jobs). Do not raise the worker count above 1. + hypercorn main:app --workers 1 --bind 0.0.0.0:8000 fi \ No newline at end of file From 689fc835c14e0e5efbd8ebed6bd70dfdcb8eb915 Mon Sep 17 00:00:00 2001 From: Vanshaj Poonia Date: Sun, 28 Jun 2026 21:37:54 +0530 Subject: [PATCH 6/6] test(face-clusters): cover status-request rejection in useGlobalRecluster Addresses a CodeRabbit review comment: the suite only covered the {success: false, status: 'error'} payload path, not a rejected status request (e.g. 404 for a missing/aged-out task ID). --- .../__tests__/useGlobalRecluster.test.tsx | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/frontend/src/hooks/__tests__/useGlobalRecluster.test.tsx b/frontend/src/hooks/__tests__/useGlobalRecluster.test.tsx index 73cf8bd2c..3d04ff2d1 100644 --- a/frontend/src/hooks/__tests__/useGlobalRecluster.test.tsx +++ b/frontend/src/hooks/__tests__/useGlobalRecluster.test.tsx @@ -126,6 +126,29 @@ describe('useGlobalRecluster', () => { expect(mockStatus).not.toHaveBeenCalled(); }); + test('sets an error when the status request itself rejects (e.g. 404 for an aged-out task)', async () => { + mockStart.mockResolvedValue(startOk); + const notFound = { + isAxiosError: true, + message: 'Request failed with status code 404', + code: 'ERR_BAD_REQUEST', + response: { status: 404, data: {} }, + }; + mockStatus.mockRejectedValue(notFound); + + const { result, invalidateSpy } = setup(); + act(() => { + result.current.trigger(); + }); + await flush(); + + expect(result.current.isError).toBe(true); + expect(result.current.errorMessage).toBe( + 'ERR_BAD_REQUEST: Request failed with status code 404', + ); + expect(invalidateSpy).not.toHaveBeenCalled(); + }); + test('stops polling after unmount', async () => { mockStart.mockResolvedValue(startOk); mockStatus.mockResolvedValue(running);