Skip to content

Commit eaf9c83

Browse files
committed
Eliminate rename() — reindex deletes old DB, index writes directly
Architecture change to prevent concurrent write corruption: Indexing (new project): cbm_write_db writes project.db directly Reindexing (existing): delete project.db, then index as new Incremental: upsert changes into existing project.db (unchanged) Root cause: rename(project.db.tmp, project.db) while watcher/MCP had the old file open via SQLite caused WAL corruption — stale connections wrote to deleted inodes. Changes: - cbm_gbuf_dump_to_sqlite: write to final path, no .tmp + rename - pipeline.c: if DB exists but not eligible for incremental, delete it before proceeding with full index (reindex = delete + index) - Pipeline lock (atomic spinlock) serializes concurrent runs: MCP/autoindex block, watcher skips if busy - delete_project acquires pipeline lock before unlinking - BEGIN IMMEDIATE for all write transactions (respects busy_timeout) - cbm_store_check_integrity: detects corrupt DBs (>5 project rows or invalid root_path), auto-deletes with ERROR log - cbm_store_get_db accessor for test corruption simulation Tests: 5 integrity tests + 4 concurrency lock tests
1 parent 68cc19e commit eaf9c83

7 files changed

Lines changed: 186 additions & 50 deletions

File tree

src/graph_buffer/graph_buffer.c

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -971,23 +971,12 @@ int cbm_gbuf_dump_to_sqlite(cbm_gbuf_t *gb, const char *path) {
971971
cbm_ht_free(gb->edges_by_type);
972972
gb->edges_by_type = NULL;
973973

974-
/* Write to .tmp first, then atomic rename */
975-
char tmp_path[1040];
976-
snprintf(tmp_path, sizeof(tmp_path), "%s.tmp", path);
977-
978-
int rc = cbm_write_db(tmp_path, gb->project, gb->root_path, indexed_at, dump_nodes, node_idx,
974+
/* Write directly to final path — no .tmp + rename.
975+
* Callers must delete the old .db before calling this (reindex)
976+
* or ensure no file exists (first index). */
977+
int rc = cbm_write_db(path, gb->project, gb->root_path, indexed_at, dump_nodes, node_idx,
979978
dump_edges, edge_idx);
980979

981-
if (rc == 0) {
982-
rc = rename(tmp_path, path);
983-
if (rc != 0) {
984-
cbm_log_error("gbuf.dump", "op", "rename", "path", path);
985-
}
986-
} else {
987-
// NOLINTNEXTLINE(cert-err33-c) — best-effort cleanup
988-
remove(tmp_path);
989-
}
990-
991980
{
992981
char b1[16];
993982
char b2[16];

src/main.c

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,15 +77,25 @@ static void *http_thread(void *arg) {
7777

7878
static int watcher_index_fn(const char *project_name, const char *root_path, void *user_data) {
7979
(void)user_data;
80+
81+
/* Non-blocking: skip if another pipeline is already running.
82+
* Watcher will retry on next poll cycle (5-60s). */
83+
if (!cbm_pipeline_try_lock()) {
84+
cbm_log_info("watcher.skip", "project", project_name, "reason", "pipeline_busy");
85+
return 0;
86+
}
87+
8088
cbm_log_info("watcher.reindex", "project", project_name, "path", root_path);
8189

8290
cbm_pipeline_t *p = cbm_pipeline_new(root_path, NULL, CBM_MODE_FULL);
8391
if (!p) {
92+
cbm_pipeline_unlock();
8493
return -1;
8594
}
8695

8796
int rc = cbm_pipeline_run(p);
8897
cbm_pipeline_free(p);
98+
cbm_pipeline_unlock();
8999
return rc;
90100
}
91101

src/mcp/mcp.c

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1113,6 +1113,9 @@ static char *handle_delete_project(cbm_mcp_server_t *srv, const char *args) {
11131113
srv->current_project = NULL;
11141114
}
11151115

1116+
/* Wait for any in-progress pipeline to finish before deleting */
1117+
cbm_pipeline_lock();
1118+
11161119
/* Delete the .db file + WAL/SHM */
11171120
char path[1024];
11181121
project_db_path(name, path, sizeof(path));
@@ -1131,6 +1134,8 @@ static char *handle_delete_project(cbm_mcp_server_t *srv, const char *args) {
11311134
status = "deleted";
11321135
}
11331136

1137+
cbm_pipeline_unlock();
1138+
11341139
yyjson_mut_doc *doc = yyjson_mut_doc_new(NULL);
11351140
yyjson_mut_val *root = yyjson_mut_obj(doc);
11361141
yyjson_mut_doc_set_root(doc, root);
@@ -1437,9 +1442,19 @@ static char *handle_index_repository(cbm_mcp_server_t *srv, const char *args) {
14371442

14381443
char *project_name = heap_strdup(cbm_pipeline_project_name(p));
14391444

1440-
/* Pipeline builds everything in-memory, then dumps to file atomically.
1441-
* No need to close srv->store — pipeline doesn't touch the open store. */
1445+
/* Close cached store — pipeline will delete + recreate the .db file */
1446+
if (srv->owns_store && srv->store) {
1447+
cbm_store_close(srv->store);
1448+
srv->store = NULL;
1449+
}
1450+
free(srv->current_project);
1451+
srv->current_project = NULL;
1452+
1453+
/* Serialize pipeline runs to prevent concurrent writes */
1454+
cbm_pipeline_lock();
14421455
int rc = cbm_pipeline_run(p);
1456+
cbm_pipeline_unlock();
1457+
14431458
cbm_pipeline_free(p);
14441459
cbm_mem_collect(); /* return mimalloc pages to OS after large indexing */
14451460

@@ -2749,7 +2764,11 @@ static void *autoindex_thread(void *arg) {
27492764
return NULL;
27502765
}
27512766

2767+
/* Block until any concurrent pipeline finishes */
2768+
cbm_pipeline_lock();
27522769
int rc = cbm_pipeline_run(p);
2770+
cbm_pipeline_unlock();
2771+
27532772
cbm_pipeline_free(p);
27542773
cbm_mem_collect(); /* return mimalloc pages to OS after indexing */
27552774

src/pipeline/pipeline.c

Lines changed: 52 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,28 @@
3333
#include <sys/stat.h>
3434
#include <time.h>
3535

36+
/* ── Global index lock ─────────────────────────────────────────── */
37+
/* Prevents concurrent pipeline runs on the same DB file.
38+
* Atomic spinlock: 0 = free, 1 = locked. */
39+
static atomic_int g_pipeline_busy = 0;
40+
41+
bool cbm_pipeline_try_lock(void) {
42+
return atomic_exchange(&g_pipeline_busy, 1) == 0;
43+
}
44+
45+
#define LOCK_SPIN_NS 100000000 /* 100ms between lock retries */
46+
47+
void cbm_pipeline_lock(void) {
48+
while (atomic_exchange(&g_pipeline_busy, 1) != 0) {
49+
struct timespec ts = {0, LOCK_SPIN_NS};
50+
cbm_nanosleep(&ts, NULL);
51+
}
52+
}
53+
54+
void cbm_pipeline_unlock(void) {
55+
atomic_store(&g_pipeline_busy, 0);
56+
}
57+
3658
/* ── Internal state ──────────────────────────────────────────────── */
3759

3860
struct cbm_pipeline {
@@ -344,45 +366,44 @@ int cbm_pipeline_run(cbm_pipeline_t *p) {
344366
return -1;
345367
}
346368

347-
/* Check for existing DB with file hashes → incremental path */
369+
/* Check for existing DB → route to incremental or delete for reindex */
348370
{
349371
char *db_path = resolve_db_path(p);
350372
if (db_path) {
351373
struct stat db_st;
352374
if (stat(db_path, &db_st) == 0) {
353-
/* DB exists — check if it has file hashes */
375+
/* DB exists — try incremental path first */
354376
cbm_store_t *check_store = cbm_store_open_path(db_path);
355-
if (check_store) {
356-
/* Integrity check — corrupt DB → delete and fall through to full reindex */
357-
if (!cbm_store_check_integrity(check_store)) {
358-
cbm_log_error("pipeline.corrupt_db", "path", db_path, "action",
359-
"deleting — will do full reindex");
360-
cbm_store_close(check_store);
361-
cbm_unlink(db_path);
362-
char wal[1040];
363-
char shm[1040];
364-
snprintf(wal, sizeof(wal), "%s-wal", db_path);
365-
snprintf(shm, sizeof(shm), "%s-shm", db_path);
366-
cbm_unlink(wal);
367-
cbm_unlink(shm);
368-
} else {
369-
cbm_file_hash_t *hashes = NULL;
370-
int hash_count = 0;
371-
cbm_store_get_file_hashes(check_store, p->project_name, &hashes,
372-
&hash_count);
373-
cbm_store_free_file_hashes(hashes, hash_count);
374-
cbm_store_close(check_store);
375-
376-
if (hash_count > 0) {
377-
cbm_log_info("pipeline.route", "path", "incremental", "stored_hashes",
378-
itoa_buf(hash_count));
379-
rc = cbm_pipeline_run_incremental(p, db_path, files, file_count);
380-
cbm_discover_free(files, file_count);
381-
free(db_path);
382-
return rc;
383-
}
377+
if (check_store && cbm_store_check_integrity(check_store)) {
378+
cbm_file_hash_t *hashes = NULL;
379+
int hash_count = 0;
380+
cbm_store_get_file_hashes(check_store, p->project_name, &hashes, &hash_count);
381+
cbm_store_free_file_hashes(hashes, hash_count);
382+
cbm_store_close(check_store);
383+
384+
if (hash_count > 0) {
385+
cbm_log_info("pipeline.route", "path", "incremental", "stored_hashes",
386+
itoa_buf(hash_count));
387+
rc = cbm_pipeline_run_incremental(p, db_path, files, file_count);
388+
cbm_discover_free(files, file_count);
389+
free(db_path);
390+
return rc;
384391
}
392+
} else if (check_store) {
393+
cbm_store_close(check_store);
385394
}
395+
396+
/* Not eligible for incremental → reindex: delete old DB first.
397+
* cbm_write_db writes directly to the final path (no rename),
398+
* so the old file must be gone before the pipeline dumps. */
399+
cbm_log_info("pipeline.route", "path", "reindex", "action", "deleting old db");
400+
cbm_unlink(db_path);
401+
char wal[1040];
402+
char shm[1040];
403+
snprintf(wal, sizeof(wal), "%s-wal", db_path);
404+
snprintf(shm, sizeof(shm), "%s-shm", db_path);
405+
cbm_unlink(wal);
406+
cbm_unlink(shm);
386407
}
387408
free(db_path);
388409
}

src/pipeline/pipeline.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,20 @@ void cbm_pipeline_cancel(cbm_pipeline_t *p);
5555
* owned by the pipeline. Valid until cbm_pipeline_free(). */
5656
const char *cbm_pipeline_project_name(const cbm_pipeline_t *p);
5757

58+
/* ── Index lock (prevents concurrent pipeline runs on same DB) ──── */
59+
60+
/* Try to acquire the global index lock. Returns true if acquired,
61+
* false if another pipeline is already running (non-blocking).
62+
* Use this in the watcher — skip reindex if busy. */
63+
bool cbm_pipeline_try_lock(void);
64+
65+
/* Acquire the global index lock, blocking until available.
66+
* Use this in MCP handler and autoindex — wait for busy watcher to finish. */
67+
void cbm_pipeline_lock(void);
68+
69+
/* Release the global index lock. */
70+
void cbm_pipeline_unlock(void);
71+
5872
/* ── FQN helpers (used by passes and external callers) ──────────── */
5973

6074
/* Compute a qualified name: project.dir.parts.name

src/store/store.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1055,7 +1055,7 @@ int cbm_store_upsert_node_batch(cbm_store_t *s, const cbm_node_t *nodes, int cou
10551055
return CBM_STORE_OK;
10561056
}
10571057

1058-
exec_sql(s, "BEGIN;");
1058+
exec_sql(s, "BEGIN IMMEDIATE;");
10591059
for (int i = 0; i < count; i++) {
10601060
int64_t id = cbm_store_upsert_node(s, &nodes[i]);
10611061
if (id == CBM_STORE_ERR) {
@@ -1291,7 +1291,7 @@ int cbm_store_insert_edge_batch(cbm_store_t *s, const cbm_edge_t *edges, int cou
12911291
return CBM_STORE_OK;
12921292
}
12931293

1294-
exec_sql(s, "BEGIN;");
1294+
exec_sql(s, "BEGIN IMMEDIATE;");
12951295
for (int i = 0; i < count; i++) {
12961296
int64_t id = cbm_store_insert_edge(s, &edges[i]);
12971297
if (id == CBM_STORE_ERR) {

tests/test_pipeline.c

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
#include <stdlib.h>
1515
#include <string.h>
1616
#include <stdatomic.h>
17+
#include "foundation/compat_thread.h"
1718
#include <sys/stat.h>
1819
#include <unistd.h>
1920
#include "graph_buffer/graph_buffer.h"
@@ -5213,7 +5214,89 @@ TEST(incremental_kustomize_module_indexed) {
52135214
PASS();
52145215
}
52155216

5217+
/* ── Index lock tests ───────────────────────────────────────────── */
5218+
5219+
TEST(pipeline_lock_try_acquire) {
5220+
/* First try-lock should succeed */
5221+
ASSERT_TRUE(cbm_pipeline_try_lock());
5222+
/* Second try-lock should fail (already held) */
5223+
ASSERT_FALSE(cbm_pipeline_try_lock());
5224+
/* Release, then re-acquire should succeed */
5225+
cbm_pipeline_unlock();
5226+
ASSERT_TRUE(cbm_pipeline_try_lock());
5227+
cbm_pipeline_unlock();
5228+
PASS();
5229+
}
5230+
5231+
TEST(pipeline_lock_blocking) {
5232+
/* Lock, then unlock — basic sanity */
5233+
cbm_pipeline_lock();
5234+
cbm_pipeline_unlock();
5235+
/* Should be immediately re-acquirable */
5236+
cbm_pipeline_lock();
5237+
cbm_pipeline_unlock();
5238+
PASS();
5239+
}
5240+
5241+
/* Thread function that tries to acquire the lock and records result */
5242+
static atomic_int g_thread_acquired = 0;
5243+
static atomic_int g_thread_done = 0;
5244+
5245+
static void *try_lock_thread(void *arg) {
5246+
(void)arg;
5247+
if (cbm_pipeline_try_lock()) {
5248+
atomic_store(&g_thread_acquired, 1);
5249+
cbm_pipeline_unlock();
5250+
} else {
5251+
atomic_store(&g_thread_acquired, 0);
5252+
}
5253+
atomic_store(&g_thread_done, 1);
5254+
return NULL;
5255+
}
5256+
5257+
TEST(pipeline_lock_contention) {
5258+
/* Main thread holds lock, spawned thread should fail try_lock */
5259+
cbm_pipeline_lock();
5260+
atomic_store(&g_thread_acquired, -1);
5261+
atomic_store(&g_thread_done, 0);
5262+
5263+
cbm_thread_t tid;
5264+
int rc = cbm_thread_create(&tid, 0, try_lock_thread, NULL);
5265+
ASSERT_EQ(rc, 0);
5266+
5267+
/* Wait for thread to finish */
5268+
cbm_thread_join(&tid);
5269+
5270+
/* Thread should NOT have acquired the lock */
5271+
ASSERT_EQ(atomic_load(&g_thread_acquired), 0);
5272+
cbm_pipeline_unlock();
5273+
PASS();
5274+
}
5275+
5276+
TEST(pipeline_lock_release_allows_contender) {
5277+
/* Main thread acquires and releases, then spawned thread should succeed */
5278+
cbm_pipeline_lock();
5279+
cbm_pipeline_unlock();
5280+
5281+
atomic_store(&g_thread_acquired, -1);
5282+
atomic_store(&g_thread_done, 0);
5283+
5284+
cbm_thread_t tid;
5285+
int rc = cbm_thread_create(&tid, 0, try_lock_thread, NULL);
5286+
ASSERT_EQ(rc, 0);
5287+
cbm_thread_join(&tid);
5288+
5289+
/* Thread SHOULD have acquired the lock */
5290+
ASSERT_EQ(atomic_load(&g_thread_acquired), 1);
5291+
PASS();
5292+
}
5293+
52165294
SUITE(pipeline) {
5295+
/* Index lock */
5296+
RUN_TEST(pipeline_lock_try_acquire);
5297+
RUN_TEST(pipeline_lock_blocking);
5298+
RUN_TEST(pipeline_lock_contention);
5299+
RUN_TEST(pipeline_lock_release_allows_contender);
52175300
/* Lifecycle */
52185301
RUN_TEST(pipeline_create_free);
52195302
RUN_TEST(pipeline_null_repo);

0 commit comments

Comments
 (0)