Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions graphile/graphile-llm/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
},
"dependencies": {
"@agentic-kit/ollama": "^2.0.0",
"@constructive-io/express-context": "workspace:^",
"graphile-cache": "workspace:^"
},
"peerDependencies": {
Expand Down
177 changes: 58 additions & 119 deletions graphile/graphile-llm/src/metering.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
* resolved from `billing_module` metaschema and cached by `config-cache.ts`.
*/

import type { BillingConfig, InferenceLogConfig,PgClient } from './config-cache';
import type { BillingClient } from '@constructive-io/express-context';
import { createBillingClient } from '@constructive-io/express-context';

import type { BillingConfig, InferenceLogConfig, PgClient } from './config-cache';
import type { ChatFunction, ChatMessage, ChatOptions, EmbedderFunction } from './types';

// ─── Types ──────────────────────────────────────────────────────────────────
Expand Down Expand Up @@ -77,52 +80,22 @@ export interface MeterResult<T> {
latencyMs: number;
}

// ─── Billing SQL Helpers ────────────────────────────────────────────────────
// ─── Billing Client Adapter ─────────────────────────────────────────────────

/**
* Check if the entity has sufficient quota for the requested amount.
* Returns true if the call is allowed, false if quota is exceeded.
*
* Gracefully returns true if the billing function doesn't exist or errors —
* metering is opt-in, so missing infrastructure means "allow".
* Create a BillingClient from a MeteringContext by adapting Graphile's 2-arg
* withPgClient(pgSettings, fn) to the express-context 1-arg withPgClient(fn).
*/
async function checkQuota(
  pgClient: PgClient,
  billing: BillingConfig,
  entityId: string,
  meterSlug: string,
  amount: number
): Promise<boolean> {
  // Quota pre-check: ask the configured billing function whether `entityId`
  // may consume `amount` units of `meterSlug`. Resolves to false only when the
  // function explicitly answers `allowed = false`; a missing row, a null
  // answer, or any thrown error all resolve to true (metering is best-effort).
  try {
    // Everything, including building the statement, stays inside the try so a
    // malformed billing config is also treated as "allow".
    const qualifiedFunction = `"${billing.privateSchema}"."${billing.checkBillingQuotaFunction}"`;
    const statement = `SELECT ${qualifiedFunction}($1, $2::uuid, $3) AS allowed`;
    const outcome = await pgClient.query(statement, [meterSlug, entityId, amount]);
    const firstRow = outcome.rows[0];
    const explicitlyDenied = firstRow?.allowed === false;
    return !explicitlyDenied;
  } catch (e: unknown) {
    let message: string;
    if (e instanceof Error) {
      message = e.message;
    } else {
      message = String(e);
    }
    console.warn(`[graphile-llm] check_billing_quota failed (allowing): ${message}`);
    return true;
  }
}

/**
* Record usage after a successful call.
* Gracefully skips if the billing function doesn't exist or errors.
*/
async function recordUsage(
pgClient: PgClient,
billing: BillingConfig,
entityId: string,
meterSlug: string,
amount: number,
metadata: Record<string, unknown>
): Promise<void> {
try {
const sql = `SELECT "${billing.privateSchema}"."${billing.recordUsageFunction}"($1, $2::uuid, $3, $4::jsonb)`;
await pgClient.query(sql, [meterSlug, entityId, amount, JSON.stringify(metadata)]);
} catch (e: unknown) {
const message = e instanceof Error ? e.message : String(e);
console.warn(`[graphile-llm] record_usage failed (non-fatal): ${message}`);
}
function billingClientFromCtx(ctx: MeteringContext): BillingClient {
  // Wrap the context's two-argument withPgClient(pgSettings, fn) so it can be
  // handed to createBillingClient as a one-argument runner. pgSettings is read
  // from ctx each time the runner is invoked, not captured up front.
  function runWithCtxSettings<T>(fn: (client: any) => Promise<T>): Promise<T> {
    // The casts bridge the two callback/return signatures; the concrete client
    // types are declared outside this block.
    return ctx.withPgClient(ctx.pgSettings, fn as any) as unknown as Promise<T>;
  }

  // Billing and inference-log configuration are forwarded from ctx unchanged.
  return createBillingClient(runWithCtxSettings, ctx.entityId, ctx.billing, ctx.inferenceLog);
}

// ─── Inference Usage Log ────────────────────────────────────────────────────
Expand Down Expand Up @@ -152,52 +125,34 @@ export interface InferenceLogEntry {

/**
* Write a row to the usage_log_inference table.
* Gracefully skips if the inference_log_module is not provisioned.
*
* TODO: Also write to child (generated) database when dual-write is needed.
* Delegates to the shared BillingClient from express-context.
*/
export async function logInferenceUsage(
ctx: MeteringContext,
entry: InferenceLogEntry
): Promise<void> {
if (!ctx.inferenceLog) return;

const { schema, tableName } = ctx.inferenceLog;
const sql = `INSERT INTO "${schema}"."${tableName}" (
database_id, entity_id, actor_id,
model, provider, service, operation,
input_tokens, output_tokens, total_tokens,
cache_read_tokens, cache_write_tokens,
latency_ms, rag_enabled, chunks_retrieved,
embedding_model, embedding_latency_ms,
status, error_type, raw_usage
) VALUES (
$1, $2, $3,
$4, $5, $6, $7,
$8, $9, $10,
$11, $12,
$13, $14, $15,
$16, $17,
$18, $19, $20
)`;

try {
await ctx.withPgClient(ctx.pgSettings, async (pgClient) => {
await pgClient.query(sql, [
entry.databaseId, entry.entityId, entry.actorId,
entry.model, entry.provider, entry.service, entry.operation,
entry.inputTokens, entry.outputTokens, entry.totalTokens,
entry.cacheReadTokens, entry.cacheWriteTokens,
entry.latencyMs, entry.ragEnabled, entry.chunksRetrieved,
entry.embeddingModel, entry.embeddingLatencyMs,
entry.status, entry.errorType,
entry.rawUsage ? JSON.stringify(entry.rawUsage) : null
]);
});
} catch (e: unknown) {
const message = e instanceof Error ? e.message : String(e);
console.warn(`[graphile-llm] inference log INSERT failed (non-fatal): ${message}`);
}
const client = billingClientFromCtx(ctx);
return client.logInference({
entityId: entry.entityId,
actorId: entry.actorId,
model: entry.model,
provider: entry.provider,
service: entry.service,
operation: entry.operation,
inputTokens: entry.inputTokens,
outputTokens: entry.outputTokens,
totalTokens: entry.totalTokens,
latencyMs: entry.latencyMs,
status: entry.status,
cacheReadTokens: entry.cacheReadTokens,
cacheWriteTokens: entry.cacheWriteTokens,
ragEnabled: entry.ragEnabled,
chunksRetrieved: entry.chunksRetrieved,
embeddingModel: entry.embeddingModel,
embeddingLatencyMs: entry.embeddingLatencyMs,
errorType: entry.errorType,
rawUsage: entry.rawUsage
});
}

// ─── Metered Embedder ───────────────────────────────────────────────────────
Expand Down Expand Up @@ -249,14 +204,8 @@ export async function meteredEmbed(
}

// Pre-check: can this entity afford this call?
let allowed = true;
try {
await ctx.withPgClient(ctx.pgSettings, async (pgClient) => {
allowed = await checkQuota(pgClient, ctx.billing, ctx.entityId, meterSlug, 1);
});
} catch {
allowed = true;
}
const billingClient = billingClientFromCtx(ctx);
const allowed = await billingClient.checkQuota(meterSlug);

if (!allowed) {
logInferenceUsage(ctx, {
Expand Down Expand Up @@ -294,14 +243,12 @@ export async function meteredEmbed(
const { embedding, promptTokens } = await embedder(text);
const latencyMs = Date.now() - startTime;

ctx.withPgClient(ctx.pgSettings, async (pgClient) => {
await recordUsage(pgClient, ctx.billing, ctx.entityId, meterSlug, promptTokens, {
request_id: ctx.requestId,
input_chars: text.length,
prompt_tokens: promptTokens,
dims: embedding.length,
latency_ms: latencyMs
});
billingClient.recordUsage(meterSlug, promptTokens, {
request_id: ctx.requestId,
input_chars: text.length,
prompt_tokens: promptTokens,
dims: embedding.length,
latency_ms: latencyMs
}).catch(() => {});

// Log to inference usage table
Expand Down Expand Up @@ -382,14 +329,8 @@ export async function meteredChat(
}

// Pre-check: can this entity afford this call?
let allowed = true;
try {
await ctx.withPgClient(ctx.pgSettings, async (pgClient) => {
allowed = await checkQuota(pgClient, ctx.billing, ctx.entityId, meterSlug, 1);
});
} catch {
allowed = true;
}
const billingClient = billingClientFromCtx(ctx);
const allowed = await billingClient.checkQuota(meterSlug);

if (!allowed) {
const estimatedInputTokens = Math.ceil(messages.reduce((sum, m) => sum + m.content.length, 0) / 4);
Expand Down Expand Up @@ -429,16 +370,14 @@ export async function meteredChat(
const latencyMs = Date.now() - startTime;
const usage = chatResult.usage;

ctx.withPgClient(ctx.pgSettings, async (pgClient) => {
await recordUsage(pgClient, ctx.billing, ctx.entityId, meterSlug, usage.totalTokens, {
request_id: ctx.requestId,
input_tokens: usage.input,
output_tokens: usage.output,
cache_read_tokens: usage.cacheRead,
cache_write_tokens: usage.cacheWrite,
messages_count: messages.length,
latency_ms: latencyMs
});
billingClient.recordUsage(meterSlug, usage.totalTokens, {
request_id: ctx.requestId,
input_tokens: usage.input,
output_tokens: usage.output,
cache_read_tokens: usage.cacheRead,
cache_write_tokens: usage.cacheWrite,
messages_count: messages.length,
latency_ms: latencyMs
}).catch(() => {});

// Log to inference usage table with real provider token counts
Expand Down
95 changes: 0 additions & 95 deletions packages/agentic-server/src/billing.ts

This file was deleted.

17 changes: 9 additions & 8 deletions packages/agentic-server/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,18 @@
* ```
*/

export { createAgenticRouter } from './router';
export { getEnvOptions } from './env';
export type { EnvOptions, ProviderConfig } from './env';
export { getAgentDiscovery, getDatabaseConfig, clearAgentCache, clearConfigCache } from './discovery';
export { TtlCache } from './cache';
export type {
AgentDiscovery,
AgentTableInfo,
BillingConfig,
DatabaseConfig,
InferenceLogConfig,
InferenceLogConfig
} from './discovery';
export { checkQuota, recordUsage, logInference } from './billing';
export type { InferenceLogEntry } from './billing';
export { TtlCache } from './cache';
export { clearAgentCache, clearConfigCache,getAgentDiscovery, getDatabaseConfig } from './discovery';
export type { EnvOptions, ProviderConfig } from './env';
export { getEnvOptions } from './env';
export { createAgenticRouter } from './router';

// Re-export billing client from express-context for convenience
export type { BillingClient, InferenceLogEntry } from '@constructive-io/express-context';
Loading
Loading