Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion backend/src/Main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ async fn main() -> anyhow::Result<()> {
"Dispute file analysis queue initialised"
);

let shutdown_pool = pool.clone();
let shutdown_queue = queue_tx.clone();

// ── 4. Application state ────────────────────────────────────────────────
let state = Arc::new(AppState {
db: pool,
Expand All @@ -150,6 +153,11 @@ async fn main() -> anyhow::Result<()> {
.with_graceful_shutdown(shutdown_signal())
.await?;

info!("HTTP listener stopped; closing dispute queue and draining database pool");
shutdown_queue.close();
shutdown_pool.close().await;
info!("Axum server shutdown completed");

Ok(())
}

Expand Down Expand Up @@ -209,4 +217,4 @@ async fn shutdown_signal() {
_ = ctrl_c => { info!("Received Ctrl-C, shutting down") },
_ = terminate => { info!("Received SIGTERM, shutting down") },
}
}
}
18 changes: 5 additions & 13 deletions backend/src/config/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -346,20 +346,12 @@ export const prisma = globalForPrisma.prisma || createPrismaClient();
if (process.env.NODE_ENV !== "production") globalForPrisma.prisma = prisma;

// ---------------------------------------------------------------------------
// Graceful shutdown — release pool connections on process exit signals
// Graceful pool draining — called by the server-level shutdown handler
// ---------------------------------------------------------------------------
async function gracefulShutdown(signal: string): Promise<void> {
export async function drainDatabasePool(signal = "shutdown"): Promise<void> {
console.log(`[POOL] Received ${signal}. Draining connection pool...`);
stopPoolHealthCheck();
try {
await prisma.$disconnect();
await pool.end();
console.log("[POOL] Connection pool drained successfully.");
} catch (err: any) {
console.error("[POOL] Error during pool shutdown:", err.message);
}
process.exit(0);
await prisma.$disconnect();
await pool.end();
console.log("[POOL] Connection pool drained successfully.");
}

process.on("SIGTERM", () => gracefulShutdown("SIGTERM"));
process.on("SIGINT", () => gracefulShutdown("SIGINT"));
79 changes: 60 additions & 19 deletions backend/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,23 @@
import express, { Express, Request, Response, NextFunction } from "express";
import type { Server } from "node:http";
import cors from "cors";
import cookieParser from "cookie-parser";
import crypto from "crypto";
import dotenv from "dotenv";
import { prisma, connectWithRetry, startPoolHealthCheck } from "./config/db";
import {
connectWithRetry,
drainDatabasePool,
pool,
startPoolHealthCheck,
stopPoolHealthCheck,
} from "./config/db";
import { trace } from "./config/tracing";
import { intakeRateLimit } from "./middleware/intakeRateLimit";
import { sqlInjectionGuard } from "./middleware/sanitize";
import { tracingMiddleware } from "./utils/tracing";
import { metricsMiddleware } from "./middleware/metrics";
import { createMetricsRouter, updatePoolMetrics } from "./utils/metrics";
import { closeHttpServer, createGracefulShutdownHandler } from "./utils/graceful-shutdown";
import authRoutes from "./routes/auth";
import jobsRoutes from "./routes/jobs";
import disputesRoutes from "./routes/disputes";
Expand All @@ -20,7 +28,6 @@ import uploadsRoutes from "./routes/uploads";
import bulkRoutes from "./routes/bulk";
import poolRoutes from "./routes/pool";
import stateRoutes from "./routes/state";
import { pool } from "./config/db";
import { startStorageCleanup, stopStorageCleanup } from "./utils/storage-cleanup";
import { startNonceCleanup, stopNonceCleanup } from "./utils/nonce-cleanup";

Expand All @@ -31,6 +38,16 @@ const port = process.env.PORT || 3001;
const logger = trace.getLogger("server");
const isProduction = process.env.NODE_ENV === "production";
const CSRF_COOKIE_NAME = "lance-csrf-token";
// Flipped to true by the shutdown handler's markShuttingDown hook; the
// rejection middleware reads it to answer new requests with 503.
let isShuttingDown = false;
// HTTP server returned by app.listen(); stays null until the server has been
// started, and is what closeHttpServer() receives during shutdown.
let server: Server | null = null;
// Handle of the periodic pool-metrics refresh so shutdown can clear it.
let poolMetricsInterval: NodeJS.Timeout | null = null;

/**
 * Reads a strictly positive integer from an environment variable.
 *
 * Only plain decimal digits (optionally surrounded by whitespace) are
 * accepted. Values such as "1e4", "30s" or "2.5" are rejected instead of
 * being truncated to their numeric prefix, because a silently shortened
 * value (e.g. "1e4" -> 1) would be far more dangerous than the default.
 *
 * @param name - Name of the environment variable to read.
 * @param fallback - Value returned when the variable is unset, empty,
 *   malformed, zero, or too large to be represented exactly.
 * @returns The parsed positive integer, or `fallback`.
 */
function positiveIntEnv(name: string, fallback: number): number {
  const raw = process.env[name]?.trim();
  if (!raw || !/^\d+$/.test(raw)) {
    return fallback;
  }

  const parsed = Number.parseInt(raw, 10);
  return Number.isSafeInteger(parsed) && parsed > 0 ? parsed : fallback;
}

// Upper bound, in milliseconds, for the whole graceful-shutdown sequence.
const SHUTDOWN_TIMEOUT_MS = positiveIntEnv("SHUTDOWN_TIMEOUT_MS", 10_000);

// Enable CORS for frontend requests with credentials support
const FRONTEND_URL = process.env.FRONTEND_URL || "http://localhost:3000";
Expand Down Expand Up @@ -76,6 +93,19 @@ app.get("/api/v1/auth/csrf", (req: Request, res: Response) => {
res.json({ csrfToken });
});

// Once shutdown has begun, answer every request reaching this middleware with
// 503 and ask the client to drop the connection; otherwise pass it through.
app.use((req: Request, res: Response, next: NextFunction) => {
  if (isShuttingDown) {
    const { method, path } = req;
    logger.warn("Request rejected during graceful shutdown", { method, path });
    res.setHeader("Connection", "close");
    return res.status(503).json({ error: "Server is shutting down" });
  }

  return next();
});

app.use(csrfMiddleware);
app.use(tracingMiddleware); // Global request tracing and diagnostics
app.use(intakeRateLimit);
Expand Down Expand Up @@ -148,23 +178,34 @@ app.get("/health", async (req: Request, res: Response) => {
}
});

