diff --git a/functions/process-file-embedding/__tests__/handler.test.ts b/functions/process-file-embedding/__tests__/handler.test.ts
new file mode 100644
index 000000000..c23a598b0
--- /dev/null
+++ b/functions/process-file-embedding/__tests__/handler.test.ts
@@ -0,0 +1,405 @@
+import { createMockContext } from '../../../tests/helpers/mock-context';
+
+jest.mock('@aws-sdk/client-s3', () => ({
+  S3Client: jest.fn().mockImplementation(() => ({
+    send: jest.fn()
+  })),
+  GetObjectCommand: jest.fn()
+}));
+
+jest.mock('graphile-llm', () => ({
+  buildEmbedderFromEnv: jest.fn()
+}));
+
+// The handler opens its own pg connections; mock pg so the suite never
+// needs a running PostgreSQL instance.
+jest.mock('pg', () => ({
+  Client: jest.fn()
+}));
+
+type PgQueryResult = { rows: Array<Record<string, unknown>> };
+
+const createMockS3Response = (content: string) => ({
+  Body: {
+    async *[Symbol.asyncIterator]() {
+      yield Buffer.from(content);
+    }
+  }
+});
+
+/** Make every S3Client created from now on return `content` for any request. */
+const mockS3Content = (content: string) => {
+  const { S3Client } = require('@aws-sdk/client-s3');
+  S3Client.mockImplementation(() => ({
+    send: jest.fn().mockResolvedValue(createMockS3Response(content))
+  }));
+};
+
+/**
+ * Install a fake pg Client in the current module registry and return the
+ * `query` mock shared by every client instance. Call it after
+ * jest.resetModules() and before loading the handler.
+ */
+const installPgMock = (
+  respond: (sql: string) => PgQueryResult = () => ({ rows: [] })
+) => {
+  const query = jest.fn(async (sql: string) => respond(sql));
+  const { Client } = require('pg');
+  Client.mockImplementation(() => ({
+    connect: jest.fn().mockResolvedValue(undefined),
+    query,
+    end: jest.fn().mockResolvedValue(undefined)
+  }));
+  return query;
+};
+
+/** Answers the two bucket-resolution lookups; every other query gets no rows. */
+const bucketLookupResponder = (sql: string): PgQueryResult => {
+  if (sql.includes('app_buckets')) {
+    return { rows: [{ type: 'public' }] };
+  }
+  if (sql.includes('metaschema_public.schema')) {
+    return { rows: [{ database_id: 'db-1' }] };
+  }
+  return { rows: [] };
+};
+
+const loadHandler = () => {
+  const mod = require('../handler');
+  return mod.default ?? mod;
+};
+
+describe('process-file-embedding handler', () => {
+  let pgQuery: ReturnType<typeof installPgMock>;
+
+  beforeEach(() => {
+    jest.resetModules();
+    process.env.BUCKET_PROVIDER = 'minio';
+    process.env.AWS_REGION = 'us-east-1';
+    process.env.S3_ENDPOINT = 'http://localhost:9000';
+    process.env.AWS_ACCESS_KEY = 'minioadmin';
+    process.env.AWS_SECRET_KEY = 'minioadmin';
+    pgQuery = installPgMock();
+  });
+
+  afterEach(() => {
+    delete process.env.BUCKET_PROVIDER;
+    delete process.env.AWS_REGION;
+    delete process.env.S3_ENDPOINT;
+    delete process.env.AWS_ACCESS_KEY;
+    delete process.env.AWS_SECRET_KEY;
+    delete process.env.PROCESS_FILE_EMBEDDING_DRY_RUN;
+  });
+
+  describe('validation', () => {
+    it('throws on missing file_id', async () => {
+      const handler = loadHandler();
+      await expect(
+        handler(
+          { key: 'test.txt', mime_type: 'text/plain', bucket_id: 'bucket' },
+          createMockContext()
+        )
+      ).rejects.toThrow('Missing required fields');
+    });
+
+    it('throws on missing key', async () => {
+      const handler = loadHandler();
+      await expect(
+        handler(
+          { file_id: '123', mime_type: 'text/plain', bucket_id: 'bucket' },
+          createMockContext()
+        )
+      ).rejects.toThrow('Missing required fields');
+    });
+
+    it('throws on missing bucket_id', async () => {
+      const handler = loadHandler();
+      await expect(
+        handler(
+          { file_id: '123', key: 'test.txt', mime_type: 'text/plain' },
+          createMockContext()
+        )
+      ).rejects.toThrow('Missing required fields');
+    });
+
+    it('throws on missing mime_type', async () => {
+      const handler = loadHandler();
+      await expect(
+        handler(
+          { file_id: '123', key: 'test.txt', bucket_id: 'bucket' },
+          createMockContext()
+        )
+      ).rejects.toThrow('Missing required fields');
+    });
+  });
+
+  describe('extraction mode', () => {
+    it('returns early in dry-run mode without touching the database', async () => {
+      const handler = loadHandler();
+
+      const result = await handler(
+        {
+          file_id: '123',
+          key: 'document.pdf',
+          mime_type: 'application/pdf',
+          bucket_id: 'bucket',
+          extraction: {}
+        },
+        createMockContext({ env: { PROCESS_FILE_EMBEDDING_DRY_RUN: 'true' } })
+      );
+
+      expect(result).toEqual({
+        complete: true,
+        dryRun: true,
+        mode: 'extraction'
+      });
+      expect(pgQuery).not.toHaveBeenCalled();
+    });
+
+    it('writes extracted text and metadata for text files', async () => {
+      mockS3Content('Hello world');
+      pgQuery = installPgMock(bucketLookupResponder);
+      const handler = loadHandler();
+
+      const result = await handler(
+        {
+          file_id: '123',
+          key: 'notes.txt',
+          mime_type: 'text/plain',
+          bucket_id: 'bucket',
+          extraction: {}
+        },
+        createMockContext()
+      );
+
+      expect(result).toEqual({
+        complete: true,
+        mode: 'extraction',
+        textLength: 11
+      });
+      expect(pgQuery).toHaveBeenLastCalledWith(
+        'UPDATE "app"."app_files" SET "extracted_text" = $1, "extracted_metadata" = $2 WHERE id = $3',
+        ['Hello world', JSON.stringify({ mime_type: 'text/plain', size: 11 }), '123']
+      );
+    });
+
+    it('escapes double quotes in payload-supplied identifiers', async () => {
+      mockS3Content('Hello world');
+      pgQuery = installPgMock(bucketLookupResponder);
+      const handler = loadHandler();
+
+      await handler(
+        {
+          file_id: '123',
+          key: 'notes.txt',
+          mime_type: 'text/plain',
+          bucket_id: 'bucket',
+          extraction: { text_field: 'te"xt' }
+        },
+        createMockContext()
+      );
+
+      expect(pgQuery).toHaveBeenLastCalledWith(
+        expect.stringContaining('SET "te""xt" = $1'),
+        expect.any(Array)
+      );
+    });
+  });
+
+  describe('direct mode embedding', () => {
+    beforeEach(() => {
+      mockS3Content('Hello world');
+
+      const { buildEmbedderFromEnv } = require('graphile-llm');
+      buildEmbedderFromEnv.mockReturnValue(
+        jest.fn().mockResolvedValue({
+          embedding: [0.1, 0.2, 0.3],
+          promptTokens: 2
+        })
+      );
+    });
+
+    it('processes text file and updates embedding', async () => {
+      const handler = loadHandler();
+
+      const result = await handler(
+        {
+          file_id: '123',
+          key: 'test.txt',
+          mime_type: 'text/plain',
+          bucket_id: 'bucket'
+        },
+        createMockContext()
+      );
+
+      expect(result).toEqual({ complete: true, dimensions: 3 });
+      expect(pgQuery).toHaveBeenCalledWith(
+        expect.stringContaining('$1::vector'),
+        ['[0.1,0.2,0.3]', '123']
+      );
+    });
+
+    it('resolves the bucket and quotes schema.table from the payload', async () => {
+      pgQuery = installPgMock(bucketLookupResponder);
+      const handler = loadHandler();
+
+      await handler(
+        {
+          file_id: '123',
+          key: 'test.txt',
+          mime_type: 'text/plain',
+          bucket_id: 'bucket',
+          schema: 'tenant-app-public',
+          table: 'files'
+        },
+        createMockContext()
+      );
+
+      expect(pgQuery).toHaveBeenCalledWith(
+        'SELECT type FROM "tenant-storage-public".app_buckets WHERE id = $1',
+        ['bucket']
+      );
+      expect(pgQuery).toHaveBeenLastCalledWith(
+        expect.stringContaining('UPDATE "tenant-app-public"."files" SET'),
+        ['[0.1,0.2,0.3]', '123']
+      );
+    });
+
+    it('skips database update in dry-run mode', async () => {
+      const handler = loadHandler();
+
+      const result = await handler(
+        {
+          file_id: '123',
+          key: 'test.txt',
+          mime_type: 'text/plain',
+          bucket_id: 'bucket'
+        },
+        createMockContext({ env: { PROCESS_FILE_EMBEDDING_DRY_RUN: 'true' } })
+      );
+
+      expect(result).toEqual({ complete: true, dryRun: true, dimensions: 3 });
+      expect(pgQuery).not.toHaveBeenCalled();
+    });
+
+    it('throws when embedder not configured', async () => {
+      const { buildEmbedderFromEnv } = require('graphile-llm');
+      buildEmbedderFromEnv.mockReturnValue(null);
+
+      const handler = loadHandler();
+
+      await expect(
+        handler(
+          {
+            file_id: '123',
+            key: 'test.txt',
+            mime_type: 'text/plain',
+            bucket_id: 'bucket'
+          },
+          createMockContext()
+        )
+      ).rejects.toThrow('No embedder configured');
+    });
+
+    it('throws for unsupported mime types in direct mode', async () => {
+      const handler = loadHandler();
+
+      await expect(
+        handler(
+          {
+            file_id: '123',
+            key: 'image.png',
+            mime_type: 'image/png',
+            bucket_id: 'bucket'
+          },
+          createMockContext()
+        )
+      ).rejects.toThrow('Direct embedding for image/png not yet supported');
+    });
+
+    it('throws when file content is empty', async () => {
+      mockS3Content(' ');
+
+      const handler = loadHandler();
+
+      await expect(
+        handler(
+          {
+            file_id: '123',
+            key: 'empty.txt',
+            mime_type: 'text/plain',
+            bucket_id: 'bucket'
+          },
+          createMockContext()
+        )
+      ).rejects.toThrow('File content is empty');
+    });
+  });
+
+  describe('mime type detection', () => {
+    beforeEach(() => {
+      mockS3Content('{"key":"value"}');
+
+      const { buildEmbedderFromEnv } = require('graphile-llm');
+      buildEmbedderFromEnv.mockReturnValue(
+        jest.fn().mockResolvedValue({ embedding: [0.1], promptTokens: 1 })
+      );
+    });
+
+    it('accepts application/json as text', async () => {
+      const handler = loadHandler();
+      const result = await handler(
+        {
+          file_id: '123',
+          key: 'data.json',
+          mime_type: 'application/json',
+          bucket_id: 'bucket'
+        },
+        createMockContext()
+      );
+      expect(result.complete).toBe(true);
+    });
+
+    it('accepts application/xml as text', async () => {
+      const handler = loadHandler();
+      const result = await handler(
+        {
+          file_id: '123',
+          key: 'data.xml',
+          mime_type: 'application/xml',
+          bucket_id: 'bucket'
+        },
+        createMockContext()
+      );
+      expect(result.complete).toBe(true);
+    });
+
+    it('accepts application/javascript as text', async () => {
+      const handler = loadHandler();
+      const result = await handler(
+        {
+          file_id: '123',
+          key: 'script.js',
+          mime_type: 'application/javascript',
+          bucket_id: 'bucket'
+        },
+        createMockContext()
+      );
+      expect(result.complete).toBe(true);
+    });
+
+    it('ignores MIME parameters and letter case', async () => {
+      const handler = loadHandler();
+      const result = await handler(
+        {
+          file_id: '123',
+          key: 'data.json',
+          mime_type: 'Application/JSON; charset=utf-8',
+          bucket_id: 'bucket'
+        },
+        createMockContext()
+      );
+      expect(result.complete).toBe(true);
+    });
+  });
+});
diff --git a/functions/process-file-embedding/handler.json b/functions/process-file-embedding/handler.json
new file mode 100644
index 000000000..cb121db49
--- /dev/null
+++ b/functions/process-file-embedding/handler.json
@@ -0,0 +1,15 @@
+{
+  "name": "process-file-embedding",
+  "version": "1.0.0",
+  "type": "node-graphql",
+  "port": 8084,
+  "taskIdentifier": "embed:process_file_embedding",
+  "description": "Processes file uploads into vector embeddings (direct mode: whole file to single vector)",
+  "dependencies": {
+    "@aws-sdk/client-s3": "^3.1060.0",
+    "@pgpmjs/env": "^2.15.3",
+    "@pgpmjs/logger": "^2.4.3",
+    "graphile-llm": "^0.11.0",
+    "pg": "^8.16.0"
+  }
+}
diff --git a/functions/process-file-embedding/handler.ts b/functions/process-file-embedding/handler.ts
new file mode 100644
index 000000000..c45b00a86
--- /dev/null
+++ b/functions/process-file-embedding/handler.ts
@@ -0,0 +1,446 @@
+import { GetObjectCommand, S3Client } from '@aws-sdk/client-s3';
+import type { FunctionHandler } from '@constructive-io/fn-runtime';
+import { parseEnvBoolean } from '@pgpmjs/env';
+import { createLogger } from '@pgpmjs/logger';
+import { buildEmbedderFromEnv } from 'graphile-llm';
+import { Client as PgClient } from 'pg';
+
+/** Job payload accepted by the process-file-embedding handler. */
+type ProcessFileEmbeddingPayload = {
+  file_id: string;
+  key: string;
+  mime_type: string;
+  bucket_id: string;
+  schema?: string;
+  table?: string;
+  extraction?: {
+    task_identifier?: string;
+    text_field?: string;
+    metadata_field?: string;
+    status_field?: string;
+  };
+};
+
+const logger = createLogger('process-file-embedding');
+
+/**
+ * Quote a SQL identifier that originates from the job payload.
+ * Embedded double quotes are doubled so the value cannot close the quoted
+ * identifier early and smuggle extra SQL into the statement.
+ */
+const quoteIdent = (identifier: string): string =>
+  `"${identifier.replace(/"/g, '""')}"`;
+
+/** Best-effort message for a value caught in a `catch` block. */
+const errorMessage = (err: unknown): string =>
+  err instanceof Error ? err.message : String(err);
+
+/** Build a pg client from the PG* environment variables, with fallback defaults. */
+const createPgClient = (): PgClient => {
+  return new PgClient({
+    host: process.env.PGHOST || 'localhost',
+    port: Number(process.env.PGPORT) || 5432,
+    user: process.env.PGUSER || 'postgres',
+    password: process.env.PGPASSWORD || 'password',
+    database: process.env.PGDATABASE || 'constructive'
+  });
+};
+
+/**
+ * Write an embedding onto the row whose `id` equals `fileId`.
+ * `tableName` and `embeddingColumn` are interpolated into the statement
+ * verbatim, so callers must pass trusted or already-quoted identifiers.
+ */
+const updateFileEmbedding = async (
+  fileId: string,
+  embedding: number[],
+  tableName: string = 'app.files',
+  embeddingColumn: string = 'embedding'
+): Promise<void> => {
+  const pg = createPgClient();
+  try {
+    await pg.connect();
+    // Text form "[v1,v2,...]" that the statement casts with ::vector.
+    const vectorStr = `[${embedding.join(',')}]`;
+    await pg.query(
+      `UPDATE ${tableName} SET ${embeddingColumn} = $1::vector WHERE id = $2`,
+      [vectorStr, fileId]
+    );
+  } finally {
+    await pg.end();
+  }
+};
+
+/**
+ * Enqueue a job through app_jobs.add_job, optionally setting
+ * jwt.claims.database_id to `databaseId` for that call.
+ * NOTE(review): nothing in this module calls this helper yet.
+ */
+const enqueueJob = async (
+  taskIdentifier: string,
+  payload: Record<string, unknown>,
+  databaseId?: string
+): Promise<void> => {
+  const pg = createPgClient();
+  try {
+    await pg.connect();
+    // set_config(..., is_local = true) only lasts until the current
+    // transaction ends, so the claim and add_job must share a transaction.
+    await pg.query('BEGIN');
+    try {
+      if (databaseId) {
+        await pg.query(`SELECT set_config('jwt.claims.database_id', $1, true)`, [
+          databaseId
+        ]);
+      }
+      await pg.query(`SELECT app_jobs.add_job($1::text, $2::json)`, [
+        taskIdentifier,
+        JSON.stringify(payload)
+      ]);
+      await pg.query('COMMIT');
+    } catch (err) {
+      await pg.query('ROLLBACK');
+      throw err;
+    }
+  } finally {
+    await pg.end();
+  }
+};
+
+/** Build an S3 client from env; minio, r2 and gcs get path-style addressing. */
+const createS3ClientFromEnv = (): S3Client => {
+  const provider = process.env.BUCKET_PROVIDER || 'minio';
+  const pathStyleProviders = new Set(['minio', 'r2', 'gcs']);
+
+  return new S3Client({
+    region: process.env.AWS_REGION || 'us-east-1',
+    endpoint: process.env.S3_ENDPOINT || process.env.CDN_ENDPOINT,
+    forcePathStyle: pathStyleProviders.has(provider),
+    credentials: {
+      accessKeyId: process.env.AWS_ACCESS_KEY_ID || process.env.AWS_ACCESS_KEY || '',
+      secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY || process.env.AWS_SECRET_KEY || ''
+    }
+  });
+};
+
+/**
+ * Drain a readable stream into a UTF-8 string.
+ * NOTE(review): nothing in this module calls this helper yet.
+ */
+const streamToString = async (
+  stream: NodeJS.ReadableStream
+): Promise<string> => {
+  const chunks: Buffer[] = [];
+  for await (const chunk of stream) {
+    chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
+  }
+  return Buffer.concat(chunks).toString('utf-8');
+};
+
+/**
+ * True when the handler treats the MIME type as UTF-8 text.
+ * Parameters such as "; charset=utf-8" and letter case are ignored.
+ */
+const isTextMimeType = (mimeType: string): boolean => {
+  const [essence = ''] = mimeType.split(';');
+  const baseType = essence.trim().toLowerCase();
+  return (
+    baseType.startsWith('text/') ||
+    baseType === 'application/json' ||
+    baseType === 'application/xml' ||
+    baseType === 'application/javascript'
+  );
+};
+
+/**
+ * Read the `key` column of a bucket row in `<schema>.app_buckets`.
+ * NOTE(review): nothing in this module calls this helper yet.
+ *
+ * @throws Error when no row matches `bucketId`.
+ */
+const getBucketKey = async (
+  bucketId: string,
+  schema: string
+): Promise<string> => {
+  const pg = createPgClient();
+  try {
+    await pg.connect();
+    const result = await pg.query(
+      `SELECT key FROM ${quoteIdent(schema)}.app_buckets WHERE id = $1`,
+      [bucketId]
+    );
+    if (result.rows.length === 0) {
+      throw new Error(`Bucket not found: ${bucketId}`);
+    }
+    return result.rows[0].key;
+  } finally {
+    await pg.end();
+  }
+};
+
+/**
+ * Resolve bucket_id to full S3 bucket name.
+ * S3 bucket name format: {prefix}-{bucketType}-{databaseId}
+ * e.g., "test-bucket-public-019e9171-8c95-7e7f-99e5-aa4784b86f93"
+ *
+ * Always derive database_id from schema (more reliable than job context).
+ *
+ * @throws Error when the bucket row or the schema's metaschema row is missing.
+ */
+const resolveS3BucketName = async (
+  bucketId: string,
+  schema: string
+): Promise<string> => {
+  const pg = createPgClient();
+  try {
+    await pg.connect();
+
+    // Get bucket type from app_buckets (S3 bucket name uses type, not key)
+    const bucketResult = await pg.query(
+      `SELECT type FROM ${quoteIdent(schema)}.app_buckets WHERE id = $1`,
+      [bucketId]
+    );
+    if (bucketResult.rows.length === 0) {
+      throw new Error(`Bucket not found: ${bucketId}`);
+    }
+    const bucketType = bucketResult.rows[0].type;
+
+    // Get database_id from schema (always reliable)
+    const schemaResult = await pg.query(
+      `SELECT database_id FROM metaschema_public.schema WHERE schema_name = $1`,
+      [schema]
+    );
+    if (schemaResult.rows.length === 0) {
+      throw new Error(`Schema not found in metaschema: ${schema}`);
+    }
+    const databaseId = schemaResult.rows[0].database_id;
+
+    // Build S3 bucket name: {prefix}-{bucketType}-{databaseId}
+    const prefix = process.env.CDN_BUCKET_NAME || process.env.BUCKET_NAME || 'test-bucket';
+    return `${prefix}-${bucketType}-${databaseId}`;
+  } finally {
+    await pg.end();
+  }
+};
+
+/**
+ * Download an S3 object fully into memory.
+ *
+ * @throws Error when the response carries no body.
+ */
+const downloadFile = async (
+  s3: S3Client,
+  bucket: string,
+  key: string
+): Promise<Buffer> => {
+  const command = new GetObjectCommand({ Bucket: bucket, Key: key });
+  const response = await s3.send(command);
+
+  if (!response.Body) {
+    throw new Error('Empty response body from S3');
+  }
+
+  const chunks: Buffer[] = [];
+  for await (const chunk of response.Body as AsyncIterable<Uint8Array>) {
+    chunks.push(Buffer.from(chunk));
+  }
+  return Buffer.concat(chunks);
+};
+
+/**
+ * Embed the whole file as a single vector.
+ *
+ * @throws Error when no embedder is configured, the MIME type is not text,
+ *   or the decoded content is blank.
+ */
+const processDirectEmbedding = async (
+  fileContent: Buffer,
+  mimeType: string
+): Promise<number[]> => {
+  const embedder = buildEmbedderFromEnv();
+  if (!embedder) {
+    throw new Error(
+      'No embedder configured. Set EMBEDDER_PROVIDER, EMBEDDER_MODEL, EMBEDDER_BASE_URL'
+    );
+  }
+
+  if (!isTextMimeType(mimeType)) {
+    throw new Error(
+      `Direct embedding for ${mimeType} not yet supported. Use extraction mode for non-text files.`
+    );
+  }
+
+  const text = fileContent.toString('utf-8');
+  if (!text.trim()) {
+    throw new Error('File content is empty');
+  }
+
+  const result = await embedder(text);
+  logger.info('Generated embedding', {
+    dimensions: result.embedding.length,
+    promptTokens: result.promptTokens
+  });
+
+  return result.embedding;
+};
+
+/**
+ * Job handler for process-file-embedding.
+ *
+ * Extraction mode (payload has `extraction`): download the object and write
+ * its text plus metadata to the configured columns of the files table.
+ * Direct mode: download the object, embed its text as one vector and store
+ * it on the file row. When PROCESS_FILE_EMBEDDING_DRY_RUN is enabled neither
+ * mode writes to the database.
+ *
+ * @throws Error when file_id, key, mime_type or bucket_id is missing.
+ */
+const handler: FunctionHandler<ProcessFileEmbeddingPayload> = async (
+  params,
+  context
+) => {
+  const { log, env } = context;
+  const isDryRun = parseEnvBoolean(env.PROCESS_FILE_EMBEDDING_DRY_RUN) ?? false;
+
+  const { file_id, key, mime_type, bucket_id, schema, table, extraction } = params;
+
+  // mime_type is checked here too: both modes branch on it after the download.
+  if (!file_id || !key || !mime_type || !bucket_id) {
+    throw new Error('Missing required fields: file_id, key, mime_type, or bucket_id');
+  }
+
+  log.info('[process-file-embedding] Processing file', {
+    file_id,
+    key,
+    mime_type,
+    bucket_id,
+    schema,
+    table,
+    hasExtraction: Boolean(extraction)
+  });
+
+  if (extraction) {
+    const textField = extraction.text_field || 'extracted_text';
+    const metadataField = extraction.metadata_field || 'extracted_metadata';
+
+    log.info('[process-file-embedding] Extraction mode - extracting text', {
+      text_field: textField,
+      metadata_field: metadataField
+    });
+
+    if (isDryRun) {
+      log.info('[process-file-embedding] DRY RUN - skipping extraction');
+      return {
+        complete: true,
+        dryRun: true,
+        mode: 'extraction'
+      };
+    }
+
+    const s3 = createS3ClientFromEnv();
+
+    // Resolve bucket_id to full S3 bucket name (derive database_id from schema)
+    let bucketName = bucket_id;
+    const storageSchema = schema || 'storage_public';
+    try {
+      bucketName = await resolveS3BucketName(bucket_id, storageSchema);
+      log.info('[process-file-embedding] Resolved S3 bucket name', { bucket_id, bucketName, schema: storageSchema });
+    } catch (err) {
+      // Best effort: fall back to using bucket_id itself as the bucket name.
+      log.error('[process-file-embedding] Could not resolve bucket', { error: errorMessage(err) });
+    }
+
+    const fileContent = await downloadFile(s3, bucketName, key);
+    log.info('[process-file-embedding] Downloaded file for extraction', { size: fileContent.length });
+
+    // Extract text (for now, just use the raw content for text files)
+    let extractedText = '';
+    const metadata: Record<string, unknown> = { mime_type, size: fileContent.length };
+
+    if (isTextMimeType(mime_type)) {
+      extractedText = fileContent.toString('utf-8');
+    } else {
+      // TODO: Add PDF extraction, etc.
+      extractedText = `[Binary file: ${mime_type}, ${fileContent.length} bytes]`;
+      metadata.binary = true;
+    }
+
+    // Update the database; every identifier here comes from the payload.
+    const targetSchema = schema || 'app';
+    const targetTable = table || 'app_files';
+    const fullTableName = `${quoteIdent(targetSchema)}.${quoteIdent(targetTable)}`;
+
+    const pg = createPgClient();
+    try {
+      await pg.connect();
+      await pg.query(
+        `UPDATE ${fullTableName} SET ${quoteIdent(textField)} = $1, ${quoteIdent(metadataField)} = $2 WHERE id = $3`,
+        [extractedText, JSON.stringify(metadata), file_id]
+      );
+      log.info('[process-file-embedding] Updated extracted text', {
+        file_id,
+        textLength: extractedText.length,
+        table: fullTableName
+      });
+    } finally {
+      await pg.end();
+    }
+
+    return {
+      complete: true,
+      mode: 'extraction',
+      textLength: extractedText.length
+    };
+  }
+
+  const s3 = createS3ClientFromEnv();
+
+  // Resolve bucket_id (UUID) to full S3 bucket name (derive database_id from schema)
+  let bucketName = bucket_id;
+  if (schema) {
+    const storageSchema = schema.replace(/-app-public$/, '-storage-public');
+    try {
+      bucketName = await resolveS3BucketName(bucket_id, storageSchema);
+      log.info('[process-file-embedding] Resolved S3 bucket name', { bucket_id, bucketName, schema: storageSchema });
+    } catch (err) {
+      log.info('[process-file-embedding] Could not resolve bucket, using bucket_id as name', {
+        bucket_id,
+        error: errorMessage(err)
+      });
+    }
+  }
+
+  const fileContent = await downloadFile(s3, bucketName, key);
+
+  log.info('[process-file-embedding] Downloaded file', {
+    size: fileContent.length,
+    mime_type
+  });
+
+  const embedding = await processDirectEmbedding(fileContent, mime_type);
+
+  if (isDryRun) {
+    log.info('[process-file-embedding] DRY RUN - skipping database update', {
+      file_id,
+      embeddingDimensions: embedding.length
+    });
+    return { complete: true, dryRun: true, dimensions: embedding.length };
+  }
+
+  // Use schema.table from payload if available, otherwise fall back to env
+  const tableName =
+    schema && table
+      ? `${quoteIdent(schema)}.${quoteIdent(table)}`
+      : env.EMBEDDING_TABLE || 'app.files';
+  const embeddingColumn = env.EMBEDDING_COLUMN || 'embedding';
+  await updateFileEmbedding(file_id, embedding, tableName, embeddingColumn);
+
+  log.info('[process-file-embedding] Updated file embedding', {
+    file_id,
+    dimensions: embedding.length
+  });
+
+  return { complete: true, dimensions: embedding.length };
+};
+
+export default handler;
diff --git a/scripts/dev.ts b/scripts/dev.ts
index f7f6177e5..60d8f6f7d 100644
--- a/scripts/dev.ts
+++ b/scripts/dev.ts
@@ -63,6 +63,17 @@ const sharedEnv: Record = {
   LOCAL_APP_PORT: '3000',
   SEND_VERIFICATION_LINK_DRY_RUN: 'true',
   SEND_EMAIL_DRY_RUN: 'true',
+  // MinIO/S3 (matches docker-compose minio service)
+  S3_ENDPOINT: 'http://localhost:9000',
+  AWS_ACCESS_KEY_ID: 'minioadmin',
+  AWS_SECRET_ACCESS_KEY: 'minioadmin',
+  BUCKET_PROVIDER: 'minio',
+  // Ollama embeddings (requires local ollama with nomic-embed-text model)
+  EMBEDDER_PROVIDER: 'ollama',
+  EMBEDDER_MODEL: 'nomic-embed-text',
+  EMBEDDER_BASE_URL: 'http://localhost:11434',
+  // Embedding function writes for real by default; set to 'true' for a dry run
+  PROCESS_FILE_EMBEDDING_DRY_RUN: 'false',
 };
 
 // --- Process definitions (built from manifest) ---
