Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 47 additions & 36 deletions apps/sim/lib/billing/core/usage-log.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,17 @@ export interface RecordUsageParams {
}

/**
* Records usage in a single atomic transaction.
* Records usage by inserting into usage_log and incrementing userStats counters.
*
* Inserts all entries into usage_log and updates userStats counters
* (totalCost, currentPeriodCost, lastActive) within one Postgres transaction.
* The total cost added to userStats is derived from summing entry costs,
* ensuring usage_log and currentPeriodCost can never drift apart.
* The two writes are intentionally not wrapped in a transaction: under high
* concurrency for the same userId, holding BEGIN/COMMIT across the user_stats
* row-lock wait pins pgbouncer connections and exhausts the pool.
*
* If billing is disabled, total cost is zero, or no entries have positive cost,
* this function returns early without writing anything.
* usage_log is the source of truth and the INSERT propagates errors to the
* caller. The userStats UPDATE is best-effort: failures (and missing-row
* cases) are logged as warnings and swallowed. Counter drift is acceptable
* here — the long-term plan is to derive counters from usage_log directly.
* Any drift warning in logs is a signal that needs investigation.
*/
export async function recordUsage(params: RecordUsageParams): Promise<void> {
if (!isBillingEnabled) {
Expand All @@ -103,47 +105,56 @@ export async function recordUsage(params: RecordUsageParams): Promise<void> {
? Object.fromEntries(Object.entries(additionalStats).filter(([k]) => !RESERVED_KEYS.has(k)))
: undefined

await db.transaction(async (tx) => {
if (validEntries.length > 0) {
await tx.insert(usageLog).values(
validEntries.map((entry) => ({
id: generateId(),
userId,
category: entry.category,
source: entry.source,
description: entry.description,
metadata: entry.metadata ?? null,
cost: entry.cost.toString(),
workspaceId: workspaceId ?? null,
workflowId: workflowId ?? null,
executionId: executionId ?? null,
}))
)
}
if (validEntries.length > 0) {
await db.insert(usageLog).values(
validEntries.map((entry) => ({
id: generateId(),
userId,
category: entry.category,
source: entry.source,
description: entry.description,
metadata: entry.metadata ?? null,
cost: entry.cost.toString(),
workspaceId: workspaceId ?? null,
workflowId: workflowId ?? null,
executionId: executionId ?? null,
}))
)
}

const updateFields: Record<string, SQL | Date> = {
lastActive: new Date(),
...(totalCost > 0 && {
totalCost: sql`total_cost + ${totalCost}`,
currentPeriodCost: sql`current_period_cost + ${totalCost}`,
}),
...safeStats,
}
const updateFields: Record<string, SQL | Date> = {
lastActive: new Date(),
...(totalCost > 0 && {
totalCost: sql`total_cost + ${totalCost}`,
currentPeriodCost: sql`current_period_cost + ${totalCost}`,
}),
...safeStats,
}

const result = await tx
try {
const result = await db
.update(userStats)
.set(updateFields)
.where(eq(userStats.userId, userId))
.returning({ userId: userStats.userId })

if (result.length === 0) {
logger.warn('recordUsage: userStats row not found, transaction will roll back', {
logger.warn('recordUsage: userStats row not found; counter increment dropped', {
userId,
totalCost,
hadEntries: validEntries.length > 0,
additionalStatsKeys: safeStats ? Object.keys(safeStats) : [],
})
throw new Error(`userStats row not found for userId: ${userId}`)
}
})
} catch (error) {
logger.warn('recordUsage: userStats update failed; counter increment dropped', {
error: toError(error).message,
userId,
totalCost,
hadEntries: validEntries.length > 0,
additionalStatsKeys: safeStats ? Object.keys(safeStats) : [],
})
}
Comment thread
TheodoreSpeaks marked this conversation as resolved.

logger.debug('Recorded usage', {
userId,
Expand Down
Loading