Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 41 additions & 10 deletions packages/app/src/inference/pf-inference.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,14 @@ async function main(): Promise<void> {
tx.env.allowRemoteModels = false
tx.env.localModelPath = 'pf-model:///'
tx.env.useBrowserCache = false
// ORT Runtime Web defaults to fetching its WASM / JS runtime
// from cdn.jsdelivr.net — incompatible with the offline guarantee
// + blocked by our strict CSP. Point it at our protocol's `ort/`
// subpath; main serves those files out of onnxruntime-web's dist.
// The wasm field is loosely typed via `?` — guard the assignment.
if (tx.env.backends.onnx.wasm) {
tx.env.backends.onnx.wasm.wasmPaths = 'pf-model:///ort/'
}
const built = await tx.pipeline('token-classification', PF_MODEL_ID, {
device: runtime === 'webgpu' ? 'webgpu' : 'wasm',
dtype: 'q4',
Expand Down Expand Up @@ -96,16 +104,39 @@ async function analyzeOne(
): Promise<void> {
try {
const output = await pipe(req.text, { aggregation_strategy: 'simple' })
const matches: PfMatch[] = output.map((m) => ({
// openai/privacy-filter emits `private_person`, `private_email`,
// etc. The class-mapping module operates on the short form so
// the same code works for future models without the prefix.
class: m.entity_group.toLowerCase().replace(/^private_/, ''),
value: m.word,
start: m.start,
end: m.end,
score: m.score,
}))
// findings.start_offset + end_offset are NOT NULL in SQLite, so
// a match without character anchors would roll back the entire
// session's scan transaction (taking the regex findings down with
// it). transformers.js with BPE tokenizers sometimes returns
// start/end undefined even with aggregation_strategy='simple',
// so:
// 1) keep what the tokenizer gave us when it's there
// 2) otherwise re-locate the word in the source via indexOf
// 3) only drop if even indexOf comes up empty (rare — the word
// came FROM the tokenizer over the source, so finding it
// back should be almost universal)
let searchCursor = 0
const matches: PfMatch[] = output.flatMap((m) => {
const cls = m.entity_group.toLowerCase().replace(/^private_/, '')
let start = m.start
let end = m.end
if (!Number.isFinite(start) || !Number.isFinite(end)) {
// Use a cursor so multiple instances of the same word land at
// distinct offsets instead of all collapsing onto the first
// occurrence. transformers.js returns matches in document
// order, so the cursor advances monotonically.
const idx = req.text.indexOf(m.word, searchCursor)
if (idx < 0) return [] // word doesn't actually appear — drop
start = idx
end = idx + m.word.length
searchCursor = end
} else {
// Advance cursor past native-offset matches too so a later
// fallback doesn't go backwards in the text.
searchCursor = Math.max(searchCursor, end as number)
}
return [{ class: cls, value: m.word, start: start as number, end: end as number, score: m.score }]
})
bridge.sendAnalyzeResult({ reqId: req.reqId, ok: true, matches })
} catch (err) {
bridge.sendAnalyzeResult({
Expand Down
69 changes: 50 additions & 19 deletions packages/app/src/main/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ let acpManager: AcpManager
let isSyncActive = false
let scanWorker: ScanWorkerProxy | null = null
let disposeSecurityIpc: (() => void) | null = null
const pfRuntime = makePfRuntime()
const pfRuntime = makePfRuntime({ run: runWithObservability })
// Lazily resolved on first access — pfModelsRoot() reads app.getPath
// which throws before app.ready, but this module evaluates at boot.
let pfCoordinator: ReturnType<typeof makePfCoordinator> | null = null
Expand Down Expand Up @@ -227,24 +227,54 @@ async function shutdownScanWorker(): Promise<void> {
* needs to drop the moment the runtime + backfill have settled
* (success or fail). The ScanBanner then takes over visually. */
async function syncPfRuntime(pfEnabled: boolean): Promise<void> {
try {
if (pfEnabled && pfModelInstalled()) {
await pfRuntime.start()
scanWorker?.notifyPfOnline()
} else {
scanWorker?.notifyPfOffline()
await pfRuntime.stop()
}
if (scanWorker) {
await runWithObservability(scanWorker.backfill())
}
} finally {
const cur = loadSecurityPreferences()
if (cur.pfActivationPending) {
const next = saveSecurityPreferences({ pfActivationPending: false })
mainWindow?.webContents.send(SECURITY_IPC_CHANNELS.EVT_PREFS_CHANGED, next)
}
}
await runWithObservability(
Effect.gen(function* () {
yield* Effect.annotateCurrentSpan('pf.enabled', pfEnabled)
if (pfEnabled && pfModelInstalled()) {
yield* Effect.promise(() => pfRuntime.start())
// pfRuntime.start() resolves even when the handshake failed —
// the hidden window might be up but transformers.js / ONNX
// crashed during model load. Check the actual runtime state
// before telling the scan worker pf is online: otherwise
// currentProfile drifts to `regex@4,pf@1.5b-q4`, every analyze
// round-trips to a dead host that returns [], and the user
// sees regex-only findings tagged with a profile string that
// lies about what scanned them.
const state = yield* Effect.promise(() => pfRuntime.getState())
yield* Effect.annotateCurrentSpan('pf.runtime.status', state?.status ?? 'null')
if (state?.runtime) yield* Effect.annotateCurrentSpan('pf.runtime.kind', state.runtime)
if (state?.error) yield* Effect.annotateCurrentSpan('pf.runtime.error', state.error)
if (state?.status === 'ready') {
yield* Effect.sync(() => scanWorker?.notifyPfOnline())
yield* Effect.annotateCurrentSpan('pf.notified', 'online')
} else {
yield* Effect.logError(
`[security] pf runtime failed to reach ready (status=${state?.status ?? 'unknown'}, error=${state?.error ?? 'unknown'})`,
)
yield* Effect.sync(() => scanWorker?.notifyPfOffline())
yield* Effect.annotateCurrentSpan('pf.notified', 'offline')
}
} else {
yield* Effect.sync(() => scanWorker?.notifyPfOffline())
yield* Effect.promise(() => pfRuntime.stop())
}
if (scanWorker) {
yield* scanWorker.backfill()
}
}).pipe(
// pfActivationPending clears on the way out (success OR fail) so
// the callout's "Activating…" state stops hanging on a permanent
// failure. ScanBanner takes over visually once backfill enqueues.
Effect.ensuring(Effect.sync(() => {
const cur = loadSecurityPreferences()
if (cur.pfActivationPending) {
const next = saveSecurityPreferences({ pfActivationPending: false })
mainWindow?.webContents.send(SECURITY_IPC_CHANNELS.EVT_PREFS_CHANGED, next)
}
})),
Effect.withSpan('pf.sync_runtime'),
),
)
}

function createWindow(): BrowserWindow {
Expand Down Expand Up @@ -449,6 +479,7 @@ app.whenReady().then(async () => {
pfCoordinator = makePfCoordinator({
modelDir: pfModelDir(),
fetch: ((url, init) => net.fetch(url as string, init)) as typeof globalThis.fetch,
run: runWithObservability,
})
// When a download initiated from the callout completes, finish
// the activation handshake on the user's behalf — flip pfEnabled
Expand Down
13 changes: 12 additions & 1 deletion packages/app/src/main/scan-worker-thread.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@ void (async () => {
displayName: 'Privacy Filter (ML)',
available: () => pfOnline && loadSecurityPreferences().pfEnabled,
analyze: async (text: string): Promise<SensitiveMatch[]> => {
// Short-circuit empty / whitespace-only chunks. The model's
// GatherBlockQuantized op fails with "Invalid dispatch group
// size (0, 1, 1)" on zero-token input; cheaper for everyone to
// skip the IPC round-trip entirely.
if (text.trim().length === 0) return []
const reqId = pfNextReqId++
const raw: PfRawMatchWire[] = await new Promise((resolve, reject) => {
pfPending.set(reqId, { resolve, reject })
Expand All @@ -144,7 +149,13 @@ void (async () => {
// the scan engine already ran regex over upstream, but we don't
// have access to those matches here.
const regexMatches = detectWithRegex(text, 'regex')
return mapPfMatches(raw as PfRawMatch[], { regexMatches, fullText: text })
const mapped = mapPfMatches(raw as PfRawMatch[], { regexMatches, fullText: text })
// Diagnostic: see at a glance what % of raw matches survive
// class-mapping suppression. raw=N → mapped=M tells us if
// suppression rules are too aggressive or if labels are landing
// in the default-drop branch.
console.log(`[pf provider] raw=${raw.length} mapped=${mapped.length} kinds=${raw.map(m => m.class).join(',')}`)
return mapped
},
}

Expand Down
86 changes: 55 additions & 31 deletions packages/app/src/main/security/pf-coordinator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// so the renderer's `evt-pf-state` always agrees with what the user
// can act on. ModelHost spawn/unload lands in PR 5c.

import { Effect } from 'effect'
import { downloadModel, type DownloadProgress } from './model-download.js'
import { MODEL_MANIFEST, manifestTotalBytes } from './model-manifest.js'
import { pfInstallStatus } from './model-state.js'
Expand Down Expand Up @@ -31,13 +32,19 @@ export interface PfCoordinator {
export interface PfCoordinatorDeps {
modelDir: string
fetch?: typeof globalThis.fetch
/** Effect runner that carries the main-process observability
* layer. Defaults to bare `Effect.runPromise` for tests; main
* passes `runWithObservability` so `pf.coordinator.download`
* spans land in the same OTel pipeline as the rest of Spool. */
run?: <A, E>(eff: Effect.Effect<A, E>) => Promise<A>
}

export function makePfCoordinator(deps: PfCoordinatorDeps): PfCoordinator {
const subscribers = new Set<(s: PfDownloadState) => void>()
const totalBytes = manifestTotalBytes(MODEL_MANIFEST)
let abortController: AbortController | null = null
let state = initialState(deps.modelDir)
const run = (deps.run ?? Effect.runPromise) as <A, E>(eff: Effect.Effect<A, E>) => Promise<A>

function publish(): void {
for (const fn of subscribers) {
Expand All @@ -54,39 +61,56 @@ export function makePfCoordinator(deps: PfCoordinatorDeps): PfCoordinator {
bytesTotal: state.bytesTotal,
}
publish()
try {
await downloadModel({
modelDir: deps.modelDir,
manifest: MODEL_MANIFEST,
...(deps.fetch ? { fetch: deps.fetch } : {}),
signal: abortController.signal,
onProgress: (p: DownloadProgress) => {
state = {
phase: 'downloading',
bytesDownloaded: p.bytesDownloaded,
bytesTotal: p.bytesTotal,
await run(
Effect.tryPromise({
try: () => downloadModel({
modelDir: deps.modelDir,
manifest: MODEL_MANIFEST,
...(deps.fetch ? { fetch: deps.fetch } : {}),
signal: abortController!.signal,
onProgress: (p: DownloadProgress) => {
state = {
phase: 'downloading',
bytesDownloaded: p.bytesDownloaded,
bytesTotal: p.bytesTotal,
}
publish()
},
}),
catch: (err) => err,
}).pipe(
Effect.tap(() => Effect.sync(() => {
state = { phase: 'installed', bytesDownloaded: totalBytes, bytesTotal: totalBytes }
})),
Effect.tap(() => Effect.annotateCurrentSpan('pf.download.outcome', 'installed')),
Effect.catchAll((err) => Effect.sync(() => {
const aborted = (err as { name?: string } | undefined)?.name === 'AbortError'
if (aborted) {
state = initialState(deps.modelDir)
} else {
state = {
phase: 'failed',
bytesDownloaded: state.bytesDownloaded,
bytesTotal: state.bytesTotal,
error: err instanceof Error ? err.message : String(err),
}
}
}).pipe(Effect.tap(() => Effect.annotateCurrentSpan(
'pf.download.outcome',
(err as { name?: string } | undefined)?.name === 'AbortError' ? 'cancelled' : 'failed',
)))),
Effect.ensuring(Effect.sync(() => {
abortController = null
publish()
},
})
state = { phase: 'installed', bytesDownloaded: totalBytes, bytesTotal: totalBytes }
} catch (err) {
const aborted = (err as { name?: string } | undefined)?.name === 'AbortError'
if (aborted) {
// Cancel → drop back to whatever the filesystem says now.
state = initialState(deps.modelDir)
} else {
state = {
phase: 'failed',
bytesDownloaded: state.bytesDownloaded,
bytesTotal: state.bytesTotal,
error: err instanceof Error ? err.message : String(err),
}
}
} finally {
abortController = null
publish()
}
})),
Effect.withSpan('pf.coordinator.download', {
attributes: {
'pf.download.total_bytes': totalBytes,
'pf.download.files': MODEL_MANIFEST.files.length,
},
}),
),
)
}

function cancelDownload(): void {
Expand Down
Loading