diff --git a/skaffold.yaml b/skaffold.yaml
index f0923139a..a23b8fbf8 100644
--- a/skaffold.yaml
+++ b/skaffold.yaml
@@ -51,6 +51,50 @@ profiles:
         namespace: constructive-functions
         port: 3000
         localPort: 3002
+  - name: process-file-embedding
+    build:
+      artifacts:
+        - image: constructive-functions
+          context: .
+          docker:
+            dockerfile: Dockerfile.dev
+          sync:
+            manual:
+              - src: 'functions/**/*.ts'
+                dest: /usr/src/app
+      local:
+        push: false
+    manifests:
+      kustomize:
+        paths:
+          - k8s/overlays/local-simple
+      rawYaml:
+        - generated/process-file-embedding/k8s/local-deployment.yaml
+        - generated/process-file-embedding/k8s/functions-configmap.yaml
+    deploy:
+      kubectl:
+        defaultNamespace: constructive-functions
+    portForward:
+      - resourceType: service
+        resourceName: process-file-embedding
+        namespace: constructive-functions
+        port: 80
+        localPort: 8084
+      - resourceType: service
+        resourceName: knative-job-service
+        namespace: constructive-functions
+        port: 8080
+        localPort: 8080
+      - resourceType: service
+        resourceName: postgres
+        namespace: constructive-functions
+        port: 5432
+        localPort: 5432
+      - resourceType: service
+        resourceName: constructive-server
+        namespace: constructive-functions
+        port: 3000
+        localPort: 3002
   - name: python-example
     build:
       artifacts:
@@ -268,6 +312,7 @@ profiles:
           - k8s/overlays/local-simple
       rawYaml:
         - generated/example/k8s/local-deployment.yaml
+        - generated/process-file-embedding/k8s/local-deployment.yaml
         - generated/python-example/k8s/local-deployment.yaml
         - generated/send-email/k8s/local-deployment.yaml
         - generated/send-verification-link/k8s/local-deployment.yaml
@@ -282,6 +327,11 @@ profiles:
         namespace: constructive-functions
         port: 80
         localPort: 8083
+      - resourceType: service
+        resourceName: process-file-embedding
+        namespace: constructive-functions
+        port: 80
+        localPort: 8084
       - resourceType: service
         resourceName: python-example
         namespace: constructive-functions
@@ -342,6 +392,11 @@ profiles:
         namespace: constructive-functions
         port: 80
         localPort: 8083
+      - resourceType: service
+        resourceName: process-file-embedding
+        namespace: constructive-functions
+        port: 80
+        localPort: 8084
       - resourceType: service
         resourceName: python-example
         namespace: constructive-functions