// Graceful shutdown handler
process.on("SIGTERM", async () => {
logger.info("SIGTERM received, shutting down gracefully");
stopStorageCleanup();
stopNonceCleanup();
try {
await prisma.$disconnect();
logger.info("Database connection closed");
process.exit(0);
} catch (error) {
logger.error("Error during shutdown", {
error: error instanceof Error ? error.message : String(error),
});
process.exit(1);
}
// Halts the cleanup jobs, the pool health check and the pool-metrics interval.
const stopPeriodicJobs = (): void => {
  stopStorageCleanup();
  stopNonceCleanup();
  stopPoolHealthCheck();
  if (poolMetricsInterval !== null) {
    clearInterval(poolMetricsInterval);
    poolMetricsInterval = null;
  }
};

// Single shutdown routine shared by every termination signal.
const shutdown = createGracefulShutdownHandler({
  logger,
  timeoutMs: SHUTDOWN_TIMEOUT_MS,
  markShuttingDown: () => {
    isShuttingDown = true;
  },
  closeServer: () => closeHttpServer(server),
  stopBackgroundTasks: [stopPeriodicJobs],
  drainDatabase: drainDatabasePool,
  exit: (code) => process.exit(code),
});

// Each signal is hooked with `once`; the handler's promise is intentionally
// not awaited because signal listeners cannot be async-aware.
const shutdownSignals: NodeJS.Signals[] = ["SIGINT", "SIGTERM"];
shutdownSignals.forEach((signal) => {
  process.once(signal, () => {
    void shutdown(signal);
  });
});

// ---------------------------------------------------------------------------
// Start the server — validate the DB connection with retry backoff first,
// then kick off background pool health-checking.
Expand All @@ -175,10 +216,10 @@ async function bootstrap(): Promise<void> {
startPoolHealthCheck();
startStorageCleanup();
startNonceCleanup();
app.listen(port, () => {
server = app.listen(port, () => {
console.log(`⚡️[server]: Server is running at http://localhost:${port}`);
// Update pool metrics periodically so the Prometheus scrape has fresh data
setInterval(() => {
poolMetricsInterval = setInterval(() => {
updatePoolMetrics(pool.totalCount, pool.idleCount, pool.waitingCount);
}, 15_000).unref();
});
Expand Down
2 changes: 1 addition & 1 deletion backend/src/routes/pool-enhanced.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ router.get("/health", async (req: Request, res: Response) => {
const uptime = stats.uptimeSeconds || 0;

// Determine overall health status
const isPrimary Healthy = stats.primaryStatus === "healthy";
const isPrimaryHealthy = stats.primaryStatus === "healthy";
const hasHealthyReplicas = (stats.replicaStatuses || []).some(
(s: string) => s === "healthy"
);
Expand Down
108 changes: 108 additions & 0 deletions backend/src/utils/graceful-shutdown.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import type { Server } from "node:http";

/**
 * Minimal structured-logging contract needed by the shutdown handler.
 *
 * Each method takes a human-readable message and an optional bag of
 * structured context fields. Any logger exposing these three methods can be
 * passed in.
 */
export type ShutdownLogger = {
// Progress messages, e.g. shutdown started / completed.
info(message: string, context?: Record<string, unknown>): void;
// Non-fatal anomalies, e.g. a duplicate shutdown signal.
warn(message: string, context?: Record<string, unknown>): void;
// Shutdown failure or timeout.
error(message: string, context?: Record<string, unknown>): void;
};

/**
 * Configuration for `createGracefulShutdownHandler`.
 *
 * All hooks are optional; an omitted hook is simply skipped.
 */
export interface GracefulShutdownOptions {
// Receives progress, duplicate-signal and failure messages.
logger: ShutdownLogger;
// Time budget in milliseconds for the whole close/stop/drain sequence.
timeoutMs: number;
// Called once, synchronously, as soon as the first signal is accepted.
markShuttingDown?: () => void;
// Awaited first; expected to stop the HTTP listener.
closeServer?: () => Promise<void>;
// Awaited one at a time, in array order, after closeServer.
stopBackgroundTasks?: Array<() => void | Promise<void>>;
// Awaited last; receives the signal that triggered the shutdown.
drainDatabase?: (signal: NodeJS.Signals) => Promise<void>;
// Called with 0 when the sequence completed in time, 1 when it failed or timed out.
exit?: (code: number) => void;
}

/**
 * Turns an arbitrary thrown value into a log-friendly string.
 *
 * - `Error` instances yield their message, or their name when the message is
 *   empty, so the log entry is never blank.
 * - Other objects are JSON-serialised so their fields stay visible instead of
 *   collapsing to "[object Object]".
 * - Objects that cannot be serialised (circular references, BigInt fields)
 *   fall back to their `Object.prototype.toString` tag, which never throws.
 * - Primitives go through `String()`.
 *
 * @param error - The value caught by a `catch` clause.
 * @returns A non-throwing textual description of `error`.
 */
function describeError(error: unknown): string {
  if (error instanceof Error) {
    return error.message || error.name;
  }

  if (typeof error === "object" && error !== null) {
    try {
      const serialized = JSON.stringify(error);
      // JSON.stringify can yield undefined at runtime (e.g. toJSON returning undefined).
      if (typeof serialized === "string") {
        return serialized;
      }
    } catch {
      // Not serialisable; use the tag below.
    }
    return Object.prototype.toString.call(error);
  }

  return String(error);
}

/**
 * Resolves or rejects with `work`, unless `timeoutMs` elapses first, in which
 * case the returned promise rejects with a timeout error.
 *
 * `work` itself is not cancelled on timeout; it keeps running in the
 * background and its eventual outcome is ignored.
 *
 * @param work - The in-flight operation to guard.
 * @param timeoutMs - Time budget in milliseconds. Values above the largest
 *   delay Node timers support are clamped to that maximum; without the clamp
 *   Node would silently shorten such a delay to 1 ms.
 * @param signal - Signal that triggered the shutdown; used in the error text.
 * @returns The value `work` resolves with.
 * @throws Error when the time budget elapses before `work` settles, or
 *   whatever `work` rejects with.
 */
async function withTimeout<T>(
  work: Promise<T>,
  timeoutMs: number,
  signal: NodeJS.Signals
): Promise<T> {
  // Node stores timer delays as a signed 32-bit integer.
  const maxTimerDelayMs = 2 ** 31 - 1;
  const delayMs = Math.min(timeoutMs, maxTimerDelayMs);

  let timeout: NodeJS.Timeout | undefined;
  const timeoutPromise = new Promise<never>((_, reject) => {
    timeout = setTimeout(() => {
      reject(new Error(`Graceful shutdown timed out after ${delayMs}ms for ${signal}`));
    }, delayMs);
    // The watchdog alone must not keep the process alive.
    timeout.unref();
  });

  try {
    return await Promise.race([work, timeoutPromise]);
  } finally {
    // Always release the timer, whichever side of the race won.
    if (timeout) {
      clearTimeout(timeout);
    }
  }
}

/**
 * Stops an HTTP server from accepting connections and resolves once it has
 * fully closed.
 *
 * A missing server, or one that is already not listening
 * (`ERR_SERVER_NOT_RUNNING`), counts as success: the desired end state is
 * already reached, so it should not fail the surrounding shutdown.
 *
 * @param server - The server to close, or `null`/`undefined` if none was started.
 * @returns A promise that resolves when the server is closed.
 * @throws Rejects with the error reported by `server.close` for any failure
 *   other than the server not running.
 */
export function closeHttpServer(server: Server | null | undefined): Promise<void> {
  if (!server) {
    return Promise.resolve();
  }

  // closeIdleConnections is absent from older Node typings/runtimes, so it is
  // treated as optional and only invoked when present.
  const serverWithIdleClose = server as Server & {
    closeIdleConnections?: () => void;
  };

  return new Promise<void>((resolve, reject) => {
    server.close((error?: Error) => {
      const code = (error as NodeJS.ErrnoException | undefined)?.code;
      if (!error || code === "ERR_SERVER_NOT_RUNNING") {
        resolve();
        return;
      }
      reject(error);
    });

    // Called after close() so that idle keep-alive sockets are dropped and
    // do not hold the close callback back.
    serverWithIdleClose.closeIdleConnections?.();
  });
}

/**
 * Builds a signal handler that shuts the process down in an orderly way.
 *
 * On the first invocation the handler:
 *   1. calls `markShuttingDown`,
 *   2. awaits `closeServer`,
 *   3. awaits each `stopBackgroundTasks` entry in order,
 *   4. awaits `drainDatabase(signal)`,
 * all within `timeoutMs`, and finally calls `exit(0)` on success or `exit(1)`
 * if any step failed or the time budget ran out.
 *
 * Steps are best-effort: a failing step is logged and recorded, but the
 * remaining steps still run, so an error while closing the server cannot
 * prevent background tasks from stopping or the database from being drained.
 *
 * Later invocations while a shutdown is in progress are ignored with a warning.
 *
 * @param options - Hooks, logger and time budget; see `GracefulShutdownOptions`.
 * @returns An async handler taking the signal that triggered the shutdown.
 *   The handler never rejects because of a failing hook; failures are
 *   reported through the logger and the exit code.
 */
export function createGracefulShutdownHandler(options: GracefulShutdownOptions) {
  let shuttingDown = false;

  return async function gracefulShutdown(signal: NodeJS.Signals): Promise<void> {
    if (shuttingDown) {
      options.logger.warn("Shutdown already in progress; ignoring duplicate signal", { signal });
      return;
    }

    shuttingDown = true;
    options.markShuttingDown?.();
    options.logger.info("Shutdown signal received; draining API resources", {
      signal,
      timeoutMs: options.timeoutMs,
    });

    const failures: string[] = [];

    // Runs one step, recording (not propagating) its failure so later steps still run.
    const attempt = async (
      step: string,
      run: () => void | Promise<void> | undefined
    ): Promise<void> => {
      try {
        await run();
      } catch (error) {
        const reason = describeError(error);
        failures.push(`${step}: ${reason}`);
        options.logger.warn("Shutdown step failed; continuing with remaining steps", {
          signal,
          step,
          error: reason,
        });
      }
    };

    try {
      await withTimeout(
        (async () => {
          await attempt("closeServer", () => options.closeServer?.());

          for (const [index, stopTask] of (options.stopBackgroundTasks ?? []).entries()) {
            await attempt(`stopBackgroundTasks[${index}]`, stopTask);
          }

          await attempt("drainDatabase", () => options.drainDatabase?.(signal));

          // Surface the collected failures only after every step had its chance.
          if (failures.length > 0) {
            throw new Error(`Shutdown steps failed: ${failures.join("; ")}`);
          }
        })(),
        options.timeoutMs,
        signal
      );

      options.logger.info("Graceful shutdown completed", { signal });
      options.exit?.(0);
    } catch (error) {
      options.logger.error("Graceful shutdown failed", {
        signal,
        error: describeError(error),
      });
      options.exit?.(1);
    }
  };
}
Loading
Loading