Skip to content

Commit 4136977

Browse files
authored
perf(graph): batch BulkLoad CSV / COPY FROM at 50k rows (#143)
BulkLoadNodes and copyEdgeGroup previously staged one CSV with every row and issued a single Kuzu COPY FROM. Kuzu buffers the full CSV in process memory during ingest, so on polyglot targets with hundreds of thousands of nodes the COPY-side resident set grew unbounded. Chunk the work into batches of bulkLoadBatchSize (default 50k, override via CODEIQ_BULK_BATCH_SIZE env). Each batch stages + ingests + cleans up before the next batch starts so neither on-disk CSV nor Kuzu's ingest buffer ever holds more than batchSize rows. Caveat: this is production hygiene, not a complete OOM fix at the ~/projects/ scale (49k files / 434k nodes). At that scale the enrich pipeline OOMs earlier than BulkLoad - likely in the graph builder / linker / classifier passes that materialise all nodes in Go memory before BulkLoad runs. Streaming the upstream enrich stages is a separate, larger refactor. Cypher uniqueness constraints are still enforced cross-batch (Kuzu ingest commits before the next COPY starts), so a duplicate primary key surfaces the same Copy exception either way. Verified: - go test ./... -count=1 - 875 pass - fixture-minimal index->enrich->stats - same 45-node / 68-edge output
1 parent 9994900 commit 4136977

1 file changed

Lines changed: 74 additions & 10 deletions

File tree

go/internal/graph/bulk.go

Lines changed: 74 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,48 @@ var nodeColumns = []string{
2424
"props",
2525
}
2626

27-
// BulkLoadNodes writes nodes to a temporary CSV file and ingests via Kuzu's
28-
// COPY FROM. This is materially faster than per-node CREATE for the
29-
// enrich-phase volumes we hit (44k files / 100k+ nodes). Empty input is a
30-
// no-op (an empty CSV would still issue a COPY, which Kuzu may reject; the
31-
// no-op behaviour also matches Java's bulkSave convention).
27+
// bulkLoadBatchSize caps the number of rows materialised into any single
28+
// staging CSV / `COPY FROM` call. Kuzu buffers the full CSV in process
29+
// memory during ingest; on real-world polyglot targets (~/projects-scale
30+
// 49k files / 434k nodes) a single CSV pushed the process past the box's
31+
// 15 GiB RAM ceiling and got it OOM-killed. 50k rows keeps the peak
32+
// COPY-side resident set well under 1 GiB while still amortising the
33+
// per-statement Kuzu overhead. Override via CODEIQ_BULK_BATCH_SIZE env
34+
// (validated in resolveBulkBatchSize) for downstream perf tuning.
35+
const bulkLoadBatchSize = 50_000
36+
37+
// BulkLoadNodes writes nodes to one or more temporary CSV files and
38+
// ingests them via Kuzu's COPY FROM, in batches of bulkLoadBatchSize.
39+
// This is materially faster than per-node CREATE for the enrich-phase
40+
// volumes we hit (44k files / 100k+ nodes). Empty input is a no-op (an
41+
// empty CSV would still issue a COPY, which Kuzu may reject; the no-op
42+
// behaviour also matches Java's bulkSave convention).
43+
//
44+
// Each batch is staged + ingested + cleaned up before the next batch
45+
// starts so that neither the on-disk CSV footprint nor Kuzu's ingest
46+
// buffer ever holds more than bulkLoadBatchSize rows. Cypher uniqueness
47+
// constraints are still enforced cross-batch, so a duplicate primary
48+
// key surfaces the same Copy exception either way.
3249
func (s *Store) BulkLoadNodes(nodes []*model.CodeNode) error {
3350
if len(nodes) == 0 {
3451
return nil
3552
}
53+
batchSize := resolveBulkBatchSize()
54+
for start := 0; start < len(nodes); start += batchSize {
55+
end := start + batchSize
56+
if end > len(nodes) {
57+
end = len(nodes)
58+
}
59+
if err := s.copyNodeBatch(nodes[start:end]); err != nil {
60+
return err
61+
}
62+
}
63+
return nil
64+
}
65+
66+
// copyNodeBatch stages a single CSV for `batch` and runs one Kuzu COPY
67+
// FROM. Caller is responsible for slicing input into batches.
68+
func (s *Store) copyNodeBatch(batch []*model.CodeNode) error {
3669
tmp, err := os.CreateTemp("", "codeiq-nodes-*.csv")
3770
if err != nil {
3871
return fmt.Errorf("graph: temp csv: %w", err)
@@ -41,7 +74,7 @@ func (s *Store) BulkLoadNodes(nodes []*model.CodeNode) error {
4174
defer os.Remove(tmp.Name())
4275

4376
w := csv.NewWriter(tmp)
44-
for _, n := range nodes {
77+
for _, n := range batch {
4578
row, err := encodeNodeRow(n)
4679
if err != nil {
4780
tmp.Close()
@@ -74,6 +107,19 @@ func (s *Store) BulkLoadNodes(nodes []*model.CodeNode) error {
74107
return nil
75108
}
76109

110+
// resolveBulkBatchSize honours CODEIQ_BULK_BATCH_SIZE when set to a
111+
// positive integer; otherwise returns the compiled-in default. Invalid
112+
// values silently fall back to the default so a typo in the env never
113+
// blocks enrichment.
114+
func resolveBulkBatchSize() int {
115+
if raw := os.Getenv("CODEIQ_BULK_BATCH_SIZE"); raw != "" {
116+
if v, err := strconv.Atoi(raw); err == nil && v > 0 {
117+
return v
118+
}
119+
}
120+
return bulkLoadBatchSize
121+
}
122+
77123
// encodeNodeRow serialises one CodeNode into the column order declared by
78124
// nodeColumns. Numeric INT64 columns are emitted as empty strings when zero
79125
// so Kuzu treats them as NULL rather than 0 (line_start/line_end on
@@ -152,18 +198,36 @@ func (s *Store) BulkLoadEdges(edges []*model.CodeEdge) error {
152198
return nil
153199
}
154200

155-
// copyEdgeGroup stages one rel-table CSV and issues COPY <REL> FROM. The
156-
// first two columns are the FROM and TO node primary keys per Kuzu's rel
157-
// COPY convention.
201+
// copyEdgeGroup stages rel-table CSVs in batches of bulkLoadBatchSize
202+
// and issues one COPY <REL> FROM per batch. The first two columns are
203+
// the FROM and TO node primary keys per Kuzu's rel COPY convention.
204+
// Same memory rationale as BulkLoadNodes — Kuzu buffers the full CSV
205+
// in ingest, so chunking caps peak resident memory.
158206
func (s *Store) copyEdgeGroup(kind model.EdgeKind, edges []*model.CodeEdge) error {
207+
batchSize := resolveBulkBatchSize()
208+
for start := 0; start < len(edges); start += batchSize {
209+
end := start + batchSize
210+
if end > len(edges) {
211+
end = len(edges)
212+
}
213+
if err := s.copyEdgeBatch(kind, edges[start:end]); err != nil {
214+
return err
215+
}
216+
}
217+
return nil
218+
}
219+
220+
// copyEdgeBatch stages a single rel-table CSV for `batch` and runs one
221+
// Kuzu COPY FROM.
222+
func (s *Store) copyEdgeBatch(kind model.EdgeKind, batch []*model.CodeEdge) error {
159223
tmp, err := os.CreateTemp("", "codeiq-edges-*.csv")
160224
if err != nil {
161225
return fmt.Errorf("graph: temp csv: %w", err)
162226
}
163227
defer os.Remove(tmp.Name())
164228

165229
w := csv.NewWriter(tmp)
166-
for _, e := range edges {
230+
for _, e := range batch {
167231
props, err := json.Marshal(e.Properties)
168232
if err != nil {
169233
tmp.Close()

0 commit comments

Comments
 (0)