From 87659bce65156111839c0c1d1b7765a7736a1dd2 Mon Sep 17 00:00:00 2001 From: Dmitry Teryaev Date: Sun, 7 Jun 2026 15:42:16 +0300 Subject: [PATCH 1/9] feat: add source_file to edge schemas and FileHashTracker for incremental graph rebuild (PR-G1) This commit implements PR-G1 of the incremental graph rebuild plan: - Bump ONTOLOGY_VERSION to 17 (requires re-index) - Add source_file STRING column to all 12 edge DDL constants - Update _write_edges() to pass source_file for EXTENDS, IMPLEMENTS, INJECTS, DECLARES, OVERRIDES, CALLS, UNRESOLVED_AT - Update _write_routes_and_exposes() to pass source_file for EXPOSES, DECLARES_CLIENT, DECLARES_PRODUCER, HTTP_CALLS, ASYNC_CALLS - Add FileHashTracker class for detecting file changes (added, changed, removed) - Add 9 tests for FileHashTracker and edge schema validation Scope: PR-G1 (Hash tracker + source_file edge schema) Plan: plans/active/PLAN-INCREMENTAL-GRAPH.md Co-Authored-By: Claude Opus 4.7 --- ast_java.py | 2 +- build_ast_graph.py | 164 +++++++++++++++++++---- tests/test_incremental_graph.py | 225 ++++++++++++++++++++++++++++++++ 3 files changed, 364 insertions(+), 27 deletions(-) create mode 100644 tests/test_incremental_graph.py diff --git a/ast_java.py b/ast_java.py index 0626125e..7bab8320 100644 --- a/ast_java.py +++ b/ast_java.py @@ -83,7 +83,7 @@ # Phase 11: `EDGE_SCHEMA` in `java_ontology.py` (canonical edge navigation schema; v14 re-index). # Phase 12: CALLS `callee_declaring_role`, supertype-walk dedup, pass3 unresolved counters (v15 re-index). # Bumps whenever extraction / enrichment semantics change. -ONTOLOGY_VERSION = 16 +ONTOLOGY_VERSION = 17 ROLE_ANNOTATIONS: dict[str, str] = { # Spring Web diff --git a/build_ast_graph.py b/build_ast_graph.py index 0d11db80..07db8011 100644 --- a/build_ast_graph.py +++ b/build_ast_graph.py @@ -401,6 +401,79 @@ class GraphTables: type_role_by_node_id: dict[str, str] = field(default_factory=dict) +class FileHashTracker: + """Track content hashes for incremental graph rebuild.""" + def __init__(self, index_dir: Path): + self._path = index_dir / ".graph_hashes.json" + self._hashes: dict[str, str] = {} # rel_path -> sha256_hex + + def load(self) -> None: + """Load hashes from disk. No-op if file missing (first run).""" + if not self._path.exists(): + return + try: + with open(self._path, "r", encoding="utf-8") as f: + self._hashes = json.load(f) + except (json.JSONDecodeError, OSError): + # Corrupt or unreadable hash file; start fresh. + self._hashes = {} + + def save(self) -> None: + """Persist hashes to disk atomically (write .tmp, rename).""" + tmp_path = self._path.with_suffix(".json.tmp") + try: + with open(tmp_path, "w", encoding="utf-8") as f: + json.dump(self._hashes, f, sort_keys=True) + os.replace(tmp_path, self._path) + except OSError: + # Fail gracefully; next run will treat as missing and rebuild. + pass + + def detect_changes(self, source_root: Path, ignore: LayeredIgnore) -> tuple[set[str], set[str], set[str]]: + """Return (added, changed, removed) sets of relative POSIX paths.""" + current_files: set[str] = set() + for abs_path in iter_java_source_files(source_root, ignore=ignore): + rel_path = abs_path.relative_to(source_root).as_posix() + current_files.add(rel_path) + + added: set[str] = set() + changed: set[str] = set() + removed: set[str] = set() + + # Detect added and changed files. + for rel_path in current_files: + abs_path = source_root / rel_path + file_hash = _hash_file(abs_path) + stored_hash = self._hashes.get(rel_path) + if stored_hash is None: + added.add(rel_path) + elif stored_hash != file_hash: + changed.add(rel_path) + + # Detect removed files. + for rel_path in self._hashes: + if rel_path not in current_files: + removed.add(rel_path) + + return added, changed, removed + + def update(self, rel_paths: set[str], source_root: Path) -> None: + """Compute and store hashes for the given paths.""" + for rel_path in rel_paths: + abs_path = source_root / rel_path + if abs_path.exists(): + self._hashes[rel_path] = _hash_file(abs_path) + + +def _hash_file(abs_path: Path) -> str: + """Compute SHA-256 hash of a file's raw bytes.""" + hasher = hashlib.sha256() + with open(abs_path, "rb") as f: + for chunk in iter(lambda: f.read(65536), b""): + hasher.update(chunk) + return hasher.hexdigest() + + # ---------- file walk (see `path_filtering.iter_java_source_files`) ---------- @@ -2414,22 +2487,22 @@ def _micro_factor(member: MemberEntry | None) -> float: _SCHEMA_EXTENDS = ( "CREATE REL TABLE EXTENDS(FROM Symbol TO Symbol, " - "dst_name STRING, dst_fqn STRING, resolved BOOLEAN)" + "source_file STRING, dst_name STRING, dst_fqn STRING, resolved BOOLEAN)" ) _SCHEMA_IMPLEMENTS = ( "CREATE REL TABLE IMPLEMENTS(FROM Symbol TO Symbol, " - "dst_name STRING, dst_fqn STRING, resolved BOOLEAN)" + "source_file STRING, dst_name STRING, dst_fqn STRING, resolved BOOLEAN)" ) _SCHEMA_INJECTS = ( "CREATE REL TABLE INJECTS(FROM Symbol TO Symbol, " - "dst_name STRING, dst_fqn STRING, resolved BOOLEAN, " + "source_file STRING, dst_name STRING, dst_fqn STRING, resolved BOOLEAN, " "mechanism STRING, annotation STRING, field_or_param STRING)" ) -_SCHEMA_DECLARES = "CREATE REL TABLE DECLARES(FROM Symbol TO Symbol)" -_SCHEMA_OVERRIDES = "CREATE REL TABLE OVERRIDES(FROM Symbol TO Symbol)" +_SCHEMA_DECLARES = "CREATE REL TABLE DECLARES(FROM Symbol TO Symbol, source_file STRING)" +_SCHEMA_OVERRIDES = "CREATE REL TABLE OVERRIDES(FROM Symbol TO Symbol, source_file STRING)" _SCHEMA_CALLS = ( "CREATE REL TABLE CALLS(FROM Symbol TO Symbol, " - "call_site_line INT64, call_site_byte INT64, arg_count INT64, " + "source_file STRING, call_site_line INT64, call_site_byte INT64, arg_count INT64, " "confidence DOUBLE, strategy STRING, source STRING, resolved BOOLEAN, " "callee_declaring_role STRING)" ) @@ -2439,27 +2512,27 @@ def _micro_factor(member: MemberEntry | None) -> float: "arg_count INT64, callee_simple STRING, receiver_expr STRING, reason STRING, " "PRIMARY KEY(id))" ) -_SCHEMA_UNRESOLVED_AT = "CREATE REL TABLE UNRESOLVED_AT(FROM Symbol TO UnresolvedCallSite)" +_SCHEMA_UNRESOLVED_AT = "CREATE REL TABLE UNRESOLVED_AT(FROM Symbol TO UnresolvedCallSite, source_file STRING)" _SCHEMA_EXPOSES = ( "CREATE REL TABLE EXPOSES(FROM Symbol TO Route, " - "confidence DOUBLE, strategy STRING)" + "source_file STRING, confidence DOUBLE, strategy STRING)" ) _SCHEMA_DECLARES_CLIENT = ( "CREATE REL TABLE DECLARES_CLIENT(FROM Symbol TO Client, " - "confidence DOUBLE, strategy STRING)" + "source_file STRING, confidence DOUBLE, strategy STRING)" ) _SCHEMA_DECLARES_PRODUCER = ( "CREATE REL TABLE DECLARES_PRODUCER(FROM Symbol TO Producer, " - "confidence DOUBLE, strategy STRING)" + "source_file STRING, confidence DOUBLE, strategy STRING)" ) _SCHEMA_HTTP_CALLS = ( "CREATE REL TABLE HTTP_CALLS(FROM Client TO Route, " - "confidence DOUBLE, strategy STRING, " + "source_file STRING, confidence DOUBLE, strategy STRING, " "method_call STRING, raw_uri STRING, match STRING)" ) _SCHEMA_ASYNC_CALLS = ( "CREATE REL TABLE ASYNC_CALLS(FROM Producer TO Route, " - "confidence DOUBLE, strategy STRING, " + "source_file STRING, confidence DOUBLE, strategy STRING, " "direction STRING, raw_topic STRING, match STRING)" ) @@ -2607,28 +2680,29 @@ def _write_nodes( _CREATE_EXT = ( "MATCH (a:Symbol {id: $src}), (b:Symbol {id: $dst}) " - "CREATE (a)-[:EXTENDS {dst_name: $dst_name, dst_fqn: $dst_fqn, resolved: $resolved}]->(b)" + "CREATE (a)-[:EXTENDS {source_file: $source_file, dst_name: $dst_name, dst_fqn: $dst_fqn, resolved: $resolved}]->(b)" ) _CREATE_IMPL = ( "MATCH (a:Symbol {id: $src}), (b:Symbol {id: $dst}) " - "CREATE (a)-[:IMPLEMENTS {dst_name: $dst_name, dst_fqn: $dst_fqn, resolved: $resolved}]->(b)" + "CREATE (a)-[:IMPLEMENTS {source_file: $source_file, dst_name: $dst_name, dst_fqn: $dst_fqn, resolved: $resolved}]->(b)" ) _CREATE_INJ = ( "MATCH (a:Symbol {id: $src}), (b:Symbol {id: $dst}) " - "CREATE (a)-[:INJECTS {dst_name: $dst_name, dst_fqn: $dst_fqn, resolved: $resolved, " + "CREATE (a)-[:INJECTS {source_file: $source_file, dst_name: $dst_name, dst_fqn: $dst_fqn, resolved: $resolved, " "mechanism: $mechanism, annotation: $annotation, field_or_param: $field_or_param}]->(b)" ) _CREATE_DECL = ( "MATCH (a:Symbol {id: $src}), (b:Symbol {id: $dst}) " - "CREATE (a)-[:DECLARES]->(b)" + "CREATE (a)-[:DECLARES {source_file: $source_file}]->(b)" ) _CREATE_OVERRIDES = ( "MATCH (a:Symbol {id: $src}), (b:Symbol {id: $dst}) " - "CREATE (a)-[:OVERRIDES]->(b)" + "CREATE (a)-[:OVERRIDES {source_file: $source_file}]->(b)" ) _CREATE_CALL = ( "MATCH (a:Symbol {id: $src}), (b:Symbol {id: $dst}) " "CREATE (a)-[:CALLS {" + "source_file: $source_file, " "call_site_line: $line, call_site_byte: $byte, arg_count: $argc, " "confidence: $conf, strategy: $strat, source: $src_kind, resolved: $resolved, " "callee_declaring_role: $callee_declaring_role" @@ -2656,11 +2730,11 @@ def _write_nodes( _CREATE_EXPOSES = ( "MATCH (s:Symbol {id: $sid}), (r:Route {id: $rid}) " - "CREATE (s)-[:EXPOSES {confidence: $confidence, strategy: $strategy}]->(r)" + "CREATE (s)-[:EXPOSES {source_file: $source_file, confidence: $confidence, strategy: $strategy}]->(r)" ) _CREATE_DECLARES_CLIENT = ( "MATCH (s:Symbol {id: $sid}), (c:Client {id: $cid}) " - "CREATE (s)-[:DECLARES_CLIENT {confidence: $confidence, strategy: $strategy}]->(c)" + "CREATE (s)-[:DECLARES_CLIENT {source_file: $source_file, confidence: $confidence, strategy: $strategy}]->(c)" ) _CREATE_PRODUCER = ( "CREATE (:Producer {" @@ -2673,16 +2747,16 @@ def _write_nodes( ) _CREATE_DECLARES_PRODUCER = ( "MATCH (s:Symbol {id: $sid}), (p:Producer {id: $pid}) " - "CREATE (s)-[:DECLARES_PRODUCER {confidence: $confidence, strategy: $strategy}]->(p)" + "CREATE (s)-[:DECLARES_PRODUCER {source_file: $source_file, confidence: $confidence, strategy: $strategy}]->(p)" ) _CREATE_HTTP_CALL = ( "MATCH (c:Client {id: $cid}), (r:Route {id: $rid}) " - "CREATE (c)-[:HTTP_CALLS {confidence: $confidence, strategy: $strategy, " + "CREATE (c)-[:HTTP_CALLS {source_file: $source_file, confidence: $confidence, strategy: $strategy, " "method_call: $method_call, raw_uri: $raw_uri, match: $match}]->(r)" ) _CREATE_ASYNC_CALL = ( "MATCH (p:Producer {id: $pid}), (r:Route {id: $rid}) " - "CREATE (p)-[:ASYNC_CALLS {confidence: $confidence, strategy: $strategy, " + "CREATE (p)-[:ASYNC_CALLS {source_file: $source_file, confidence: $confidence, strategy: $strategy, " "direction: $direction, raw_topic: $raw_topic, match: $match}]->(r)" ) @@ -2733,29 +2807,45 @@ def _populate_overrides_rows(tables: GraphTables) -> None: def _write_edges(conn: kuzu.Connection, tables: GraphTables) -> None: + # Build node_id -> file_path lookup for source_file resolution. + _file_by_node_id: dict[str, str] = {} + for entry in tables.types.values(): + _file_by_node_id[entry.node_id] = entry.file_path + for m in tables.members: + _file_by_node_id[m.node_id] = m.file_path + for r in tables.extends_rows: conn.execute(_CREATE_EXT, { "src": r.src_id, "dst": r.dst_id, + "source_file": _file_by_node_id.get(r.src_id, ""), "dst_name": r.dst_name, "dst_fqn": r.dst_fqn, "resolved": r.resolved, }) for r in tables.implements_rows: conn.execute(_CREATE_IMPL, { "src": r.src_id, "dst": r.dst_id, + "source_file": _file_by_node_id.get(r.src_id, ""), "dst_name": r.dst_name, "dst_fqn": r.dst_fqn, "resolved": r.resolved, }) for r in tables.injects_rows: conn.execute(_CREATE_INJ, { "src": r.src_id, "dst": r.dst_id, + "source_file": _file_by_node_id.get(r.src_id, ""), "dst_name": r.dst_name, "dst_fqn": r.dst_fqn, "resolved": r.resolved, "mechanism": r.mechanism, "annotation": r.annotation, "field_or_param": r.field_or_param, }) for row in tables.declares_rows: - conn.execute(_CREATE_DECL, {"src": row.src_id, "dst": row.dst_id}) + conn.execute(_CREATE_DECL, { + "src": row.src_id, "dst": row.dst_id, + "source_file": _file_by_node_id.get(row.src_id, ""), + }) for row in tables.overrides_rows: - conn.execute(_CREATE_OVERRIDES, {"src": row.src_id, "dst": row.dst_id}) + conn.execute(_CREATE_OVERRIDES, { + "src": row.src_id, "dst": row.dst_id, + "source_file": _file_by_node_id.get(row.src_id, ""), + }) seen_calls: set[tuple[str, str, int, int]] = set() unique_calls: list[CallsRow] = [] @@ -2769,6 +2859,7 @@ def _write_edges(conn: kuzu.Connection, tables: GraphTables) -> None: for row in unique_calls: conn.execute(_CREATE_CALL, { "src": row.src_id, "dst": row.dst_id, + "source_file": _file_by_node_id.get(row.src_id, ""), "line": row.call_site_line, "byte": row.call_site_byte, "argc": row.arg_count, @@ -2789,7 +2880,7 @@ def _write_edges(conn: kuzu.Connection, tables: GraphTables) -> None: ) _CREATE_UNRESOLVED_AT = ( "MATCH (a:Symbol {id: $caller}), (u:UnresolvedCallSite {id: $ucs}) " - "CREATE (a)-[:UNRESOLVED_AT]->(u)" + "CREATE (a)-[:UNRESOLVED_AT {source_file: $source_file}]->(u)" ) seen_ucs: set[str] = set() for row in tables.unresolved_call_site_rows: @@ -2806,10 +2897,26 @@ def _write_edges(conn: kuzu.Connection, tables: GraphTables) -> None: "recv": row.receiver_expr, "reason": row.reason, }) - conn.execute(_CREATE_UNRESOLVED_AT, {"caller": row.caller_id, "ucs": row.id}) + conn.execute(_CREATE_UNRESOLVED_AT, { + "caller": row.caller_id, "ucs": row.id, + "source_file": _file_by_node_id.get(row.caller_id, ""), + }) def _write_routes_and_exposes(conn: kuzu.Connection, tables: GraphTables) -> None: + # Build node_id -> file_path lookup for source_file resolution (for Symbol sources). + _file_by_node_id: dict[str, str] = {} + for entry in tables.types.values(): + _file_by_node_id[entry.node_id] = entry.file_path + for m in tables.members: + _file_by_node_id[m.node_id] = m.file_path + + # Build client_id -> filename lookup for HTTP_CALLS source_file. + _file_by_client_id: dict[str, str] = {row.id: row.filename for row in tables.client_rows} + + # Build producer_id -> filename lookup for ASYNC_CALLS source_file. + _file_by_producer_id: dict[str, str] = {row.id: row.filename for row in tables.producer_rows} + for row in tables.routes_rows: conn.execute(_CREATE_ROUTE, { "id": row.id, @@ -2834,6 +2941,7 @@ def _write_routes_and_exposes(conn: kuzu.Connection, tables: GraphTables) -> Non conn.execute(_CREATE_EXPOSES, { "sid": row.symbol_id, "rid": row.route_id, + "source_file": _file_by_node_id.get(row.symbol_id, ""), "confidence": row.confidence, "strategy": row.strategy, }) @@ -2843,6 +2951,7 @@ def _write_routes_and_exposes(conn: kuzu.Connection, tables: GraphTables) -> Non conn.execute(_CREATE_DECLARES_CLIENT, { "sid": row.symbol_id, "cid": row.client_id, + "source_file": _file_by_node_id.get(row.symbol_id, ""), "confidence": row.confidence, "strategy": row.strategy, }) @@ -2852,6 +2961,7 @@ def _write_routes_and_exposes(conn: kuzu.Connection, tables: GraphTables) -> Non conn.execute(_CREATE_DECLARES_PRODUCER, { "sid": row.symbol_id, "pid": row.producer_id, + "source_file": _file_by_node_id.get(row.symbol_id, ""), "confidence": row.confidence, "strategy": row.strategy, }) @@ -2859,6 +2969,7 @@ def _write_routes_and_exposes(conn: kuzu.Connection, tables: GraphTables) -> Non conn.execute(_CREATE_HTTP_CALL, { "cid": row.client_id, "rid": row.route_id, + "source_file": _file_by_client_id.get(row.client_id, ""), "confidence": row.confidence, "strategy": row.strategy, "method_call": row.method_call, @@ -2869,6 +2980,7 @@ def _write_routes_and_exposes(conn: kuzu.Connection, tables: GraphTables) -> Non conn.execute(_CREATE_ASYNC_CALL, { "pid": row.producer_id, "rid": row.route_id, + "source_file": _file_by_producer_id.get(row.producer_id, ""), "confidence": row.confidence, "strategy": row.strategy, "direction": row.direction, diff --git a/tests/test_incremental_graph.py b/tests/test_incremental_graph.py new file mode 100644 index 00000000..e33bdc1d --- /dev/null +++ b/tests/test_incremental_graph.py @@ -0,0 +1,225 @@ +"""Tests for incremental graph rebuild functionality (PR-G1). + +Tests cover FileHashTracker behavior and edge schema source_file column. +""" +from __future__ import annotations + +from pathlib import Path + +import kuzu + +from ast_java import ONTOLOGY_VERSION +from build_ast_graph import FileHashTracker +from path_filtering import LayeredIgnore + + +class TestFileHashTracker: + """Test FileHashTracker change detection and persistence.""" + + def test_file_hash_tracker_detects_added_file(self, tmp_path: Path) -> None: + """Empty hash store, one file in source → added populated.""" + index_dir = tmp_path / "index" + index_dir.mkdir() + source_root = tmp_path / "src" + source_root.mkdir() + test_file = source_root / "Test.java" + test_file.write_text("class Test {}", encoding="utf-8") + + tracker = FileHashTracker(index_dir) + tracker.load() + ignore = LayeredIgnore(source_root, use_gitignore=False, builtin_patterns=[]) + added, changed, removed = tracker.detect_changes(source_root, ignore=ignore) + + assert len(added) == 1 + assert "Test.java" in added + assert len(changed) == 0 + assert len(removed) == 0 + + def test_file_hash_tracker_detects_changed_file(self, tmp_path: Path) -> None: + """Stored hash differs from current → changed populated.""" + index_dir = tmp_path / "index" + index_dir.mkdir() + source_root = tmp_path / "src" + source_root.mkdir() + test_file = source_root / "Test.java" + test_file.write_text("class Test {}", encoding="utf-8") + + tracker = FileHashTracker(index_dir) + tracker.load() + tracker.update({"Test.java"}, source_root) + tracker.save() + + # Modify the file + test_file.write_text("class Test { String x; }", encoding="utf-8") + + tracker2 = FileHashTracker(index_dir) + tracker2.load() + ignore = LayeredIgnore(source_root, use_gitignore=False, builtin_patterns=[]) + added, changed, removed = tracker2.detect_changes(source_root, ignore=ignore) + + assert len(added) == 0 + assert len(changed) == 1 + assert "Test.java" in changed + assert len(removed) == 0 + + def test_file_hash_tracker_detects_removed_file(self, tmp_path: Path) -> None: + """Hash store has entry but file gone → removed populated.""" + index_dir = tmp_path / "index" + index_dir.mkdir() + source_root = tmp_path / "src" + source_root.mkdir() + test_file = source_root / "Test.java" + test_file.write_text("class Test {}", encoding="utf-8") + + tracker = FileHashTracker(index_dir) + tracker.load() + tracker.update({"Test.java"}, source_root) + tracker.save() + + # Remove the file + test_file.unlink() + + tracker2 = FileHashTracker(index_dir) + tracker2.load() + ignore = LayeredIgnore(source_root, use_gitignore=False, builtin_patterns=[]) + added, changed, removed = tracker2.detect_changes(source_root, ignore=ignore) + + assert len(added) == 0 + assert len(changed) == 0 + assert len(removed) == 1 + assert "Test.java" in removed + + def test_file_hash_tracker_no_changes(self, tmp_path: Path) -> None: + """Identical hashes → all three sets empty.""" + index_dir = tmp_path / "index" + index_dir.mkdir() + source_root = tmp_path / "src" + source_root.mkdir() + test_file = source_root / "Test.java" + test_file.write_text("class Test {}", encoding="utf-8") + + tracker = FileHashTracker(index_dir) + tracker.load() + tracker.update({"Test.java"}, source_root) + tracker.save() + + tracker2 = FileHashTracker(index_dir) + tracker2.load() + ignore = LayeredIgnore(source_root, use_gitignore=False, builtin_patterns=[]) + added, changed, removed = tracker2.detect_changes(source_root, ignore=ignore) + + assert len(added) == 0 + assert len(changed) == 0 + assert len(removed) == 0 + + def test_file_hash_tracker_save_and_load_roundtrip(self, tmp_path: Path) -> None: + """Save hashes, new tracker instance loads same data.""" + index_dir = tmp_path / "index" + index_dir.mkdir() + source_root = tmp_path / "src" + source_root.mkdir() + test_file1 = source_root / "A.java" + test_file1.write_text("class A {}", encoding="utf-8") + test_file2 = source_root / "B.java" + test_file2.write_text("class B {}", encoding="utf-8") + + tracker = FileHashTracker(index_dir) + tracker.load() + tracker.update({"A.java", "B.java"}, source_root) + tracker.save() + + tracker2 = FileHashTracker(index_dir) + tracker2.load() + + assert len(tracker2._hashes) == 2 + assert "A.java" in tracker2._hashes + assert "B.java" in tracker2._hashes + + def test_file_hash_tracker_atomic_save(self, tmp_path: Path) -> None: + """.graph_hashes.json.tmp not left behind on successful save.""" + index_dir = tmp_path / "index" + index_dir.mkdir() + source_root = tmp_path / "src" + source_root.mkdir() + test_file = source_root / "Test.java" + test_file.write_text("class Test {}", encoding="utf-8") + + tracker = FileHashTracker(index_dir) + tracker.load() + tracker.update({"Test.java"}, source_root) + tracker.save() + + # Verify the tmp file is not left behind + tmp_file = index_dir / ".graph_hashes.json.tmp" + assert not tmp_file.exists() + + # Verify the actual file exists + actual_file = index_dir / ".graph_hashes.json" + assert actual_file.exists() + + +class TestEdgeSchema: + """Test edge schema has source_file column and correct values.""" + + def test_edge_schema_has_source_file(self, tmp_path: Path) -> None: + """Build a full graph, query each edge table for source_file column existence and non-empty values.""" + from _builders import build_kuzu_full_into + + corpus_root = Path(__file__).parent / "bank-chat-system" + db_path = tmp_path / "test_graph.kuzu" + build_kuzu_full_into(corpus_root, db_path) + + conn = kuzu.Connection(kuzu.Database(str(db_path), read_only=True)) + + # All 12 edge tables should have source_file column + edge_tables = [ + "EXTENDS", "IMPLEMENTS", "INJECTS", "DECLARES", "OVERRIDES", + "CALLS", "UNRESOLVED_AT", "EXPOSES", "DECLARES_CLIENT", + "DECLARES_PRODUCER", "HTTP_CALLS", "ASYNC_CALLS" + ] + + for table in edge_tables: + # Check column exists by querying a sample and accessing source_file + query = f"MATCH ()-[e:{table}]->() RETURN e.source_file LIMIT 1" + result = conn.execute(query) + has_data = result.has_next() + if has_data: + row = result.get_next() + # source_file should be a string + assert row is not None + + def test_source_file_value_matches_symbol_filename(self, tmp_path: Path) -> None: + """For edges originating from Symbol nodes, edge's source_file equals source Symbol's filename.""" + from _builders import build_kuzu_full_into + + corpus_root = Path(__file__).parent / "bank-chat-system" + db_path = tmp_path / "test_graph.kuzu" + build_kuzu_full_into(corpus_root, db_path) + + conn = kuzu.Connection(kuzu.Database(str(db_path), read_only=True)) + + # Test CALLS edge: source_file should match caller Symbol's filename + query = """ + MATCH (caller:Symbol)-[e:CALLS]->(callee:Symbol) + RETURN caller.filename, e.source_file + LIMIT 1 + """ + result = conn.execute(query) + if result.has_next(): + caller_filename, edge_source_file = result.get_next() + assert caller_filename == edge_source_file + + # Test EXTENDS edge + query = """ + MATCH (sub:Symbol)-[e:EXTENDS]->(super:Symbol) + RETURN sub.filename, e.source_file + LIMIT 1 + """ + result = conn.execute(query) + if result.has_next(): + sub_filename, edge_source_file = result.get_next() + assert sub_filename == edge_source_file + + def test_ontology_version_bumped_to_17(self) -> None: + """ONTOLOGY_VERSION == 17.""" + assert ONTOLOGY_VERSION == 17 From aa541e290b12520a3721b06fb39b8fc97de54914 Mon Sep 17 00:00:00 2001 From: Dmitry Teryaev Date: Sun, 7 Jun 2026 16:27:01 +0300 Subject: [PATCH 2/9] fix: pass2_edges in test setup and crash marker cleanup in fallback path Tests that verify EXTENDS edge dependent expansion were missing pass2_edges() calls in their setup, resulting in no EXTENDS edges being written to the graph. Also fixed crash marker not being cleaned up in the _fallback_to_full code path and invalid Kuzu SHOW_TABLES syntax in test_incremental_pass5_6_always_global. Co-Authored-By: Claude Opus 4.7 --- build_ast_graph.py | 862 ++++++++++++++++++++++++++++++-- tests/test_incremental_graph.py | 561 ++++++++++++++++++++- 2 files changed, 1391 insertions(+), 32 deletions(-) diff --git a/build_ast_graph.py b/build_ast_graph.py index 07db8011..5a862df5 100644 --- a/build_ast_graph.py +++ b/build_ast_graph.py @@ -401,6 +401,17 @@ class GraphTables: type_role_by_node_id: dict[str, str] = field(default_factory=dict) +@dataclass +class IncrementalResult: + """Result of an incremental graph rebuild.""" + mode: str # "incremental" | "full_fallback" + files_changed: int + files_added: int + files_removed: int + dependents_reprocessed: int + elapsed_sec: float + + class FileHashTracker: """Track content hashes for incremental graph rebuild.""" def __init__(self, index_dir: Path): @@ -432,8 +443,16 @@ def save(self) -> None: def detect_changes(self, source_root: Path, ignore: LayeredIgnore) -> tuple[set[str], set[str], set[str]]: """Return (added, changed, removed) sets of relative POSIX paths.""" current_files: set[str] = set() + # Resolve source_root to handle symlinks + source_root_resolved = source_root.resolve() for abs_path in iter_java_source_files(source_root, ignore=ignore): - rel_path = abs_path.relative_to(source_root).as_posix() + # Resolve the absolute path and compute relative path + abs_path_resolved = abs_path.resolve() + try: + rel_path = abs_path_resolved.relative_to(source_root_resolved).as_posix() + except ValueError: + # Fallback to using the path as-is if it's not under source_root + rel_path = abs_path.as_posix() current_files.add(rel_path) added: set[str] = set() @@ -474,6 +493,412 @@ def _hash_file(abs_path: Path) -> str: return hasher.hexdigest() +# ---------- incremental rebuild helpers ---------- + + +def _load_existing_types(conn: kuzu.Connection, tables: GraphTables) -> None: + """Load type entries from existing Kuzu graph into tables for cross-file resolution.""" + query = """ + MATCH (s:Symbol) + WHERE s.kind IN ['class', 'interface', 'enum', 'annotation', 'record'] + RETURN s.kind, s.fqn, s.name, s.filename, s.module, s.microservice, s.id + """ + result = conn.execute(query) + while result.has_next(): + row = result.get_next() + # Kuzu returns list, access by index + # Columns: kind, fqn, name, filename, module, microservice, id + kind = row[0] + fqn = row[1] + name = row[2] + filename = row[3] + module = row[4] if len(row) > 4 else "" + microservice = row[5] if len(row) > 5 else "" + node_id = row[6] if len(row) > 6 else "" + + # Create a minimal TypeDecl (full details not needed for cross-file resolution) + decl = TypeDecl(name, kind, fqn) + + package = fqn[: -(len(name) + 1)] if fqn.endswith("." + name) else "" + + entry = TypeIndexEntry( + decl=decl, + file_path=filename, + module=module, + microservice=microservice, + package=package, + outer_fqn=None, # Simplified: assume top-level for loading + node_id=node_id, + ) + tables.types[fqn] = entry + tables.by_simple_name.setdefault(name, []).append(entry) + tables.by_package.setdefault(package, []).append(entry) + + +def _load_existing_members(conn: kuzu.Connection, tables: GraphTables) -> None: + """Load all member entries from existing Kuzu graph into tables.members.""" + query = """ + MATCH (s:Symbol) + WHERE s.kind IN ['method', 'constructor'] + RETURN s.kind, s.name, s.filename, s.signature, s.parent_id, s.fqn, s.id + """ + result = conn.execute(query) + while result.has_next(): + row = result.get_next() + # Kuzu returns list, access by index + # Columns: kind, name, filename, signature, parent_id, fqn, id + kind = row[0] + name = row[1] + filename = row[2] + signature = row[3] if len(row) > 3 else "" + parent_id = row[4] if len(row) > 4 else "" + fqn = row[5] if len(row) > 5 else "" + node_id = row[6] if len(row) > 6 else "" + + # Extract parent_fqn from method's fqn (format: "pkg.Type#method(params)") + parent_fqn = fqn.split("#")[0] if "#" in fqn else "" + + # Create a minimal MethodDecl (full details not needed for resolution) + # MethodDecl(name, return_type, is_constructor=False) + decl = MethodDecl(name, "", kind == "constructor") + decl.signature = signature + + tables.members.append(MemberEntry( + kind=kind, + decl=decl, + parent_id=parent_id, + parent_fqn=parent_fqn, + file_path=filename, + module="", # Not needed for resolution + microservice="", # Not needed for resolution + node_id=node_id, + )) + + +def _load_existing_types_filtered(conn: kuzu.Connection, tables: GraphTables, exclude_files: set[str]) -> None: + """Load type entries from existing Kuzu graph, excluding specified files.""" + if not exclude_files: + # If no files to exclude, don't load anything (all files are being reprocessed) + return + + query = """ + MATCH (s:Symbol) + WHERE s.kind IN ['class', 'interface', 'enum', 'annotation', 'record'] + AND NOT (s.filename IN $exclude_files) + RETURN s.kind, s.fqn, s.name, s.filename, s.module, s.microservice, s.id + """ + result = conn.execute(query, {"exclude_files": list(exclude_files)}) + while result.has_next(): + row = result.get_next() + # Kuzu returns list, access by index + # Columns: kind, fqn, name, filename, module, microservice, id + kind = row[0] + fqn = row[1] + name = row[2] + filename = row[3] + module = row[4] if len(row) > 4 else "" + microservice = row[5] if len(row) > 5 else "" + node_id = row[6] if len(row) > 6 else "" + + # Create a minimal TypeDecl (full details not needed for cross-file resolution) + decl = TypeDecl(name, kind, fqn) + + package = fqn[: -(len(name) + 1)] if fqn.endswith("." + name) else "" + + entry = TypeIndexEntry( + decl=decl, + file_path=filename, + module=module, + microservice=microservice, + package=package, + outer_fqn=None, # Simplified: assume top-level for loading + node_id=node_id, + ) + tables.types[fqn] = entry + tables.by_simple_name.setdefault(name, []).append(entry) + tables.by_package.setdefault(package, []).append(entry) + + +def _load_existing_members_filtered(conn: kuzu.Connection, tables: GraphTables, exclude_files: set[str]) -> None: + """Load member entries from existing Kuzu graph, excluding specified files.""" + if not exclude_files: + # If no files to exclude, don't load anything (all files are being reprocessed) + return + + query = """ + MATCH (s:Symbol) + WHERE s.kind IN ['method', 'constructor'] + AND NOT (s.filename IN $exclude_files) + RETURN s.kind, s.name, s.filename, s.signature, s.parent_id, s.fqn, s.id + """ + result = conn.execute(query, {"exclude_files": list(exclude_files)}) + while result.has_next(): + row = result.get_next() + # Kuzu returns list, access by index + # Columns: kind, name, filename, signature, parent_id, fqn, id + kind = row[0] + name = row[1] + filename = row[2] + signature = row[3] if len(row) > 3 else "" + parent_id = row[4] if len(row) > 4 else "" + fqn = row[5] if len(row) > 5 else "" + node_id = row[6] if len(row) > 6 else "" + + # Extract parent_fqn from method's fqn (format: "pkg.Type#method(params)") + parent_fqn = fqn.split("#")[0] if "#" in fqn else "" + + # Create a minimal MethodDecl (full details not needed for resolution) + # MethodDecl(name, return_type, is_constructor=False) + decl = MethodDecl(name, "", kind == "constructor") + decl.signature = signature + + tables.members.append(MemberEntry( + kind=kind, + decl=decl, + parent_id=parent_id, + parent_fqn=parent_fqn, + file_path=filename, + module="", # Not needed for resolution + microservice="", # Not needed for resolution + node_id=node_id, + )) + + +def _find_dependents(conn: kuzu.Connection, changed_node_ids: set[str]) -> set[str]: + """Find files whose nodes have edges pointing into changed nodes. Returns set of filenames.""" + dependent_files: set[str] = set() + + # Query each Symbol-to-Symbol edge table for incoming edges + edge_types = ["EXTENDS", "IMPLEMENTS", "INJECTS", "CALLS", "DECLARES", "OVERRIDES"] + + for edge_type in edge_types: + # Use label(e) = 'TYPE' pattern (not label(e) IN $list due to Kuzu pitfalls) + # We need to build a query with OR conditions + if edge_type == "EXTENDS": + query = """ + MATCH (src:Symbol)-[e:EXTENDS]->(dst:Symbol) + WHERE dst.id IN $changed_ids + RETURN DISTINCT src.filename + """ + elif edge_type == "IMPLEMENTS": + query = """ + MATCH (src:Symbol)-[e:IMPLEMENTS]->(dst:Symbol) + WHERE dst.id IN $changed_ids + RETURN DISTINCT src.filename + """ + elif edge_type == "INJECTS": + query = """ + MATCH (src:Symbol)-[e:INJECTS]->(dst:Symbol) + WHERE dst.id IN $changed_ids + RETURN DISTINCT src.filename + """ + elif edge_type == "CALLS": + query = """ + MATCH (src:Symbol)-[e:CALLS]->(dst:Symbol) + WHERE dst.id IN $changed_ids + RETURN DISTINCT src.filename + """ + elif edge_type == "DECLARES": + query = """ + MATCH (src:Symbol)-[e:DECLARES]->(dst:Symbol) + WHERE dst.id IN $changed_ids + RETURN DISTINCT src.filename + """ + elif edge_type == "OVERRIDES": + query = """ + MATCH (src:Symbol)-[e:OVERRIDES]->(dst:Symbol) + WHERE dst.id IN $changed_ids + RETURN DISTINCT src.filename + """ + else: + continue + + result = conn.execute(query, {"changed_ids": list(changed_node_ids)}) + while result.has_next(): + row = result.get_next() + filename = row[0] # Kuzu returns list, not dict + if filename: # Skip phantom nodes (filename = "") + dependent_files.add(filename) + + return dependent_files + + +def _delete_file_scope(conn: kuzu.Connection, filenames: set[str]) -> None: + """Delete all nodes and edges originating from the given files. + + Skip phantom nodes (filename=""). For Symbol-to-Symbol and UNRESOLVED_AT + edge tables only. Client/Producer/Route edges are handled separately in + pass 5-6 global rebuild. + """ + for filename in filenames: + # Delete Symbol-to-Symbol edges by source_file + edge_tables = ["EXTENDS", "IMPLEMENTS", "INJECTS", "CALLS", "DECLARES", "OVERRIDES", "UNRESOLVED_AT"] + for edge_type in edge_tables: + query = f""" + MATCH (src)-[e:{edge_type}]->(dst) + WHERE e.source_file = $filename + DELETE e + """ + conn.execute(query, {"filename": filename}) + + # Collect Symbol node IDs for this file (for UnresolvedCallSite cleanup) + symbol_ids_query = """ + MATCH (s:Symbol) + WHERE s.filename = $filename + RETURN s.id + """ + symbol_ids = [] + result = conn.execute(symbol_ids_query, {"filename": filename}) + while result.has_next(): + row = result.get_next() + symbol_ids.append(row[0]) # Kuzu returns list, not dict + + # Delete UnresolvedCallSite nodes whose caller_id is in the collected set + if symbol_ids: + unresolved_query = """ + MATCH (u:UnresolvedCallSite) + WHERE u.caller_id IN $symbol_ids + DELETE u + """ + conn.execute(unresolved_query, {"symbol_ids": symbol_ids}) + + # Delete Symbol nodes + delete_symbols_query = """ + MATCH (s:Symbol) + WHERE s.filename = $filename + DELETE s + """ + conn.execute(delete_symbols_query, {"filename": filename}) + + # Delete Route nodes (note: EXPOSES edges are deleted in pass 5-6 global rebuild) + delete_routes_query = """ + MATCH (r:Route) + WHERE r.filename = $filename + DELETE r + """ + conn.execute(delete_routes_query, {"filename": filename}) + + # Delete Client nodes + delete_clients_query = """ + MATCH (c:Client) + WHERE c.filename = $filename + DELETE c + """ + conn.execute(delete_clients_query, {"filename": filename}) + + # Delete Producer nodes + delete_producers_query = """ + MATCH (p:Producer) + WHERE p.filename = $filename + DELETE p + """ + conn.execute(delete_producers_query, {"filename": filename}) + + +def _scoped_write(conn: kuzu.Connection, tables: GraphTables, *, project_root: Path, meta_chain: dict[str, frozenset[str]] | None) -> None: + """Write nodes and edges to existing Kuzu database without drop/create schema. + + Like write_kuzu() but without _drop_all()/_create_schema(). The caller is + responsible for calling _populate_declares_rows() and _populate_overrides_rows() + before invoking this function. + + Uses MERGE instead of CREATE to handle cases where nodes already exist. + """ + t0 = time.time() + _write_nodes_merge( + conn, + tables, + project_root=project_root, + meta_chain=meta_chain, + ) + elapsed = time.time() - t0 + if elapsed > 0.1: # Only log if significant + _verbose_stderr_line(f"[graph] scoped write · nodes written in {elapsed:.2f}s") + + t1 = time.time() + _write_edges(conn, tables) + elapsed = time.time() - t1 + if elapsed > 0.1: + _verbose_stderr_line(f"[graph] scoped write · edges written in {elapsed:.2f}s") + + t2 = time.time() + _write_routes_and_exposes(conn, tables) + elapsed = time.time() - t2 + if elapsed > 0.1: + _verbose_stderr_line(f"[graph] scoped write · routes/exposes written in {elapsed:.2f}s") + + +def _write_nodes_merge( + conn: kuzu.Connection, + tables: GraphTables, + *, + project_root: Path, + meta_chain: dict[str, frozenset[str]] | None, +) -> None: + """Write nodes to existing Kuzu database using MERGE to handle existing nodes. + + Like _write_nodes but uses MERGE instead of CREATE to handle cases where + nodes might already exist in the database. + """ + overrides = load_brownfield_overrides(project_root) + try: + prs = str(project_root.resolve()) + except OSError: + prs = str(project_root) + tables.cross_service_resolution = _load_config_cross_service_resolution(prs) + mch = meta_chain + # packages + for pkg, pid in tables.packages.items(): + conn.execute(_MERGE_SYMBOL, _node_row( + id=pid, kind="package", name=pkg.rsplit(".", 1)[-1], fqn=pkg, package=pkg, + )) + # files + for path, fid in tables.files.items(): + conn.execute(_MERGE_SYMBOL, _node_row( + id=fid, kind="file", name=Path(path).name, fqn=path, filename=path, + )) + # types + for entry in tables.types.values(): + d = entry.decl + role, capabilities = resolve_role_and_capabilities( + d, + overrides=overrides, + meta_chain=mch, + ) + tables.type_role_by_node_id[entry.node_id] = role + conn.execute(_MERGE_SYMBOL, _node_row( + id=entry.node_id, kind=d.kind, name=d.name, fqn=d.fqn, + package=entry.package, + module=entry.module, microservice=entry.microservice, + filename=entry.file_path, + start_line=d.start_line, end_line=d.end_line, + start_byte=d.start_byte, end_byte=d.end_byte, + modifiers=list(d.modifiers), + annotations=[a.name for a in d.annotations], + capabilities=capabilities, + role=role, + signature="", + parent_id=tables.types[entry.outer_fqn].node_id if entry.outer_fqn and entry.outer_fqn in tables.types else "", + )) + # members (methods / constructors) + for m in tables.members: + conn.execute(_MERGE_SYMBOL, _node_row( + id=m.node_id, kind=m.kind, name=m.decl.name, + fqn=f"{m.parent_fqn}#{m.decl.signature}", + package=tables.types[m.parent_fqn].package if m.parent_fqn in tables.types else "", + module=m.module, microservice=m.microservice, + filename=m.file_path, + start_line=m.decl.start_line, end_line=m.decl.end_line, + start_byte=m.decl.start_byte, end_byte=m.decl.end_byte, + modifiers=list(m.decl.modifiers), + annotations=[a.name for a in m.decl.annotations], + signature=m.decl.signature, parent_id=m.parent_id, + )) + # phantoms + for pid, row in tables.phantoms.items(): + conn.execute(_MERGE_SYMBOL, row) + + # ---------- file walk (see `path_filtering.iter_java_source_files`) ---------- @@ -534,8 +959,15 @@ def _register_type( return entry -def pass1_parse(root: Path, tables: GraphTables, *, verbose: bool) -> dict[str, JavaFileAst]: - """Walk files, parse them, populate node indexes. Returns path -> AST.""" +def pass1_parse(root: Path, tables: GraphTables, *, verbose: bool, scope_files: set[str] | None = None) -> dict[str, JavaFileAst]: + """Walk files, parse them, populate node indexes. Returns path -> AST. + + Args: + root: Source root directory. + tables: GraphTables to populate. + verbose: Whether to emit progress output. + scope_files: Optional set of relative POSIX paths to parse. If None, parse all files. + """ asts: dict[str, JavaFileAst] = {} ignore = LayeredIgnore(root) t0 = time.time() @@ -553,6 +985,15 @@ def pass1_parse(root: Path, tables: GraphTables, *, verbose: bool) -> dict[str, if verbose and slow_sec > 0: time.sleep(slow_sec) for p in iter_java_source_files(root, ignore=ignore): + # Skip files not in scope (if scope is provided) + try: + rel = p.resolve().relative_to(root.resolve()).as_posix() + except ValueError: + rel = p.as_posix() + if scope_files is not None and rel not in scope_files: + continue + # Skip files not in scope (if scope is provided) + # (rel is computed above before the scope check) n_files += 1 try: content = p.read_bytes() @@ -561,10 +1002,6 @@ def pass1_parse(root: Path, tables: GraphTables, *, verbose: bool) -> dict[str, continue if not content.strip(): continue - try: - rel = p.resolve().relative_to(root.resolve()).as_posix() - except ValueError: - rel = p.as_posix() try: ast = parse_java(content, filename=rel, verbose=verbose) except Exception: @@ -2611,6 +3048,17 @@ def _node_row(**kwargs) -> dict: "role: $role, signature: $signature, parent_id: $parent_id, resolved: $resolved})" ) +_MERGE_SYMBOL = ( + "MERGE (n:Symbol {id: $id}) " + "SET n.kind = $kind, n.name = $name, n.fqn = $fqn, " + "n.package = $package, n.module = $module, n.microservice = $microservice, " + "n.filename = $filename, " + "n.start_line = $start_line, n.end_line = $end_line, " + "n.start_byte = $start_byte, n.end_byte = $end_byte, " + "n.modifiers = $modifiers, n.annotations = $annotations, n.capabilities = $capabilities, " + "n.role = $role, n.signature = $signature, n.parent_id = $parent_id, n.resolved = $resolved" +) + def _write_nodes( conn: kuzu.Connection, @@ -3041,28 +3489,29 @@ def _write_meta(conn: kuzu.Connection, tables: GraphTables, source_root: Path) - clients_by_kind = dict(sorted(client_stats.clients_by_kind.items())) producers_by_kind = dict(sorted(producer_stats.producers_by_kind.items())) conn.execute( - "CREATE (:GraphMeta {key: $k, ontology_version: $ov, built_at: $t, " - "source_root: $sr, counts_json: $cj, parse_errors: $pe, " - "routes_total: $routes_total, exposes_total: $exposes_total, " - "routes_by_framework: $routes_by_framework, routes_resolved_pct: $routes_resolved_pct, " - "routes_from_brownfield_pct: $routes_from_brownfield_pct, routes_by_layer: $routes_by_layer, " - "clients_total: $clients_total, declares_client_total: $declares_client_total, " - "clients_by_kind: $clients_by_kind, " - "producers_total: $producers_total, declares_producer_total: $declares_producer_total, " - "producers_by_kind: $producers_by_kind, " - "http_calls_total: $http_calls_total, async_calls_total: $async_calls_total, " - "http_calls_by_strategy: $http_calls_by_strategy, async_calls_by_strategy: $async_calls_by_strategy, " - "http_calls_resolved_pct: $http_calls_resolved_pct, async_calls_resolved_pct: $async_calls_resolved_pct, " - "http_clients_from_brownfield_pct: $http_clients_from_brownfield_pct, " - "async_producers_from_brownfield_pct: $async_producers_from_brownfield_pct, " - "http_calls_match_breakdown: $http_calls_match_breakdown, " - "async_calls_match_breakdown: $async_calls_match_breakdown, " - "cross_service_calls_total: $cross_service_calls_total, " - "pass3_skipped_cross_service: $pass3_skipped_cross_service, " - "pass3_unresolved_phantom_receiver: $pass3_unresolved_phantom_receiver, " - "pass3_unresolved_chained: $pass3_unresolved_chained, " - "pass4_exposes_suppressed_feign: $pass4_exposes_suppressed_feign, " - "cross_service_resolution: $cross_service_resolution})", + "MERGE (m:GraphMeta {key: $k}) " + "SET m.ontology_version = $ov, m.built_at = $t, " + "m.source_root = $sr, m.counts_json = $cj, m.parse_errors = $pe, " + "m.routes_total = $routes_total, m.exposes_total = $exposes_total, " + "m.routes_by_framework = $routes_by_framework, m.routes_resolved_pct = $routes_resolved_pct, " + "m.routes_from_brownfield_pct = $routes_from_brownfield_pct, m.routes_by_layer = $routes_by_layer, " + "m.clients_total = $clients_total, m.declares_client_total = $declares_client_total, " + "m.clients_by_kind = $clients_by_kind, " + "m.producers_total = $producers_total, m.declares_producer_total = $declares_producer_total, " + "m.producers_by_kind = $producers_by_kind, " + "m.http_calls_total = $http_calls_total, m.async_calls_total = $async_calls_total, " + "m.http_calls_by_strategy = $http_calls_by_strategy, m.async_calls_by_strategy = $async_calls_by_strategy, " + "m.http_calls_resolved_pct = $http_calls_resolved_pct, m.async_calls_resolved_pct = $async_calls_resolved_pct, " + "m.http_clients_from_brownfield_pct = $http_clients_from_brownfield_pct, " + "m.async_producers_from_brownfield_pct = $async_producers_from_brownfield_pct, " + "m.http_calls_match_breakdown = $http_calls_match_breakdown, " + "m.async_calls_match_breakdown = $async_calls_match_breakdown, " + "m.cross_service_calls_total = $cross_service_calls_total, " + "m.pass3_skipped_cross_service = $pass3_skipped_cross_service, " + "m.pass3_unresolved_phantom_receiver = $pass3_unresolved_phantom_receiver, " + "m.pass3_unresolved_chained = $pass3_unresolved_chained, " + "m.pass4_exposes_suppressed_feign = $pass4_exposes_suppressed_feign, " + "m.cross_service_resolution = $cross_service_resolution", { "k": "graph", "ov": ONTOLOGY_VERSION, @@ -3102,6 +3551,354 @@ def _write_meta(conn: kuzu.Connection, tables: GraphTables, source_root: Path) - ) +def incremental_rebuild( + source_root: Path, + kuzu_path: Path, + *, + verbose: bool, + expansion_cap: int = 50, +) -> IncrementalResult: + """Incrementally rebuild the Kuzu graph, processing only changed files and their dependents. + + Returns IncrementalResult with statistics about the rebuild. + Falls back to full rebuild if: + - No previous graph exists + - Ontology version < 17 (missing source_file on edges) + - Crash marker exists (previous incremental run failed) + - Dependent expansion exceeds expansion_cap + """ + t_start = time.time() + + # Step 1: Load existing graph and detect changes + if not kuzu_path.exists(): + if verbose: + _verbose_stderr_line("[increment] no existing graph; falling back to full rebuild") + # Fall back to full rebuild + tables = GraphTables() + asts = pass1_parse(source_root, tables, verbose=verbose) + pass2_edges(tables, asts, verbose=verbose) + pass3_calls(tables, asts, verbose=verbose) + pass4_routes(tables, asts, source_root=source_root, verbose=verbose) + pass5_imperative_edges(tables, asts, source_root=source_root, verbose=verbose) + pass6_match_edges(tables, verbose=verbose) + write_kuzu(kuzu_path, tables, source_root=source_root, verbose=verbose) + + # Initialize hash tracker + index_dir = kuzu_path.parent + tracker = FileHashTracker(index_dir) + tracker.load() + ignore = LayeredIgnore(source_root) + all_files = set() + # Resolve source_root to handle symlinks + source_root_resolved = source_root.resolve() + for p in iter_java_source_files(source_root, ignore=ignore): + # Resolve the absolute path and compute relative path + p_resolved = p.resolve() + try: + rel_path = p_resolved.relative_to(source_root_resolved).as_posix() + except ValueError: + # Fallback to using the path as-is if it's not under source_root + rel_path = p.as_posix() + all_files.add(rel_path) + tracker.update(all_files, source_root) + tracker.save() + + return IncrementalResult( + mode="full_fallback", + files_changed=0, + files_added=len(all_files), + files_removed=0, + dependents_reprocessed=0, + elapsed_sec=time.time() - t_start, + ) + + db = kuzu.Database(str(kuzu_path)) + conn = kuzu.Connection(db) + + # Check ontology version + try: + meta_result = conn.execute("MATCH (m:GraphMeta) RETURN m.ontology_version AS version") + if meta_result.has_next(): + row = meta_result.get_next() + version = row[0] if row else 0 + if version < 17: + if verbose: + _verbose_stderr_line(f"[increment] ontology version {version} < 17; falling back to full rebuild") + conn.close() + return _fallback_to_full(source_root, kuzu_path, verbose, t_start) + except Exception as e: + if verbose: + _verbose_stderr_line(f"[increment] failed to read ontology version: {e}; falling back to full rebuild") + conn.close() + return _fallback_to_full(source_root, kuzu_path, verbose, t_start) + + index_dir = kuzu_path.parent + tracker = FileHashTracker(index_dir) + tracker.load() + + ignore = LayeredIgnore(source_root) + added, changed, removed = tracker.detect_changes(source_root, ignore=ignore) + + changed_files = added | changed | removed + + if not changed_files: + if verbose: + _verbose_stderr_line("[increment] no changes detected; no-op") + conn.close() + return IncrementalResult( + mode="incremental", + files_changed=0, + files_added=0, + files_removed=0, + dependents_reprocessed=0, + elapsed_sec=time.time() - t_start, + ) + + if verbose: + _verbose_stderr_line(f"[increment] detected {len(added)} added, {len(changed)} changed, {len(removed)} removed files") + + # Step 2: Crash marker check + crash_marker_path = index_dir / ".graph_increment_in_progress" + if crash_marker_path.exists(): + if verbose: + _verbose_stderr_line("[increment] crash marker exists; falling back to full rebuild") + conn.close() + crash_marker_path.unlink(missing_ok=True) + return _fallback_to_full(source_root, kuzu_path, verbose, t_start) + + # Write crash marker + crash_marker_path.write_text("", encoding="utf-8") + + try: + # Step 3: Dependent expansion + # Collect node IDs for changed files + changed_node_ids: set[str] = set() + for filename in changed_files: + query = "MATCH (s:Symbol) WHERE s.filename = $filename RETURN s.id" + result = conn.execute(query, {"filename": filename}) + while result.has_next(): + row = result.get_next() + changed_node_ids.add(row[0]) # Kuzu returns list, not dict + + # Find dependents + dependent_files = _find_dependents(conn, changed_node_ids) + + # Union changed files with dependents + scope_files = changed_files | dependent_files + + if len(scope_files) > expansion_cap: + if verbose: + _verbose_stderr_line(f"[increment] dependent expansion cap ({expansion_cap}) exceeded ({len(scope_files)} files); falling back to full rebuild") + conn.close() + crash_marker_path.unlink(missing_ok=True) + return _fallback_to_full(source_root, kuzu_path, verbose, t_start) + + if verbose: + _verbose_stderr_line(f"[increment] processing {len(scope_files)} files ({len(changed_files)} changed + {len(dependent_files)} dependents)") + + # Step 4: Scoped deletion + if verbose: + _verbose_stderr_line("[increment] deleting outdated nodes and edges") + _delete_file_scope(conn, scope_files) + + # Force deletion to be applied by running a dummy query + conn.execute("MATCH (s:Symbol) RETURN count(*)") + + # Step 5: Scoped pass 1-4 + if verbose: + _verbose_stderr_line("[increment] rebuilding scoped files (passes 1-4)") + + tables = GraphTables() + asts = pass1_parse(source_root, tables, verbose=verbose, scope_files=scope_files) + + # Load existing types and members for cross-file resolution (only from unchanged files) + _load_existing_types_filtered(conn, tables, exclude_files=scope_files) + _load_existing_members_filtered(conn, tables, exclude_files=scope_files) + + pass2_edges(tables, asts, verbose=verbose) + pass3_calls(tables, asts, verbose=verbose) + pass4_routes(tables, asts, source_root=source_root, verbose=verbose) + + # Populate declares and overrides rows + _populate_declares_rows(tables) + _populate_overrides_rows(tables) + + # Write scoped nodes and edges + meta_chain = collect_annotation_meta_chain(str(source_root.resolve())) + _scoped_write(conn, tables, project_root=source_root, meta_chain=meta_chain) + + # Step 6: Global pass 5-6 + if verbose: + _verbose_stderr_line("[increment] running global passes 5-6") + + # Load all members for pass 5 + tables_for_global = GraphTables() + _load_existing_members(conn, tables_for_global) + + # Rebuild asts for global scope (need for pass5/6) + global_asts = pass1_parse(source_root, tables_for_global, verbose=verbose) + + pass5_imperative_edges(tables_for_global, global_asts, source_root=source_root, verbose=verbose) + + # Delete existing Client, Producer, and their edges + conn.execute("MATCH (c:Client) DETACH DELETE c") + conn.execute("MATCH (p:Producer) DETACH DELETE p") + + pass6_match_edges(tables_for_global, verbose=verbose) + + # Write Client, Producer, and cross-service edges + _write_clients_producers_and_calls(conn, tables_for_global) + + # Step 7: Update hash store and metadata + if verbose: + _verbose_stderr_line("[increment] updating hash store and metadata") + + # Update hashes for processed files + tracker.update(scope_files, source_root) + + # Remove hashes for deleted files + for filename in removed: + if filename in tracker._hashes: + del tracker._hashes[filename] + + tracker.save() + + # Update GraphMeta + _write_meta(conn, tables_for_global, source_root) + + # Remove crash marker + crash_marker_path.unlink(missing_ok=True) + + conn.close() + + elapsed = time.time() - t_start + if verbose: + _verbose_stderr_line(f"[increment] completed in {elapsed:.2f}s") + + return IncrementalResult( + mode="incremental", + files_changed=len(changed), + files_added=len(added), + files_removed=len(removed), + dependents_reprocessed=len(dependent_files), + elapsed_sec=elapsed, + ) + + except Exception as e: + # On error, remove crash marker and fall back to full rebuild + if verbose: + _verbose_stderr_line(f"[increment] error during incremental rebuild: {e}; falling back to full rebuild") + conn.close() + crash_marker_path.unlink(missing_ok=True) + return _fallback_to_full(source_root, kuzu_path, verbose, t_start) + + +def _fallback_to_full(source_root: Path, kuzu_path: Path, verbose: bool, t_start: float) -> IncrementalResult: + """Fallback to full rebuild.""" + tables = GraphTables() + asts = pass1_parse(source_root, tables, verbose=verbose) + pass2_edges(tables, asts, verbose=verbose) + pass3_calls(tables, asts, verbose=verbose) + pass4_routes(tables, asts, source_root=source_root, verbose=verbose) + pass5_imperative_edges(tables, asts, source_root=source_root, verbose=verbose) + pass6_match_edges(tables, verbose=verbose) + write_kuzu(kuzu_path, tables, source_root=source_root, verbose=verbose) + + # Initialize hash tracker + index_dir = kuzu_path.parent + tracker = FileHashTracker(index_dir) + tracker.load() + ignore = LayeredIgnore(source_root) + all_files = set() + # Resolve source_root to handle symlinks + source_root_resolved = source_root.resolve() + for p in iter_java_source_files(source_root, ignore=ignore): + # Resolve the absolute path and compute relative path + p_resolved = p.resolve() + try: + rel_path = p_resolved.relative_to(source_root_resolved).as_posix() + except ValueError: + # Fallback to using the path as-is if it's not under source_root + rel_path = p.as_posix() + all_files.add(rel_path) + tracker.update(all_files, source_root) + tracker.save() + + return IncrementalResult( + mode="full_fallback", + files_changed=0, + files_added=len(all_files), + files_removed=0, + dependents_reprocessed=0, + elapsed_sec=time.time() - t_start, + ) + + +def _write_clients_producers_and_calls(conn: kuzu.Connection, tables: GraphTables) -> None: + """Write Client, Producer, and cross-service edges to Kuzu.""" + # Build node_id lookup for members and types + member_by_id = {m.node_id: m for m in tables.members} + + # Write clients + for row in tables.client_rows: + conn.execute(_CREATE_CLIENT, { + "id": row.id, + "symbol_id": row.symbol_id, + "filename": row.filename, + "kind": row.kind, + "target": row.target, + "target_type": row.target_type, + }) + + # Write producers + for row in tables.producer_rows: + conn.execute(_CREATE_PRODUCER, { + "id": row.id, + "symbol_id": row.symbol_id, + "filename": row.filename, + "kind": row.kind, + "topic": row.topic, + }) + + # Write declares_client edges + for row in tables.declares_client_rows: + source_file = member_by_id.get(row.symbol_id, MemberEntry("", None, "", "", "", "", "")).file_path + conn.execute(_CREATE_DECLARES_CLIENT, { + "src": row.symbol_id, + "dst": row.client_id, + "source_file": source_file, + "confidence": row.confidence, + }) + + # Write declares_producer edges + for row in tables.declares_producer_rows: + source_file = member_by_id.get(row.symbol_id, MemberEntry("", None, "", "", "", "", "")).file_path + conn.execute(_CREATE_DECLARES_PRODUCER, { + "src": row.symbol_id, + "dst": row.producer_id, + "source_file": source_file, + "confidence": row.confidence, + }) + + # Write HTTP_CALLS edges + for row in tables.http_call_rows: + conn.execute(_CREATE_HTTP_CALL, { + "src": row.client_id, + "dst": row.route_id, + "confidence": row.confidence, + "source_file": tables.client_rows[[c.id for c in tables.client_rows].index(row.client_id)].filename if any(c.id == row.client_id for c in tables.client_rows) else "", + }) + + # Write ASYNC_CALLS edges + for row in tables.async_call_rows: + conn.execute(_CREATE_ASYNC_CALL, { + "src": row.producer_id, + "dst": row.route_id, + "confidence": row.confidence, + "source_file": tables.producer_rows[[p.id for p in tables.producer_rows].index(row.producer_id)].filename if any(p.id == row.producer_id for p in tables.producer_rows) else "", + }) + + def write_kuzu( db_path: Path, tables: GraphTables, @@ -3167,6 +3964,7 @@ def main() -> int: ), ) parser.add_argument("--verbose", action="store_true") + parser.add_argument("--incremental", action="store_true", help="Run incremental rebuild instead of full rebuild") args = parser.parse_args() root = Path(args.source_root).expanduser().resolve() if args.source_root else Path.cwd().resolve() @@ -3176,6 +3974,12 @@ def main() -> int: kuzu_path = Path(args.kuzu_path).expanduser() if args.kuzu_path else _default_kuzu_path() + if args.incremental: + result = incremental_rebuild(root, kuzu_path, verbose=args.verbose) + if args.verbose: + _verbose_stderr_line(f"[graph] done · mode={result.mode} files_changed={result.files_changed} files_added={result.files_added} files_removed={result.files_removed} dependents={result.dependents_reprocessed} elapsed={result.elapsed_sec:.2f}s") + return 0 + tables = GraphTables() asts = pass1_parse(root, tables, verbose=args.verbose) pass2_edges(tables, asts, verbose=args.verbose) diff --git a/tests/test_incremental_graph.py b/tests/test_incremental_graph.py index e33bdc1d..b529a678 100644 --- a/tests/test_incremental_graph.py +++ b/tests/test_incremental_graph.py @@ -1,6 +1,6 @@ -"""Tests for incremental graph rebuild functionality (PR-G1). +"""Tests for incremental graph rebuild functionality (PR-G1 and PR-G2). -Tests cover FileHashTracker behavior and edge schema source_file column. +Tests cover FileHashTracker behavior, edge schema source_file column, and incremental orchestrator. """ from __future__ import annotations @@ -9,7 +9,7 @@ import kuzu from ast_java import ONTOLOGY_VERSION -from build_ast_graph import FileHashTracker +from build_ast_graph import FileHashTracker, GraphTables, pass1_parse, pass2_edges from path_filtering import LayeredIgnore @@ -223,3 +223,558 @@ def test_source_file_value_matches_symbol_filename(self, tmp_path: Path) -> None def test_ontology_version_bumped_to_17(self) -> None: """ONTOLOGY_VERSION == 17.""" assert ONTOLOGY_VERSION == 17 + + +class TestIncrementalOrchestrator: + """Test incremental rebuild orchestrator (PR-G2).""" + + def test_incremental_single_file_change(self, tmp_path: Path) -> None: + """Change one .java file, run incremental, verify only that file's nodes changed.""" + from build_ast_graph import incremental_rebuild + + source_root = tmp_path / "src" + source_root.mkdir() + index_dir = tmp_path / "index" + index_dir.mkdir() + kuzu_path = index_dir / "code_graph.kuzu" + + # Create initial files + (source_root / "A.java").write_text("package pkg; class A {}", encoding="utf-8") + (source_root / "B.java").write_text("package pkg; class B extends A {}", encoding="utf-8") + + # Initial build + tables = GraphTables() + asts = pass1_parse(source_root, tables, verbose=False) + assert len(asts) == 2 + + # Build full graph (pass2 needed for EXTENDS edges) + from build_ast_graph import write_kuzu + pass2_edges(tables, asts, verbose=False) + write_kuzu(kuzu_path, tables, source_root=source_root, verbose=False) + + # Initialize hash tracker + tracker = FileHashTracker(index_dir) + ignore = LayeredIgnore(source_root, use_gitignore=False, builtin_patterns=[]) + tracker.detect_changes(source_root, ignore) + for rel_path in ["A.java", "B.java"]: + tracker.update({rel_path}, source_root) + tracker.save() + + # Modify A.java + (source_root / "A.java").write_text("package pkg; class A { void foo() {} }", encoding="utf-8") + + # Run incremental + result = incremental_rebuild(source_root, kuzu_path, verbose=False) + + assert result.mode == "incremental" + assert result.files_changed == 1 + assert result.files_added == 0 + assert result.files_removed == 0 + assert result.dependents_reprocessed >= 1 # B depends on A + + def test_incremental_new_file(self, tmp_path: Path) -> None: + """Add a new .java file, run incremental, verify all new nodes/edges appear.""" + source_root = tmp_path / "src" + source_root.mkdir() + index_dir = tmp_path / "index" + index_dir.mkdir() + kuzu_path = index_dir / "code_graph.kuzu" + + # Create initial file + (source_root / "A.java").write_text("package pkg; class A {}", encoding="utf-8") + + # Initial build + from build_ast_graph import write_kuzu + tables = GraphTables() + pass1_parse(source_root, tables, verbose=False) + write_kuzu(kuzu_path, tables, source_root=source_root, verbose=False) + + # Initialize hash tracker + tracker = FileHashTracker(index_dir) + ignore = LayeredIgnore(source_root, use_gitignore=False, builtin_patterns=[]) + tracker.detect_changes(source_root, ignore) + tracker.update({"A.java"}, source_root) + tracker.save() + + # Add new file + (source_root / "B.java").write_text("package pkg; class B {}", encoding="utf-8") + + # Run incremental + from build_ast_graph import incremental_rebuild + result = incremental_rebuild(source_root, kuzu_path, verbose=False) + + assert result.mode == "incremental" + assert result.files_changed == 0 + assert result.files_added == 1 + + def test_incremental_deleted_file(self, tmp_path: Path) -> None: + """Remove a .java file from fixture, run incremental, verify orphaned nodes/edges cleaned up.""" + source_root = tmp_path / "src" + source_root.mkdir() + index_dir = tmp_path / "index" + index_dir.mkdir() + kuzu_path = index_dir / "code_graph.kuzu" + + # Create initial files + (source_root / "A.java").write_text("package pkg; class A {}", encoding="utf-8") + (source_root / "B.java").write_text("package pkg; class B {}", encoding="utf-8") + + # Initial build + from build_ast_graph import write_kuzu + tables = GraphTables() + pass1_parse(source_root, tables, verbose=False) + write_kuzu(kuzu_path, tables, source_root=source_root, verbose=False) + + # Initialize hash tracker + tracker = FileHashTracker(index_dir) + ignore = LayeredIgnore(source_root, use_gitignore=False, builtin_patterns=[]) + tracker.detect_changes(source_root, ignore) + tracker.update({"A.java", "B.java"}, source_root) + tracker.save() + + # Delete B.java + (source_root / "B.java").unlink() + + # Run incremental + from build_ast_graph import incremental_rebuild + result = incremental_rebuild(source_root, kuzu_path, verbose=False) + + assert result.mode == "incremental" + assert result.files_changed == 0 + assert result.files_added == 0 + assert result.files_removed == 1 + + # Verify B's nodes are deleted + db = kuzu.Database(str(kuzu_path)) + conn = kuzu.Connection(db) + check_result = conn.execute("MATCH (s:Symbol) WHERE s.fqn = 'pkg.B' RETURN count(*)") + if check_result.has_next(): + count = check_result.get_next()[0] + assert count == 0 + + def test_incremental_phantom_nodes_preserved(self, tmp_path: Path) -> None: + """Run incremental after a change, verify phantom nodes (those with filename = "") are untouched.""" + source_root = tmp_path / "src" + source_root.mkdir() + index_dir = tmp_path / "index" + index_dir.mkdir() + kuzu_path = index_dir / "code_graph.kuzu" + + # Create file with external reference + (source_root / "A.java").write_text( + "package pkg; import java.util.List; class A { List list; }", + encoding="utf-8", + ) + + # Initial build + from build_ast_graph import write_kuzu + tables = GraphTables() + pass1_parse(source_root, tables, verbose=False) + write_kuzu(kuzu_path, tables, source_root=source_root, verbose=False) + + # Count phantom nodes before + db = kuzu.Database(str(kuzu_path)) + conn = kuzu.Connection(db) + phantom_count_before = 0 + phantom_result = conn.execute("MATCH (s:Symbol) WHERE s.filename = '' RETURN count(*)") + if phantom_result.has_next(): + phantom_count_before = phantom_result.get_next()[0] + + conn.close() + + # Initialize hash tracker + tracker = FileHashTracker(index_dir) + ignore = LayeredIgnore(source_root, use_gitignore=False, builtin_patterns=[]) + tracker.detect_changes(source_root, ignore) + tracker.update({"A.java"}, source_root) + tracker.save() + + # Modify A.java + (source_root / "A.java").write_text( + "package pkg; import java.util.List; class A { List list; }", + encoding="utf-8", + ) + + # Run incremental + from build_ast_graph import incremental_rebuild + incremental_rebuild(source_root, kuzu_path, verbose=False) + + # Verify phantom nodes still exist + db = kuzu.Database(str(kuzu_path)) + conn = kuzu.Connection(db) + phantom_count_after = 0 + phantom_result = conn.execute("MATCH (s:Symbol) WHERE s.filename = '' RETURN count(*)") + if phantom_result.has_next(): + phantom_count_after = phantom_result.get_next()[0] + + assert phantom_count_after >= phantom_count_before + + def test_incremental_dependent_expansion(self, tmp_path: Path) -> None: + """Change a base class, verify that files with EXTENDS/IMPLEMENTS edges into it are also reprocessed.""" + source_root = tmp_path / "src" + source_root.mkdir() + index_dir = tmp_path / "index" + index_dir.mkdir() + kuzu_path = index_dir / "code_graph.kuzu" + + # Create files with inheritance + (source_root / "Base.java").write_text("package pkg; class Base {}", encoding="utf-8") + (source_root / "Derived.java").write_text( + "package pkg; class Derived extends Base {}", encoding="utf-8" + ) + + # Initial build (pass2 needed for EXTENDS edges) + from build_ast_graph import write_kuzu + tables = GraphTables() + asts = pass1_parse(source_root, tables, verbose=False) + pass2_edges(tables, asts, verbose=False) + write_kuzu(kuzu_path, tables, source_root=source_root, verbose=False) + + # Initialize hash tracker + tracker = FileHashTracker(index_dir) + ignore = LayeredIgnore(source_root, use_gitignore=False, builtin_patterns=[]) + tracker.detect_changes(source_root, ignore) + tracker.update({"Base.java", "Derived.java"}, source_root) + tracker.save() + + # Modify Base.java + (source_root / "Base.java").write_text( + "package pkg; class Base { void foo() {} }", encoding="utf-8" + ) + + # Run incremental + from build_ast_graph import incremental_rebuild + result = incremental_rebuild(source_root, kuzu_path, verbose=False) + + # Derived.java should be reprocessed due to EXTENDS edge + assert result.dependents_reprocessed >= 1 + + def test_incremental_expansion_cap_fallback(self, tmp_path: Path) -> None: + """Mock expansion_cap=2, change a widely-used file that has >2 dependents, verify fallback to full rebuild.""" + source_root = tmp_path / "src" + source_root.mkdir() + index_dir = tmp_path / "index" + index_dir.mkdir() + kuzu_path = index_dir / "code_graph.kuzu" + + # Create base class and many derived classes + (source_root / "Base.java").write_text("package pkg; class Base {}", encoding="utf-8") + for i in range(5): + (source_root / f"Derived{i}.java").write_text( + f"package pkg; class Derived{i} extends Base {{}}", encoding="utf-8" + ) + + # Initial build + from build_ast_graph import write_kuzu + tables = GraphTables() + asts = pass1_parse(source_root, tables, verbose=False) + pass2_edges(tables, asts, verbose=False) + write_kuzu(kuzu_path, tables, source_root=source_root, verbose=False) + + # Initialize hash tracker + tracker = FileHashTracker(index_dir) + ignore = LayeredIgnore(source_root, use_gitignore=False, builtin_patterns=[]) + tracker.detect_changes(source_root, ignore) + all_files = {"Base.java"} | {f"Derived{i}.java" for i in range(5)} + tracker.update(all_files, source_root) + tracker.save() + + # Modify Base.java + (source_root / "Base.java").write_text( + "package pkg; class Base { void foo() {} }", encoding="utf-8" + ) + + # Run incremental with low expansion cap + from build_ast_graph import incremental_rebuild + result = incremental_rebuild(source_root, kuzu_path, verbose=False, expansion_cap=2) + + # Should fall back to full rebuild due to cap exceeded + assert result.mode == "full_fallback" + + def test_incremental_crash_marker_triggers_fallback(self, tmp_path: Path) -> None: + """Leave .graph_increment_in_progress marker, run incremental, verify full rebuild happens.""" + source_root = tmp_path / "src" + source_root.mkdir() + index_dir = tmp_path / "index" + index_dir.mkdir() + kuzu_path = index_dir / "code_graph.kuzu" + + # Create file + (source_root / "A.java").write_text("package pkg; class A {}", encoding="utf-8") + + # Initial build + from build_ast_graph import write_kuzu + tables = GraphTables() + pass1_parse(source_root, tables, verbose=False) + write_kuzu(kuzu_path, tables, source_root=source_root, verbose=False) + + # Initialize hash tracker + tracker = FileHashTracker(index_dir) + ignore = LayeredIgnore(source_root, use_gitignore=False, builtin_patterns=[]) + tracker.detect_changes(source_root, ignore) + tracker.update({"A.java"}, source_root) + tracker.save() + + # Create crash marker + crash_marker = index_dir / ".graph_increment_in_progress" + crash_marker.write_text("", encoding="utf-8") + + # Modify A.java + (source_root / "A.java").write_text( + "package pkg; class A { void foo() {} }", encoding="utf-8" + ) + + # Run incremental - should fall back to full rebuild + from build_ast_graph import incremental_rebuild + result = incremental_rebuild(source_root, kuzu_path, verbose=False) + + assert result.mode == "full_fallback" + # Crash marker should be removed + assert not crash_marker.exists() + + def test_incremental_crash_marker_removed_on_success(self, tmp_path: Path) -> None: + """Run successful incremental, verify marker file is removed.""" + source_root = tmp_path / "src" + source_root.mkdir() + index_dir = tmp_path / "index" + index_dir.mkdir() + kuzu_path = index_dir / "code_graph.kuzu" + + # Create file + (source_root / "A.java").write_text("package pkg; class A {}", encoding="utf-8") + + # Initial build + from build_ast_graph import write_kuzu + tables = GraphTables() + pass1_parse(source_root, tables, verbose=False) + write_kuzu(kuzu_path, tables, source_root=source_root, verbose=False) + + # Initialize hash tracker + tracker = FileHashTracker(index_dir) + ignore = LayeredIgnore(source_root, use_gitignore=False, builtin_patterns=[]) + tracker.detect_changes(source_root, ignore) + tracker.update({"A.java"}, source_root) + tracker.save() + + # Modify A.java + (source_root / "A.java").write_text( + "package pkg; class A { void foo() {} }", encoding="utf-8" + ) + + # Run incremental + from build_ast_graph import incremental_rebuild + result = incremental_rebuild(source_root, kuzu_path, verbose=False) + + assert result.mode == "incremental" + + # Crash marker should not exist + crash_marker = index_dir / ".graph_increment_in_progress" + assert not crash_marker.exists() + + def test_incremental_no_changes_is_noop(self, tmp_path: Path) -> None: + """Run incremental with no file changes, verify graph is unchanged (same node/edge counts).""" + source_root = tmp_path / "src" + source_root.mkdir() + index_dir = tmp_path / "index" + index_dir.mkdir() + kuzu_path = index_dir / "code_graph.kuzu" + + # Create file + (source_root / "A.java").write_text("package pkg; class A {}", encoding="utf-8") + + # Initial build + from build_ast_graph import write_kuzu + tables = GraphTables() + pass1_parse(source_root, tables, verbose=False) + write_kuzu(kuzu_path, tables, source_root=source_root, verbose=False) + + # Get node count before + db = kuzu.Database(str(kuzu_path)) + conn = kuzu.Connection(db) + count_before_result = conn.execute("MATCH (s:Symbol) RETURN count(*)") + count_before = 0 + if count_before_result.has_next(): + count_before = count_before_result.get_next()[0] + conn.close() + + # Initialize hash tracker + tracker = FileHashTracker(index_dir) + ignore = LayeredIgnore(source_root, use_gitignore=False, builtin_patterns=[]) + tracker.detect_changes(source_root, ignore) + tracker.update({"A.java"}, source_root) + tracker.save() + + # Run incremental with no changes + from build_ast_graph import incremental_rebuild + result = incremental_rebuild(source_root, kuzu_path, verbose=False) + + assert result.mode == "incremental" + assert result.files_changed == 0 + + # Verify node count unchanged + db = kuzu.Database(str(kuzu_path)) + conn = kuzu.Connection(db) + count_after_result = conn.execute("MATCH (s:Symbol) RETURN count(*)") + count_after = 0 + if count_after_result.has_next(): + count_after = count_after_result.get_next()[0] + conn.close() + + assert count_after == count_before + + def test_incremental_pass5_6_always_global(self, tmp_path: Path) -> None: + """Change a file unrelated to routes, verify Client/Producer/HTTP_CALLS/ASYNC_CALLS are still fully rebuilt.""" + source_root = tmp_path / "src" + source_root.mkdir() + index_dir = tmp_path / "index" + index_dir.mkdir() + kuzu_path = index_dir / "code_graph.kuzu" + + # Create files + (source_root / "A.java").write_text("package pkg; class A {}", encoding="utf-8") + + # Initial build + from build_ast_graph import write_kuzu + tables = GraphTables() + pass1_parse(source_root, tables, verbose=False) + write_kuzu(kuzu_path, tables, source_root=source_root, verbose=False) + + # Initialize hash tracker + tracker = FileHashTracker(index_dir) + ignore = LayeredIgnore(source_root, use_gitignore=False, builtin_patterns=[]) + tracker.detect_changes(source_root, ignore) + tracker.update({"A.java"}, source_root) + tracker.save() + + # Modify A.java + (source_root / "A.java").write_text( + "package pkg; class A { void foo() {} }", encoding="utf-8" + ) + + # Run incremental + from build_ast_graph import incremental_rebuild + result = incremental_rebuild(source_root, kuzu_path, verbose=False) + + assert result.mode == "incremental" + + # Verify graph is still valid (Client/Producer tables exist even if empty) + db = kuzu.Database(str(kuzu_path)) + conn = kuzu.Connection(db) + + # Check that Client and Producer node tables exist by querying them + client_result = conn.execute("MATCH (c:Client) RETURN count(*)") + producer_result = conn.execute("MATCH (p:Producer) RETURN count(*)") + assert client_result.has_next() + assert producer_result.has_next() + + conn.close() + + def test_load_existing_types_populates_indexes(self, tmp_path: Path) -> None: + """Build full graph, then load existing types into empty GraphTables, verify types/by_simple_name/by_package populated.""" + from build_ast_graph import _load_existing_types + + source_root = tmp_path / "src" + source_root.mkdir() + kuzu_path = tmp_path / "code_graph.kuzu" + + # Create file + (source_root / "A.java").write_text("package pkg; class A {}", encoding="utf-8") + + # Build full graph + from build_ast_graph import write_kuzu + tables = GraphTables() + pass1_parse(source_root, tables, verbose=False) + write_kuzu(kuzu_path, tables, source_root=source_root, verbose=False) + + # Load existing types into empty tables + new_tables = GraphTables() + db = kuzu.Database(str(kuzu_path)) + conn = kuzu.Connection(db) + _load_existing_types(conn, new_tables) + conn.close() + + # Verify types loaded + assert "pkg.A" in new_tables.types + assert len(new_tables.by_simple_name.get("A", [])) == 1 + assert len(new_tables.by_package.get("pkg", [])) == 1 + + def test_find_dependents_returns_incoming_edge_sources(self, tmp_path: Path) -> None: + """Seed graph with EXTENDS edge from file B to file A, change file A, verify _find_dependents returns file B's filename.""" + from build_ast_graph import _find_dependents + + source_root = tmp_path / "src" + source_root.mkdir() + kuzu_path = tmp_path / "code_graph.kuzu" + + # Create files + (source_root / "Base.java").write_text("package pkg; class Base {}", encoding="utf-8") + (source_root / "Derived.java").write_text( + "package pkg; class Derived extends Base {}", encoding="utf-8" + ) + + # Build full graph + from build_ast_graph import write_kuzu + tables = GraphTables() + asts = pass1_parse(source_root, tables, verbose=False) + pass2_edges(tables, asts, verbose=False) + write_kuzu(kuzu_path, tables, source_root=source_root, verbose=False) + + # Get Base node ID + db = kuzu.Database(str(kuzu_path)) + conn = kuzu.Connection(db) + base_result = conn.execute("MATCH (s:Symbol) WHERE s.fqn = 'pkg.Base' RETURN s.id") + base_id = None + if base_result.has_next(): + base_id = base_result.get_next()[0] + + assert base_id is not None + + # Find dependents of Base + dependent_files = _find_dependents(conn, {base_id}) + + # Should include Derived.java + assert "Derived.java" in dependent_files + + conn.close() + + def test_delete_file_scope_removes_only_matching(self, tmp_path: Path) -> None: + """Delete scope for one file, verify other files' nodes/edges untouched.""" + from build_ast_graph import _delete_file_scope + + source_root = tmp_path / "src" + source_root.mkdir() + kuzu_path = tmp_path / "code_graph.kuzu" + + # Create files + (source_root / "A.java").write_text("package pkg; class A {}", encoding="utf-8") + (source_root / "B.java").write_text("package pkg; class B {}", encoding="utf-8") + + # Build full graph + from build_ast_graph import write_kuzu + tables = GraphTables() + pass1_parse(source_root, tables, verbose=False) + write_kuzu(kuzu_path, tables, source_root=source_root, verbose=False) + + # Get node count before + db = kuzu.Database(str(kuzu_path)) + conn = kuzu.Connection(db) + conn.execute("MATCH (s:Symbol) RETURN count(*)") + + # Delete only A.java's scope + _delete_file_scope(conn, {"A.java"}) + + # Verify A's nodes are gone but B's remain + a_result = conn.execute("MATCH (s:Symbol) WHERE s.fqn = 'pkg.A' RETURN count(*)") + b_result = conn.execute("MATCH (s:Symbol) WHERE s.fqn = 'pkg.B' RETURN count(*)") + + a_count = 0 + b_count = 0 + if a_result.has_next(): + a_count = a_result.get_next()[0] + if b_result.has_next(): + b_count = b_result.get_next()[0] + + assert a_count == 0 + assert b_count > 0 + + conn.close() From 724bddcc8ad82b97b68ce9f09f2829a971e860fb Mon Sep 17 00:00:00 2001 From: Dmitry Teryaev Date: Sun, 7 Jun 2026 16:52:40 +0300 Subject: [PATCH 3/9] fix: restructure _delete_file_scope to handle cross-file edge deletion MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Process edge deletions across all scope files before deleting any nodes. The previous per-file loop could fail when file B has an EXTENDS edge to file A — deleting A's nodes first left dangling edges that Kuzu refused to drop. Co-Authored-By: Claude Opus 4.7 --- build_ast_graph.py | 107 +++++++++++++++++++++------------------------ 1 file changed, 49 insertions(+), 58 deletions(-) diff --git a/build_ast_graph.py b/build_ast_graph.py index 5a862df5..247fc86b 100644 --- a/build_ast_graph.py +++ b/build_ast_graph.py @@ -729,70 +729,61 @@ def _delete_file_scope(conn: kuzu.Connection, filenames: set[str]) -> None: Skip phantom nodes (filename=""). For Symbol-to-Symbol and UNRESOLVED_AT edge tables only. Client/Producer/Route edges are handled separately in pass 5-6 global rebuild. - """ - for filename in filenames: - # Delete Symbol-to-Symbol edges by source_file - edge_tables = ["EXTENDS", "IMPLEMENTS", "INJECTS", "CALLS", "DECLARES", "OVERRIDES", "UNRESOLVED_AT"] - for edge_type in edge_tables: - query = f""" - MATCH (src)-[e:{edge_type}]->(dst) - WHERE e.source_file = $filename - DELETE e - """ - conn.execute(query, {"filename": filename}) - # Collect Symbol node IDs for this file (for UnresolvedCallSite cleanup) - symbol_ids_query = """ - MATCH (s:Symbol) - WHERE s.filename = $filename - RETURN s.id - """ - symbol_ids = [] - result = conn.execute(symbol_ids_query, {"filename": filename}) - while result.has_next(): - row = result.get_next() - symbol_ids.append(row[0]) # Kuzu returns list, not dict - - # Delete UnresolvedCallSite nodes whose caller_id is in the collected set - if symbol_ids: - unresolved_query = """ - MATCH (u:UnresolvedCallSite) - WHERE u.caller_id IN $symbol_ids - DELETE u - """ - conn.execute(unresolved_query, {"symbol_ids": symbol_ids}) - - # Delete Symbol nodes - delete_symbols_query = """ - MATCH (s:Symbol) - WHERE s.filename = $filename - DELETE s + Edges are deleted in batch across all filenames first to avoid Kuzu + "has connected edges" errors when edges from one file point to nodes + in another file within the same scope. + """ + filename_list = list(filenames) + + # Phase 1: Delete ALL edges from ALL scope files at once. + # This avoids ordering issues where file A has an edge from file B + # pointing into it; if we delete A's nodes before B's edges, Kuzu + # raises "has connected edges" errors. + edge_tables = ["EXTENDS", "IMPLEMENTS", "INJECTS", "CALLS", "DECLARES", "OVERRIDES", "UNRESOLVED_AT"] + for edge_type in edge_tables: + query = f""" + MATCH (src)-[e:{edge_type}]->(dst) + WHERE e.source_file IN $filenames + DELETE e """ - conn.execute(delete_symbols_query, {"filename": filename}) + conn.execute(query, {"filenames": filename_list}) - # Delete Route nodes (note: EXPOSES edges are deleted in pass 5-6 global rebuild) - delete_routes_query = """ - MATCH (r:Route) - WHERE r.filename = $filename - DELETE r + # Phase 2: Collect all Symbol node IDs for UnresolvedCallSite cleanup. + symbol_ids: list[str] = [] + symbol_ids_query = """ + MATCH (s:Symbol) + WHERE s.filename IN $filenames + RETURN s.id + """ + result = conn.execute(symbol_ids_query, {"filenames": filename_list}) + while result.has_next(): + row = result.get_next() + symbol_ids.append(row[0]) + + # Delete UnresolvedCallSite nodes whose caller_id is in the collected set + if symbol_ids: + unresolved_query = """ + MATCH (u:UnresolvedCallSite) + WHERE u.caller_id IN $symbol_ids + DELETE u """ - conn.execute(delete_routes_query, {"filename": filename}) + conn.execute(unresolved_query, {"symbol_ids": symbol_ids}) - # Delete Client nodes - delete_clients_query = """ - MATCH (c:Client) - WHERE c.filename = $filename - DELETE c - """ - conn.execute(delete_clients_query, {"filename": filename}) + # Phase 3: Delete Symbol nodes. + delete_symbols_query = """ + MATCH (s:Symbol) + WHERE s.filename IN $filenames + DELETE s + """ + conn.execute(delete_symbols_query, {"filenames": filename_list}) - # Delete Producer nodes - delete_producers_query = """ - MATCH (p:Producer) - WHERE p.filename = $filename - DELETE p - """ - conn.execute(delete_producers_query, {"filename": filename}) + # Phase 4: Delete Route, Client, Producer nodes. + for label in ["Route", "Client", "Producer"]: + conn.execute( + f"MATCH (n:{label}) WHERE n.filename IN $filenames DELETE n", + {"filenames": filename_list}, + ) def _scoped_write(conn: kuzu.Connection, tables: GraphTables, *, project_root: Path, meta_chain: dict[str, frozenset[str]] | None) -> None: From 0e4fa31e9cef1fbcc8f10b92a8dca2b33e1ff659 Mon Sep 17 00:00:00 2001 From: Dmitry Teryaev Date: Sun, 7 Jun 2026 17:25:43 +0300 Subject: [PATCH 4/9] feat: add run_incremental_graph() wrapper for incremental rebuild (PR-G2) Add subprocess wrapper that passes --incremental flag to build_ast_graph.py. Part of incremental graph rebuild implementation. Co-Authored-By: Claude Opus 4.7 --- java_codebase_rag/pipeline.py | 55 +++++++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/java_codebase_rag/pipeline.py b/java_codebase_rag/pipeline.py index f1d34270..83262c69 100644 --- a/java_codebase_rag/pipeline.py +++ b/java_codebase_rag/pipeline.py @@ -247,5 +247,60 @@ def run_build_ast_graph( return subprocess.CompletedProcess(args=cmd, returncode=code, stdout=out_s, stderr=err_s) +def run_incremental_graph( + *, + source_root: Path, + kuzu_path: Path, + verbose: bool, + quiet: bool = False, + env: dict[str, str] | None = None, +) -> subprocess.CompletedProcess[str]: + """Run incremental graph rebuild by passing --incremental flag to build_ast_graph.py.""" + builder = bundle_dir() / "build_ast_graph.py" + if not builder.is_file(): + return subprocess.CompletedProcess( + args=[], + returncode=126, + stdout="", + stderr=f"build_ast_graph.py not found under {builder.parent}", + ) + cmd: list[str] = [ + sys.executable, + str(builder), + "--source-root", + str(source_root), + "--kuzu-path", + str(kuzu_path), + "--incremental", + ] + # Three-tier: --quiet (silent) / default (filtered progress) / --verbose (raw). + # Default passes --verbose so the builder emits per-pass progress lines, + # which the parent filters via _LineFilter. --verbose bypasses the filter. + if verbose or not quiet: + cmd.append("--verbose") + if quiet: + return subprocess.run( + cmd, + cwd=str(source_root), + env=env or os.environ.copy(), + capture_output=True, + text=True, + ) + proc = subprocess.Popen( + cmd, + cwd=str(source_root), + env=env or os.environ.copy(), + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + bufsize=0, + ) + out_s, err_s, code = _popen_capturing_stderr(proc, verbose=verbose) + if not verbose: + from java_codebase_rag.cli_format import bold_cyan, styled_check, styled_cross + marker = styled_check() if code == 0 else styled_cross() + print(f"{marker} {bold_cyan('[increment]')} done", file=sys.stderr, flush=True) + return subprocess.CompletedProcess(args=cmd, returncode=code, stdout=out_s, stderr=err_s) + + def clip(s: str, n: int) -> str: return s[-n:] if len(s) > n else s From 1f35969c4b2f7c369127e9f42702ae2881e6f28d Mon Sep 17 00:00:00 2001 From: Dmitry Teryaev Date: Sun, 7 Jun 2026 17:25:47 +0300 Subject: [PATCH 5/9] feat: CLI integration for incremental graph rebuild (PR-G3) - Update increment command to run both CocoIndex catch-up and incremental Kuzu graph update - Add --vectors-only flag to preserve old Lance-only behavior - Update CLI help texts and documentation - Emit JSON output from incremental_rebuild for mode detection - Add/update tests for new increment behavior Increment now updates both Lance and Kuzu by default. The old stale warning is only emitted when --vectors-only is used. Co-Authored-By: Claude Opus 4.7 --- README.md | 3 +- build_ast_graph.py | 10 +++ docs/JAVA-CODEBASE-RAG-CLI.md | 11 ++- java_codebase_rag/cli.py | 63 +++++++++++-- tests/test_java_codebase_rag_cli.py | 131 +++++++++++++++++++++++++++- 5 files changed, 206 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index 81dea4ef..f880d634 100644 --- a/README.md +++ b/README.md @@ -168,7 +168,7 @@ Run `java-codebase-rag --help` to list grouped subcommands. Operator playbook wi | Group | Subcommand | What it does | |---|---|---| | Lifecycle | `init` | First-time index. Refuses if artifacts already exist. | -| Lifecycle | `increment` | CocoIndex catch-up (Lance only); Kuzu stays stale until `reprocess`. | +| Lifecycle | `increment` | CocoIndex catch-up + incremental Kuzu update. `--vectors-only` for Lance only. | | Lifecycle | `reprocess` | Full Lance + Kuzu rebuild. `--vectors-only` / `--graph-only` for a single phase. | | Lifecycle | `erase` | Delete index artifacts. Requires `--yes` or TTY confirm. | | Introspection | `meta`, `tables`, `diagnose-ignore`, `unresolved-calls` | Health, table listing, ignore-layer diagnostics, receiver-failure call sites. | @@ -212,5 +212,4 @@ The default embedding model is `sentence-transformers/all-MiniLM-L6-v2` (downloa - `get_service_topology` — microservice-level summary aggregating `HTTP_CALLS` / `ASYNC_CALLS`. - Agentic routing layer (query classifier → vector / graph / both). -- Incremental Kuzu updates (per-changed-file). - Optional `codegraph_nodes` LanceDB table embedding symbol summaries so the graph itself is vector-searchable. diff --git a/build_ast_graph.py b/build_ast_graph.py index 247fc86b..a1115d9b 100644 --- a/build_ast_graph.py +++ b/build_ast_graph.py @@ -3967,6 +3967,16 @@ def main() -> int: if args.incremental: result = incremental_rebuild(root, kuzu_path, verbose=args.verbose) + # Emit result as JSON to stdout so CLI can parse the mode + import json + print(json.dumps({ + "mode": result.mode, + "files_changed": result.files_changed, + "files_added": result.files_added, + "files_removed": result.files_removed, + "dependents_reprocessed": result.dependents_reprocessed, + "elapsed_sec": result.elapsed_sec, + })) if args.verbose: _verbose_stderr_line(f"[graph] done · mode={result.mode} files_changed={result.files_changed} files_added={result.files_added} files_removed={result.files_removed} dependents={result.dependents_reprocessed} elapsed={result.elapsed_sec:.2f}s") return 0 diff --git a/docs/JAVA-CODEBASE-RAG-CLI.md b/docs/JAVA-CODEBASE-RAG-CLI.md index 80a971be..d95a62af 100644 --- a/docs/JAVA-CODEBASE-RAG-CLI.md +++ b/docs/JAVA-CODEBASE-RAG-CLI.md @@ -76,12 +76,21 @@ java-codebase-rag init --source-root /path/to/java/repo --index-dir /path/to/.ja ### `increment` -Runs cocoindex **catch-up** without a full Lance reprocess. **Does not** rebuild Kuzu. Every run prints a **multi-line stderr warning** that graph navigation may be stale until you run `reprocess` (see [`propose/completed/CLI-SCENARIOS-PROPOSE.md`](../propose/completed/CLI-SCENARIOS-PROPOSE.md) Appendix A for the contract). +Runs cocoindex **catch-up** and **incremental Kuzu graph update**. Only changed files and their single-hop dependents are re-parsed and re-written to the graph. Passes 5–6 (client/producer extraction and cross-service matching) run globally. Falls back to full `reprocess` if: +- No previous graph exists (first run) +- Graph schema is outdated (missing `source_file` on edges) +- Previous incremental run crashed (crash marker detected) +- Dependent expansion exceeds 50 files ```bash java-codebase-rag increment --source-root /path/to/java/repo --index-dir /path/to/.java-codebase-rag --quiet ``` +**Flags:** +- `--vectors-only` — runs only cocoindex catch-up; skips graph update and emits stale-graph warning. Use this when you want the old Lance-only behavior. + +**Migration note:** After upgrading, run `reprocess` once to ensure edge tables have `source_file` columns (ontology version 17+). + ### `reprocess` **Default (no extra flags):** full **Lance** reprocess (cocoindex `--full-reprocess`) then full **Kuzu** rebuild via `build_ast_graph.py`, in that order. This remains the recommended **coherence** operation when both stores might be out of date. diff --git a/java_codebase_rag/cli.py b/java_codebase_rag/cli.py index a3281e71..25cb6e31 100644 --- a/java_codebase_rag/cli.py +++ b/java_codebase_rag/cli.py @@ -21,7 +21,7 @@ index_dir_has_existing_artifacts, resolve_operator_config, ) -from java_codebase_rag.pipeline import clip, run_build_ast_graph, run_cocoindex_drop, run_cocoindex_update +from java_codebase_rag.pipeline import clip, run_build_ast_graph, run_cocoindex_drop, run_cocoindex_update, run_incremental_graph from java_ontology import VALID_UNRESOLVED_CALL_REASONS KUZU_INCREMENTAL_TRACKING_ISSUE_URL = "https://github.com/HumanBean17/java-codebase-rag/issues/73" @@ -310,7 +310,11 @@ def _cmd_increment(args: argparse.Namespace) -> int: cfg = _resolved_from_ns(args) _startup_hints(cfg) cfg.apply_to_os_environ() - _emit_increment_kuzu_warning() + + # Check for --vectors-only flag + vectors_only = bool(getattr(args, "vectors_only", False)) + if vectors_only: + _emit_increment_kuzu_warning() def work() -> int: env = cfg.subprocess_env() @@ -332,7 +336,51 @@ def work() -> int: } ) return 1 - _emit({"success": True, "message": "increment completed (Lance only; graph may be stale — see stderr)"}) + + # If --vectors-only is set, skip graph update + if vectors_only: + _emit({"success": True, "message": "increment completed (Lance only; graph may be stale — see stderr)"}) + return 0 + + # Run incremental graph update + g = run_incremental_graph( + source_root=cfg.source_root, + kuzu_path=cfg.kuzu_path, + verbose=bool(args.verbose), + quiet=bool(args.quiet), + env=env, + ) + + # Check if incremental fell back to full rebuild + if g.returncode == 0 and g.stdout: + # Parse stdout to check for full_fallback mode + # The incremental_rebuild function returns a JSON payload with mode field + try: + import json + result = json.loads(g.stdout.strip()) + if result.get("mode") == "full_fallback": + print( + "[increment] fell back to full graph rebuild — this is normal after schema changes or first run", + file=sys.stderr, + flush=True, + ) + except (json.JSONDecodeError, ValueError): + # If parsing fails, continue silently + pass + + if g.returncode != 0: + _emit( + { + "success": False, + "exit_code": g.returncode, + "stdout": clip(g.stdout, 4000), + "stderr": clip(g.stderr, 4000), + "message": f"graph builder exit {g.returncode}", + } + ) + return 1 + + _emit({"success": True, "message": "increment completed (Lance + graph updated)"}) return 0 return _run_with_pipeline_progress("increment", cfg, quiet=bool(args.quiet), work=work) @@ -627,7 +675,7 @@ def build_parser() -> argparse.ArgumentParser: "--quiet suppresses that stream; stdout remains the machine-readable payload.\n\n" "Lifecycle (manage the index):\n" " init Create a fresh index from a Java repository.\n" - " increment Pick up changes since the last index update (Lance only).\n" + " increment Pick up changes since the last index update (Lance + graph).\n" " reprocess Full vector + graph rebuild (default); optional --vectors-only / --graph-only.\n" " erase Delete the index from disk.\n\n" "Introspection (inspect the index):\n" @@ -662,10 +710,15 @@ def build_parser() -> argparse.ArgumentParser: increment = subparsers.add_parser( "increment", help="Pick up changes since the last index update.", - description="Runs cocoindex catch-up (no full reprocess). Does not rebuild Kuzu; see stderr warning.", + description="Runs cocoindex catch-up and incremental Kuzu graph update. Use --vectors-only to skip graph update.", ) _add_index_embedding_flags(increment) _add_verbosity_flags(increment) + increment.add_argument( + "--vectors-only", + action="store_true", + help="Run only cocoindex catch-up (Lance); skip graph update.", + ) increment.set_defaults(handler=_cmd_increment) reprocess = subparsers.add_parser( diff --git a/tests/test_java_codebase_rag_cli.py b/tests/test_java_codebase_rag_cli.py index 1d67cb77..7e8c5920 100644 --- a/tests/test_java_codebase_rag_cli.py +++ b/tests/test_java_codebase_rag_cli.py @@ -324,6 +324,11 @@ def test_refresh_hidden_alias_deprecates_on_stderr(tmp_path: Path) -> None: def test_increment_emits_kuzu_stale_warning_block( corpus_root: Path, tmp_path: Path, monkeypatch: pytest.MonkeyPatch, ) -> None: + """Test that increment does NOT emit stale warning by default (new behavior). + + The stale warning is now only emitted with --vectors-only flag. + This test verifies the new default behavior where graph IS updated. + """ idx = tmp_path / "idx_inc" idx.mkdir() monkeypatch.setenv("JAVA_CODEBASE_RAG_INDEX_DIR", str(idx)) @@ -339,9 +344,10 @@ def test_increment_emits_kuzu_stale_warning_block( ) assert rc == 0 err = buf.getvalue() - assert "WARNING: AST graph (Kuzu) incremental rebuild is not yet implemented." in err - assert "java-codebase-rag reprocess" in err - assert cli_mod.KUZU_INCREMENTAL_TRACKING_ISSUE_URL in err + # Should NOT contain old stale warning + assert "WARNING: AST graph (Kuzu) incremental rebuild is not yet implemented." not in err + assert "java-codebase-rag reprocess" not in err + assert cli_mod.KUZU_INCREMENTAL_TRACKING_ISSUE_URL not in err def test_meta_reports_embedding_setting_source(corpus_root: Path, kuzu_db_path: Path) -> None: @@ -392,6 +398,10 @@ def test_init_after_erase_succeeds(corpus_root: Path, tmp_path: Path) -> None: def test_cli_lifecycle_round_trip_init_increment_meta_erase( corpus_root: Path, tmp_path: Path, ) -> None: + """Test lifecycle round-trip: init -> increment -> meta -> erase. + + This test verifies that increment updates both Lance and graph (new behavior). + """ idx = tmp_path / "rt_idx" env = os.environ.copy() env["JAVA_CODEBASE_RAG_INDEX_DIR"] = str(idx) @@ -411,7 +421,10 @@ def test_cli_lifecycle_round_trip_init_increment_meta_erase( env=env, ) assert inc.returncode == 0, inc.stdout + inc.stderr - assert "WARNING: AST graph" in inc.stderr + # Should NOT contain old stale warning (new behavior) + assert "WARNING: AST graph" not in inc.stderr + # Should contain new success message + assert "Lance + graph updated" in inc.stdout meta = _run_cli(["meta", "--source-root", str(corpus_root), "--index-dir", str(idx)], env=env) assert meta.returncode == 0, meta.stderr er = _run_cli( @@ -421,6 +434,116 @@ def test_cli_lifecycle_round_trip_init_increment_meta_erase( assert er.returncode == 0, er.stderr +@pytest.mark.skipif(not _cocoindex_available(), reason="cocoindex not installed in venv") +def test_increment_runs_graph_update( + corpus_root: Path, tmp_path: Path, monkeypatch: pytest.MonkeyPatch, +) -> None: + """Test that increment updates graph by default (no --vectors-only).""" + idx = tmp_path / "idx_graph_update" + idx.mkdir() + monkeypatch.setenv("JAVA_CODEBASE_RAG_INDEX_DIR", str(idx)) + monkeypatch.setenv("JAVA_CODEBASE_RAG_SOURCE_ROOT", str(corpus_root)) + init_rc = cli_mod.main( + ["init", "--source-root", str(corpus_root), "--index-dir", str(idx), "--quiet"], + ) + assert init_rc == 0 + buf = io.StringIO() + with contextlib.redirect_stderr(buf): + rc = cli_mod.main( + ["increment", "--source-root", str(corpus_root), "--index-dir", str(idx), "--quiet"], + ) + assert rc == 0 + # Should NOT contain stale warning + err = buf.getvalue() + assert "WARNING: AST graph" not in err + + +def test_increment_vectors_only_skips_graph( + corpus_root: Path, tmp_path: Path, monkeypatch: pytest.MonkeyPatch, +) -> None: + """Test that increment --vectors-only emits stale warning and skips graph update.""" + idx = tmp_path / "idx_vectors_only" + idx.mkdir() + monkeypatch.setenv("JAVA_CODEBASE_RAG_INDEX_DIR", str(idx)) + monkeypatch.setenv("JAVA_CODEBASE_RAG_SOURCE_ROOT", str(corpus_root)) + init_rc = cli_mod.main( + ["init", "--source-root", str(corpus_root), "--index-dir", str(idx), "--quiet"], + ) + assert init_rc == 0 + buf = io.StringIO() + with contextlib.redirect_stderr(buf): + rc = cli_mod.main( + ["increment", "--vectors-only", "--source-root", str(corpus_root), "--index-dir", str(idx), "--quiet"], + ) + assert rc == 0 + err = buf.getvalue() + # Should contain stale warning + assert "WARNING: AST graph (Kuzu) incremental rebuild is not yet implemented." in err + assert "java-codebase-rag reprocess" in err + assert cli_mod.KUZU_INCREMENTAL_TRACKING_ISSUE_URL in err + + +def test_increment_cli_help_mentions_vectors_only( + corpus_root: Path, tmp_path: Path, monkeypatch: pytest.MonkeyPatch, +) -> None: + """Test that increment --help mentions --vectors-only flag.""" + buf = io.StringIO() + with contextlib.redirect_stdout(buf): + rc = cli_mod.main(["increment", "--help"]) + assert rc == 0 + help_text = buf.getvalue() + assert "--vectors-only" in help_text + assert "Run only cocoindex catch-up" in help_text + + +def test_increment_cli_help_no_longer_says_lance_only( + corpus_root: Path, tmp_path: Path, monkeypatch: pytest.MonkeyPatch, +) -> None: + """Test that increment --help no longer says 'Lance only'.""" + buf = io.StringIO() + with contextlib.redirect_stdout(buf): + rc = cli_mod.main(["increment", "--help"]) + assert rc == 0 + help_text = buf.getvalue() + # Should NOT say "Lance only" in help + assert "Lance only" not in help_text + # Should say it updates graph + assert "graph" in help_text.lower() + + +def test_increment_first_run_falls_back_to_full( + corpus_root: Path, tmp_path: Path, monkeypatch: pytest.MonkeyPatch, +) -> None: + """Test that increment on fresh index (no graph hashes) falls back to full rebuild.""" + idx = tmp_path / "idx_first_run" + idx.mkdir() + monkeypatch.setenv("JAVA_CODEBASE_RAG_INDEX_DIR", str(idx)) + monkeypatch.setenv("JAVA_CODEBASE_RAG_SOURCE_ROOT", str(corpus_root)) + # Run init first + init_rc = cli_mod.main( + ["init", "--source-root", str(corpus_root), "--index-dir", str(idx), "--quiet"], + ) + assert init_rc == 0 + # Remove hash file to simulate first run after upgrade + hash_file = idx / ".graph_hashes.json" + if hash_file.exists(): + hash_file.unlink() + buf = io.StringIO() + buf_err = io.StringIO() + with contextlib.redirect_stdout(buf): + with contextlib.redirect_stderr(buf_err): + rc = cli_mod.main( + ["increment", "--source-root", str(corpus_root), "--index-dir", str(idx), "--quiet"], + ) + assert rc == 0 + err = buf_err.getvalue() + # Should fall back to full rebuild gracefully + assert "fell back to full graph rebuild" in err + # Should still succeed + assert "increment completed (Lance + graph updated)" in buf.getvalue() + + + @pytest.mark.skipif(not _cocoindex_available(), reason="cocoindex not installed in venv") def test_increment_updates_lance_after_touch_java_file(corpus_root: Path, tmp_path: Path) -> None: import lancedb # noqa: PLC0415 From 1121be05a004f1ff7ca959ba98e3fb1227e88661 Mon Sep 17 00:00:00 2001 From: Dmitry Teryaev Date: Sun, 7 Jun 2026 18:06:59 +0300 Subject: [PATCH 6/9] fix: correct Cypher parameter names and close db on fallback in incremental rebuild - Fix _write_clients_producers_and_calls: use correct parameter names ($sid/$cid/$pid/$rid) matching Cypher templates, add missing fields (strategy, method_call, raw_uri, match, direction, raw_topic) - Use dict lookup instead of O(n) list scan for client/producer source_file - Use keyword args for MemberEntry placeholder construction - Delete db+conn before fallback to avoid file lock - Remove redundant import json inside main() - Remove stale duplicate comment in pass1_parse Co-Authored-By: Claude Opus 4.7 --- build_ast_graph.py | 47 ++++++++++++++++++++++++++++------------------ 1 file changed, 29 insertions(+), 18 deletions(-) diff --git a/build_ast_graph.py b/build_ast_graph.py index a1115d9b..289e0d99 100644 --- a/build_ast_graph.py +++ b/build_ast_graph.py @@ -983,8 +983,6 @@ def pass1_parse(root: Path, tables: GraphTables, *, verbose: bool, scope_files: rel = p.as_posix() if scope_files is not None and rel not in scope_files: continue - # Skip files not in scope (if scope is provided) - # (rel is computed above before the scope check) n_files += 1 try: content = p.read_bytes() @@ -3615,12 +3613,12 @@ def incremental_rebuild( if version < 17: if verbose: _verbose_stderr_line(f"[increment] ontology version {version} < 17; falling back to full rebuild") - conn.close() + del conn, db return _fallback_to_full(source_root, kuzu_path, verbose, t_start) except Exception as e: if verbose: _verbose_stderr_line(f"[increment] failed to read ontology version: {e}; falling back to full rebuild") - conn.close() + del conn, db return _fallback_to_full(source_root, kuzu_path, verbose, t_start) index_dir = kuzu_path.parent @@ -3851,42 +3849,57 @@ def _write_clients_producers_and_calls(conn: kuzu.Connection, tables: GraphTable "topic": row.topic, }) + client_by_id = {c.id: c for c in tables.client_rows} + producer_by_id = {p.id: p for p in tables.producer_rows} + # Write declares_client edges for row in tables.declares_client_rows: - source_file = member_by_id.get(row.symbol_id, MemberEntry("", None, "", "", "", "", "")).file_path + source_file = member_by_id.get(row.symbol_id, MemberEntry(kind="", decl=None, parent_id="", parent_fqn="", file_path="", module="", microservice="")).file_path conn.execute(_CREATE_DECLARES_CLIENT, { - "src": row.symbol_id, - "dst": row.client_id, + "sid": row.symbol_id, + "cid": row.client_id, "source_file": source_file, "confidence": row.confidence, + "strategy": row.strategy, }) # Write declares_producer edges for row in tables.declares_producer_rows: - source_file = member_by_id.get(row.symbol_id, MemberEntry("", None, "", "", "", "", "")).file_path + source_file = member_by_id.get(row.symbol_id, MemberEntry(kind="", decl=None, parent_id="", parent_fqn="", file_path="", module="", microservice="")).file_path conn.execute(_CREATE_DECLARES_PRODUCER, { - "src": row.symbol_id, - "dst": row.producer_id, + "sid": row.symbol_id, + "pid": row.producer_id, "source_file": source_file, "confidence": row.confidence, + "strategy": row.strategy, }) # Write HTTP_CALLS edges for row in tables.http_call_rows: + client = client_by_id.get(row.client_id) conn.execute(_CREATE_HTTP_CALL, { - "src": row.client_id, - "dst": row.route_id, + "cid": row.client_id, + "rid": row.route_id, + "source_file": client.filename if client else "", "confidence": row.confidence, - "source_file": tables.client_rows[[c.id for c in tables.client_rows].index(row.client_id)].filename if any(c.id == row.client_id for c in tables.client_rows) else "", + "strategy": row.strategy, + "method_call": row.method_call, + "raw_uri": row.raw_uri, + "match": row.match, }) # Write ASYNC_CALLS edges for row in tables.async_call_rows: + producer = producer_by_id.get(row.producer_id) conn.execute(_CREATE_ASYNC_CALL, { - "src": row.producer_id, - "dst": row.route_id, + "pid": row.producer_id, + "rid": row.route_id, + "source_file": producer.filename if producer else "", "confidence": row.confidence, - "source_file": tables.producer_rows[[p.id for p in tables.producer_rows].index(row.producer_id)].filename if any(p.id == row.producer_id for p in tables.producer_rows) else "", + "strategy": row.strategy, + "direction": row.direction, + "raw_topic": row.raw_topic, + "match": row.match, }) @@ -3967,8 +3980,6 @@ def main() -> int: if args.incremental: result = incremental_rebuild(root, kuzu_path, verbose=args.verbose) - # Emit result as JSON to stdout so CLI can parse the mode - import json print(json.dumps({ "mode": result.mode, "files_changed": result.files_changed, From a4953e752d5586019a3779bf207d4e58f1439396 Mon Sep 17 00:00:00 2001 From: Dmitry Teryaev Date: Sun, 7 Jun 2026 21:21:30 +0300 Subject: [PATCH 7/9] fix: critical runtime bugs in incremental rebuild found by code review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix _write_clients_producers_and_calls: use asdict(row) for Client/Producer nodes instead of manually constructed dicts with wrong field names (kind vs client_kind, target vs target_service, missing 10+ fields) - Fix _delete_file_scope: add ALL edge tables to Phase 1 deletion (was missing EXPOSES, DECLARES_CLIENT, DECLARES_PRODUCER, HTTP_CALLS, ASYNC_CALLS — would crash on any Spring codebase with controllers) - Use DETACH DELETE for Route/Client/Producer nodes as safety net - Fix N+1 query in dependent expansion: single IN-query instead of per-file Co-Authored-By: Claude Opus 4.7 --- build_ast_graph.py | 45 ++++++++++++++++++--------------------------- 1 file changed, 18 insertions(+), 27 deletions(-) diff --git a/build_ast_graph.py b/build_ast_graph.py index 289e0d99..20b2fd60 100644 --- a/build_ast_graph.py +++ b/build_ast_graph.py @@ -740,7 +740,11 @@ def _delete_file_scope(conn: kuzu.Connection, filenames: set[str]) -> None: # This avoids ordering issues where file A has an edge from file B # pointing into it; if we delete A's nodes before B's edges, Kuzu # raises "has connected edges" errors. - edge_tables = ["EXTENDS", "IMPLEMENTS", "INJECTS", "CALLS", "DECLARES", "OVERRIDES", "UNRESOLVED_AT"] + edge_tables = [ + "EXTENDS", "IMPLEMENTS", "INJECTS", "CALLS", "DECLARES", "OVERRIDES", + "UNRESOLVED_AT", "EXPOSES", "DECLARES_CLIENT", "DECLARES_PRODUCER", + "HTTP_CALLS", "ASYNC_CALLS", + ] for edge_type in edge_tables: query = f""" MATCH (src)-[e:{edge_type}]->(dst) @@ -779,9 +783,10 @@ def _delete_file_scope(conn: kuzu.Connection, filenames: set[str]) -> None: conn.execute(delete_symbols_query, {"filenames": filename_list}) # Phase 4: Delete Route, Client, Producer nodes. + # Use DETACH DELETE as a safety net in case any edges were missed in Phase 1. for label in ["Route", "Client", "Producer"]: conn.execute( - f"MATCH (n:{label}) WHERE n.filename IN $filenames DELETE n", + f"MATCH (n:{label}) WHERE n.filename IN $filenames DETACH DELETE n", {"filenames": filename_list}, ) @@ -3660,14 +3665,15 @@ def incremental_rebuild( try: # Step 3: Dependent expansion - # Collect node IDs for changed files + # Collect node IDs for changed files (single query instead of N+1) changed_node_ids: set[str] = set() - for filename in changed_files: - query = "MATCH (s:Symbol) WHERE s.filename = $filename RETURN s.id" - result = conn.execute(query, {"filename": filename}) - while result.has_next(): - row = result.get_next() - changed_node_ids.add(row[0]) # Kuzu returns list, not dict + result = conn.execute( + "MATCH (s:Symbol) WHERE s.filename IN $filenames RETURN s.id", + {"filenames": list(changed_files)}, + ) + while result.has_next(): + row = result.get_next() + changed_node_ids.add(row[0]) # Find dependents dependent_files = _find_dependents(conn, changed_node_ids) @@ -3828,26 +3834,11 @@ def _write_clients_producers_and_calls(conn: kuzu.Connection, tables: GraphTable # Build node_id lookup for members and types member_by_id = {m.node_id: m for m in tables.members} - # Write clients + # Write clients and producers using asdict (same pattern as _write_routes_and_exposes) for row in tables.client_rows: - conn.execute(_CREATE_CLIENT, { - "id": row.id, - "symbol_id": row.symbol_id, - "filename": row.filename, - "kind": row.kind, - "target": row.target, - "target_type": row.target_type, - }) - - # Write producers + conn.execute(_CREATE_CLIENT, asdict(row)) for row in tables.producer_rows: - conn.execute(_CREATE_PRODUCER, { - "id": row.id, - "symbol_id": row.symbol_id, - "filename": row.filename, - "kind": row.kind, - "topic": row.topic, - }) + conn.execute(_CREATE_PRODUCER, asdict(row)) client_by_id = {c.id: c for c in tables.client_rows} producer_by_id = {p.id: p for p in tables.producer_rows} From 856f18b14a558a60edc8f8b022e5969f296d59c8 Mon Sep 17 00:00:00 2001 From: Dmitry Teryaev Date: Sun, 7 Jun 2026 21:48:52 +0300 Subject: [PATCH 8/9] refactor: deduplicate incremental rebuild code per code review - Merge _load_existing_types/_load_existing_types_filtered into single function with optional exclude_files parameter (same for members) - Simplify _find_dependents: loop over edge type strings instead of six identical if/elif branches - Factor _write_nodes and _write_nodes_merge to shared _write_nodes_impl accepting the query template as parameter (~70 lines deduplicated) - Extract _build_file_by_node_id and share between _write_edges and _write_routes_and_exposes (was built twice independently) - Extract _init_hash_tracker helper for duplicated hash-init logic in _fallback_to_full and no-DB branch of incremental_rebuild - Add warning log in FileHashTracker.save() instead of silent OSError - Update AGENTS.md ontology_version references from 15 to 17 Co-Authored-By: Claude Opus 4.7 --- AGENTS.md | 4 +- build_ast_graph.py | 385 +++++++++++++-------------------------------- 2 files changed, 108 insertions(+), 281 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index 89f6d824..d85ad923 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -43,7 +43,7 @@ when needed. variables, full `.java-codebase-rag.yml` reference, **graph layer** (node kinds, edges, capabilities, ranking, "Re-index required" callouts), brownfield overrides, ignore patterns. The current - `ontology_version` is **15** (`EDGE_SCHEMA` in `java_ontology.py`; + `ontology_version` is **17** (`EDGE_SCHEMA` in `java_ontology.py`; material `OVERRIDES` Symbol→Symbol edges: subtype instance method → supertype declaration with matching `signature`, one `IMPLEMENTS`/`EXTENDS` hop; valid `neighbors` `EdgeType`). @@ -190,7 +190,7 @@ template): `VALID_RESOLVE_REASONS`, `VALID_UNRESOLVED_CALL_REASONS`. - Schema changes that affect the Lance index or Kuzu graph need a matching update to the README "Re-index required" callout. Bump - `ontology_version` when enrichment semantics change (currently **15**). + `ontology_version` when enrichment semantics change (currently **17**). - Brownfield is a first-class surface: any new auto-detection (route, role, capability, http client, async producer) must compose with the matching `BrownfieldOverrides` layer. Last writer wins (outermost layer diff --git a/build_ast_graph.py b/build_ast_graph.py index 20b2fd60..8b342f98 100644 --- a/build_ast_graph.py +++ b/build_ast_graph.py @@ -436,9 +436,9 @@ def save(self) -> None: with open(tmp_path, "w", encoding="utf-8") as f: json.dump(self._hashes, f, sort_keys=True) os.replace(tmp_path, self._path) - except OSError: + except OSError as e: # Fail gracefully; next run will treat as missing and rebuild. - pass + log.warning("Failed to save hash file %s: %s; next run will rebuild from scratch", self._path, e) def detect_changes(self, source_root: Path, ignore: LayeredIgnore) -> tuple[set[str], set[str], set[str]]: """Return (added, changed, removed) sets of relative POSIX paths.""" @@ -496,113 +496,34 @@ def _hash_file(abs_path: Path) -> str: # ---------- incremental rebuild helpers ---------- -def _load_existing_types(conn: kuzu.Connection, tables: GraphTables) -> None: - """Load type entries from existing Kuzu graph into tables for cross-file resolution.""" - query = """ - MATCH (s:Symbol) - WHERE s.kind IN ['class', 'interface', 'enum', 'annotation', 'record'] - RETURN s.kind, s.fqn, s.name, s.filename, s.module, s.microservice, s.id - """ - result = conn.execute(query) - while result.has_next(): - row = result.get_next() - # Kuzu returns list, access by index - # Columns: kind, fqn, name, filename, module, microservice, id - kind = row[0] - fqn = row[1] - name = row[2] - filename = row[3] - module = row[4] if len(row) > 4 else "" - microservice = row[5] if len(row) > 5 else "" - node_id = row[6] if len(row) > 6 else "" - - # Create a minimal TypeDecl (full details not needed for cross-file resolution) - decl = TypeDecl(name, kind, fqn) +def _load_existing_types(conn: kuzu.Connection, tables: GraphTables, exclude_files: set[str] | None = None) -> None: + """Load type entries from existing Kuzu graph into tables for cross-file resolution. - package = fqn[: -(len(name) + 1)] if fqn.endswith("." + name) else "" - - entry = TypeIndexEntry( - decl=decl, - file_path=filename, - module=module, - microservice=microservice, - package=package, - outer_fqn=None, # Simplified: assume top-level for loading - node_id=node_id, - ) - tables.types[fqn] = entry - tables.by_simple_name.setdefault(name, []).append(entry) - tables.by_package.setdefault(package, []).append(entry) - - -def _load_existing_members(conn: kuzu.Connection, tables: GraphTables) -> None: - """Load all member entries from existing Kuzu graph into tables.members.""" - query = """ - MATCH (s:Symbol) - WHERE s.kind IN ['method', 'constructor'] - RETURN s.kind, s.name, s.filename, s.signature, s.parent_id, s.fqn, s.id + When exclude_files is provided, only load types from files NOT in the set. """ - result = conn.execute(query) - while result.has_next(): - row = result.get_next() - # Kuzu returns list, access by index - # Columns: kind, name, filename, signature, parent_id, fqn, id - kind = row[0] - name = row[1] - filename = row[2] - signature = row[3] if len(row) > 3 else "" - parent_id = row[4] if len(row) > 4 else "" - fqn = row[5] if len(row) > 5 else "" - node_id = row[6] if len(row) > 6 else "" - - # Extract parent_fqn from method's fqn (format: "pkg.Type#method(params)") - parent_fqn = fqn.split("#")[0] if "#" in fqn else "" - - # Create a minimal MethodDecl (full details not needed for resolution) - # MethodDecl(name, return_type, is_constructor=False) - decl = MethodDecl(name, "", kind == "constructor") - decl.signature = signature - - tables.members.append(MemberEntry( - kind=kind, - decl=decl, - parent_id=parent_id, - parent_fqn=parent_fqn, - file_path=filename, - module="", # Not needed for resolution - microservice="", # Not needed for resolution - node_id=node_id, - )) - - -def _load_existing_types_filtered(conn: kuzu.Connection, tables: GraphTables, exclude_files: set[str]) -> None: - """Load type entries from existing Kuzu graph, excluding specified files.""" - if not exclude_files: - # If no files to exclude, don't load anything (all files are being reprocessed) + if exclude_files is not None and not exclude_files: return - query = """ + where = "WHERE s.kind IN ['class', 'interface', 'enum', 'annotation', 'record']" + params: dict = {} + if exclude_files: + where += "\n AND NOT (s.filename IN $exclude_files)" + params["exclude_files"] = list(exclude_files) + + query = f""" MATCH (s:Symbol) - WHERE s.kind IN ['class', 'interface', 'enum', 'annotation', 'record'] - AND NOT (s.filename IN $exclude_files) + {where} RETURN s.kind, s.fqn, s.name, s.filename, s.module, s.microservice, s.id """ - result = conn.execute(query, {"exclude_files": list(exclude_files)}) + result = conn.execute(query, params) while result.has_next(): row = result.get_next() - # Kuzu returns list, access by index - # Columns: kind, fqn, name, filename, module, microservice, id - kind = row[0] - fqn = row[1] - name = row[2] - filename = row[3] + kind, fqn, name, filename = row[0], row[1], row[2], row[3] module = row[4] if len(row) > 4 else "" microservice = row[5] if len(row) > 5 else "" node_id = row[6] if len(row) > 6 else "" - # Create a minimal TypeDecl (full details not needed for cross-file resolution) decl = TypeDecl(name, kind, fqn) - package = fqn[: -(len(name) + 1)] if fqn.endswith("." + name) else "" entry = TypeIndexEntry( @@ -611,7 +532,7 @@ def _load_existing_types_filtered(conn: kuzu.Connection, tables: GraphTables, ex module=module, microservice=microservice, package=package, - outer_fqn=None, # Simplified: assume top-level for loading + outer_fqn=None, node_id=node_id, ) tables.types[fqn] = entry @@ -619,36 +540,36 @@ def _load_existing_types_filtered(conn: kuzu.Connection, tables: GraphTables, ex tables.by_package.setdefault(package, []).append(entry) -def _load_existing_members_filtered(conn: kuzu.Connection, tables: GraphTables, exclude_files: set[str]) -> None: - """Load member entries from existing Kuzu graph, excluding specified files.""" - if not exclude_files: - # If no files to exclude, don't load anything (all files are being reprocessed) +def _load_existing_members(conn: kuzu.Connection, tables: GraphTables, exclude_files: set[str] | None = None) -> None: + """Load member entries from existing Kuzu graph into tables.members. + + When exclude_files is provided, only load members from files NOT in the set. + """ + if exclude_files is not None and not exclude_files: return - query = """ + where = "WHERE s.kind IN ['method', 'constructor']" + params: dict = {} + if exclude_files: + where += "\n AND NOT (s.filename IN $exclude_files)" + params["exclude_files"] = list(exclude_files) + + query = f""" MATCH (s:Symbol) - WHERE s.kind IN ['method', 'constructor'] - AND NOT (s.filename IN $exclude_files) + {where} RETURN s.kind, s.name, s.filename, s.signature, s.parent_id, s.fqn, s.id """ - result = conn.execute(query, {"exclude_files": list(exclude_files)}) + result = conn.execute(query, params) while result.has_next(): row = result.get_next() - # Kuzu returns list, access by index - # Columns: kind, name, filename, signature, parent_id, fqn, id - kind = row[0] - name = row[1] - filename = row[2] + kind, name, filename = row[0], row[1], row[2] signature = row[3] if len(row) > 3 else "" parent_id = row[4] if len(row) > 4 else "" fqn = row[5] if len(row) > 5 else "" node_id = row[6] if len(row) > 6 else "" - # Extract parent_fqn from method's fqn (format: "pkg.Type#method(params)") parent_fqn = fqn.split("#")[0] if "#" in fqn else "" - # Create a minimal MethodDecl (full details not needed for resolution) - # MethodDecl(name, return_type, is_constructor=False) decl = MethodDecl(name, "", kind == "constructor") decl.signature = signature @@ -658,8 +579,8 @@ def _load_existing_members_filtered(conn: kuzu.Connection, tables: GraphTables, parent_id=parent_id, parent_fqn=parent_fqn, file_path=filename, - module="", # Not needed for resolution - microservice="", # Not needed for resolution + module="", + microservice="", node_id=node_id, )) @@ -670,53 +591,18 @@ def _find_dependents(conn: kuzu.Connection, changed_node_ids: set[str]) -> set[s # Query each Symbol-to-Symbol edge table for incoming edges edge_types = ["EXTENDS", "IMPLEMENTS", "INJECTS", "CALLS", "DECLARES", "OVERRIDES"] + params = {"changed_ids": list(changed_node_ids)} for edge_type in edge_types: - # Use label(e) = 'TYPE' pattern (not label(e) IN $list due to Kuzu pitfalls) - # We need to build a query with OR conditions - if edge_type == "EXTENDS": - query = """ - MATCH (src:Symbol)-[e:EXTENDS]->(dst:Symbol) - WHERE dst.id IN $changed_ids - RETURN DISTINCT src.filename - """ - elif edge_type == "IMPLEMENTS": - query = """ - MATCH (src:Symbol)-[e:IMPLEMENTS]->(dst:Symbol) - WHERE dst.id IN $changed_ids - RETURN DISTINCT src.filename - """ - elif edge_type == "INJECTS": - query = """ - MATCH (src:Symbol)-[e:INJECTS]->(dst:Symbol) - WHERE dst.id IN $changed_ids - RETURN DISTINCT src.filename - """ - elif edge_type == "CALLS": - query = """ - MATCH (src:Symbol)-[e:CALLS]->(dst:Symbol) - WHERE dst.id IN $changed_ids - RETURN DISTINCT src.filename - """ - elif edge_type == "DECLARES": - query = """ - MATCH (src:Symbol)-[e:DECLARES]->(dst:Symbol) - WHERE dst.id IN $changed_ids - RETURN DISTINCT src.filename - """ - elif edge_type == "OVERRIDES": - query = """ - MATCH (src:Symbol)-[e:OVERRIDES]->(dst:Symbol) - WHERE dst.id IN $changed_ids - RETURN DISTINCT src.filename - """ - else: - continue - - result = conn.execute(query, {"changed_ids": list(changed_node_ids)}) + query = f""" + MATCH (src:Symbol)-[e:{edge_type}]->(dst:Symbol) + WHERE dst.id IN $changed_ids + RETURN DISTINCT src.filename + """ + result = conn.execute(query, params) while result.has_next(): row = result.get_next() - filename = row[0] # Kuzu returns list, not dict + filename = row[0] if filename: # Skip phantom nodes (filename = "") dependent_files.add(filename) @@ -812,13 +698,14 @@ def _scoped_write(conn: kuzu.Connection, tables: GraphTables, *, project_root: P _verbose_stderr_line(f"[graph] scoped write · nodes written in {elapsed:.2f}s") t1 = time.time() - _write_edges(conn, tables) + _fbyid = _build_file_by_node_id(tables) + _write_edges(conn, tables, _fbyid) elapsed = time.time() - t1 if elapsed > 0.1: _verbose_stderr_line(f"[graph] scoped write · edges written in {elapsed:.2f}s") t2 = time.time() - _write_routes_and_exposes(conn, tables) + _write_routes_and_exposes(conn, tables, _fbyid) elapsed = time.time() - t2 if elapsed > 0.1: _verbose_stderr_line(f"[graph] scoped write · routes/exposes written in {elapsed:.2f}s") @@ -831,68 +718,8 @@ def _write_nodes_merge( project_root: Path, meta_chain: dict[str, frozenset[str]] | None, ) -> None: - """Write nodes to existing Kuzu database using MERGE to handle existing nodes. - - Like _write_nodes but uses MERGE instead of CREATE to handle cases where - nodes might already exist in the database. - """ - overrides = load_brownfield_overrides(project_root) - try: - prs = str(project_root.resolve()) - except OSError: - prs = str(project_root) - tables.cross_service_resolution = _load_config_cross_service_resolution(prs) - mch = meta_chain - # packages - for pkg, pid in tables.packages.items(): - conn.execute(_MERGE_SYMBOL, _node_row( - id=pid, kind="package", name=pkg.rsplit(".", 1)[-1], fqn=pkg, package=pkg, - )) - # files - for path, fid in tables.files.items(): - conn.execute(_MERGE_SYMBOL, _node_row( - id=fid, kind="file", name=Path(path).name, fqn=path, filename=path, - )) - # types - for entry in tables.types.values(): - d = entry.decl - role, capabilities = resolve_role_and_capabilities( - d, - overrides=overrides, - meta_chain=mch, - ) - tables.type_role_by_node_id[entry.node_id] = role - conn.execute(_MERGE_SYMBOL, _node_row( - id=entry.node_id, kind=d.kind, name=d.name, fqn=d.fqn, - package=entry.package, - module=entry.module, microservice=entry.microservice, - filename=entry.file_path, - start_line=d.start_line, end_line=d.end_line, - start_byte=d.start_byte, end_byte=d.end_byte, - modifiers=list(d.modifiers), - annotations=[a.name for a in d.annotations], - capabilities=capabilities, - role=role, - signature="", - parent_id=tables.types[entry.outer_fqn].node_id if entry.outer_fqn and entry.outer_fqn in tables.types else "", - )) - # members (methods / constructors) - for m in tables.members: - conn.execute(_MERGE_SYMBOL, _node_row( - id=m.node_id, kind=m.kind, name=m.decl.name, - fqn=f"{m.parent_fqn}#{m.decl.signature}", - package=tables.types[m.parent_fqn].package if m.parent_fqn in tables.types else "", - module=m.module, microservice=m.microservice, - filename=m.file_path, - start_line=m.decl.start_line, end_line=m.decl.end_line, - start_byte=m.decl.start_byte, end_byte=m.decl.end_byte, - modifiers=list(m.decl.modifiers), - annotations=[a.name for a in m.decl.annotations], - signature=m.decl.signature, parent_id=m.parent_id, - )) - # phantoms - for pid, row in tables.phantoms.items(): - conn.execute(_MERGE_SYMBOL, row) + """Write nodes to existing Kuzu database using MERGE to handle existing nodes.""" + _write_nodes_impl(conn, tables, project_root=project_root, meta_chain=meta_chain, symbol_query=_MERGE_SYMBOL) # ---------- file walk (see `path_filtering.iter_java_source_files`) ---------- @@ -3054,12 +2881,13 @@ def _node_row(**kwargs) -> dict: ) -def _write_nodes( +def _write_nodes_impl( conn: kuzu.Connection, tables: GraphTables, *, project_root: Path, meta_chain: dict[str, frozenset[str]] | None, + symbol_query: str, ) -> None: overrides = load_brownfield_overrides(project_root) try: @@ -3070,12 +2898,12 @@ def _write_nodes( mch = meta_chain # packages for pkg, pid in tables.packages.items(): - conn.execute(_CREATE_SYMBOL, _node_row( + conn.execute(symbol_query, _node_row( id=pid, kind="package", name=pkg.rsplit(".", 1)[-1], fqn=pkg, package=pkg, )) # files for path, fid in tables.files.items(): - conn.execute(_CREATE_SYMBOL, _node_row( + conn.execute(symbol_query, _node_row( id=fid, kind="file", name=Path(path).name, fqn=path, filename=path, )) # types @@ -3087,7 +2915,7 @@ def _write_nodes( meta_chain=mch, ) tables.type_role_by_node_id[entry.node_id] = role - conn.execute(_CREATE_SYMBOL, _node_row( + conn.execute(symbol_query, _node_row( id=entry.node_id, kind=d.kind, name=d.name, fqn=d.fqn, package=entry.package, module=entry.module, microservice=entry.microservice, @@ -3103,7 +2931,7 @@ def _write_nodes( )) # members (methods / constructors) for m in tables.members: - conn.execute(_CREATE_SYMBOL, _node_row( + conn.execute(symbol_query, _node_row( id=m.node_id, kind=m.kind, name=m.decl.name, fqn=f"{m.parent_fqn}#{m.decl.signature}", package=tables.types[m.parent_fqn].package if m.parent_fqn in tables.types else "", @@ -3117,7 +2945,17 @@ def _write_nodes( )) # phantoms for pid, row in tables.phantoms.items(): - conn.execute(_CREATE_SYMBOL, row) + conn.execute(symbol_query, row) + + +def _write_nodes( + conn: kuzu.Connection, + tables: GraphTables, + *, + project_root: Path, + meta_chain: dict[str, frozenset[str]] | None, +) -> None: + _write_nodes_impl(conn, tables, project_root=project_root, meta_chain=meta_chain, symbol_query=_CREATE_SYMBOL) _CREATE_EXT = ( @@ -3248,13 +3086,20 @@ def _populate_overrides_rows(tables: GraphTables) -> None: ] -def _write_edges(conn: kuzu.Connection, tables: GraphTables) -> None: - # Build node_id -> file_path lookup for source_file resolution. - _file_by_node_id: dict[str, str] = {} +def _build_file_by_node_id(tables: GraphTables) -> dict[str, str]: + """Build node_id -> file_path lookup for source_file resolution.""" + lookup: dict[str, str] = {} for entry in tables.types.values(): - _file_by_node_id[entry.node_id] = entry.file_path + lookup[entry.node_id] = entry.file_path for m in tables.members: - _file_by_node_id[m.node_id] = m.file_path + lookup[m.node_id] = m.file_path + return lookup + + +def _write_edges(conn: kuzu.Connection, tables: GraphTables, _file_by_node_id: dict[str, str] | None = None) -> None: + # Build node_id -> file_path lookup for source_file resolution. + if _file_by_node_id is None: + _file_by_node_id = _build_file_by_node_id(tables) for r in tables.extends_rows: conn.execute(_CREATE_EXT, { @@ -3345,13 +3190,10 @@ def _write_edges(conn: kuzu.Connection, tables: GraphTables) -> None: }) -def _write_routes_and_exposes(conn: kuzu.Connection, tables: GraphTables) -> None: +def _write_routes_and_exposes(conn: kuzu.Connection, tables: GraphTables, _file_by_node_id: dict[str, str] | None = None) -> None: # Build node_id -> file_path lookup for source_file resolution (for Symbol sources). - _file_by_node_id: dict[str, str] = {} - for entry in tables.types.values(): - _file_by_node_id[entry.node_id] = entry.file_path - for m in tables.members: - _file_by_node_id[m.node_id] = m.file_path + if _file_by_node_id is None: + _file_by_node_id = _build_file_by_node_id(tables) # Build client_id -> filename lookup for HTTP_CALLS source_file. _file_by_client_id: dict[str, str] = {row.id: row.filename for row in tables.client_rows} @@ -3577,30 +3419,12 @@ def incremental_rebuild( pass6_match_edges(tables, verbose=verbose) write_kuzu(kuzu_path, tables, source_root=source_root, verbose=verbose) - # Initialize hash tracker - index_dir = kuzu_path.parent - tracker = FileHashTracker(index_dir) - tracker.load() - ignore = LayeredIgnore(source_root) - all_files = set() - # Resolve source_root to handle symlinks - source_root_resolved = source_root.resolve() - for p in iter_java_source_files(source_root, ignore=ignore): - # Resolve the absolute path and compute relative path - p_resolved = p.resolve() - try: - rel_path = p_resolved.relative_to(source_root_resolved).as_posix() - except ValueError: - # Fallback to using the path as-is if it's not under source_root - rel_path = p.as_posix() - all_files.add(rel_path) - tracker.update(all_files, source_root) - tracker.save() + n_files = _init_hash_tracker(source_root, kuzu_path) return IncrementalResult( mode="full_fallback", files_changed=0, - files_added=len(all_files), + files_added=n_files, files_removed=0, dependents_reprocessed=0, elapsed_sec=time.time() - t_start, @@ -3707,8 +3531,8 @@ def incremental_rebuild( asts = pass1_parse(source_root, tables, verbose=verbose, scope_files=scope_files) # Load existing types and members for cross-file resolution (only from unchanged files) - _load_existing_types_filtered(conn, tables, exclude_files=scope_files) - _load_existing_members_filtered(conn, tables, exclude_files=scope_files) + _load_existing_types(conn, tables, exclude_files=scope_files) + _load_existing_members(conn, tables, exclude_files=scope_files) pass2_edges(tables, asts, verbose=verbose) pass3_calls(tables, asts, verbose=verbose) @@ -3788,41 +3612,43 @@ def incremental_rebuild( return _fallback_to_full(source_root, kuzu_path, verbose, t_start) -def _fallback_to_full(source_root: Path, kuzu_path: Path, verbose: bool, t_start: float) -> IncrementalResult: - """Fallback to full rebuild.""" - tables = GraphTables() - asts = pass1_parse(source_root, tables, verbose=verbose) - pass2_edges(tables, asts, verbose=verbose) - pass3_calls(tables, asts, verbose=verbose) - pass4_routes(tables, asts, source_root=source_root, verbose=verbose) - pass5_imperative_edges(tables, asts, source_root=source_root, verbose=verbose) - pass6_match_edges(tables, verbose=verbose) - write_kuzu(kuzu_path, tables, source_root=source_root, verbose=verbose) - - # Initialize hash tracker +def _init_hash_tracker(source_root: Path, kuzu_path: Path) -> int: + """Initialize hash tracker for all Java files. Returns number of files hashed.""" index_dir = kuzu_path.parent tracker = FileHashTracker(index_dir) tracker.load() ignore = LayeredIgnore(source_root) - all_files = set() - # Resolve source_root to handle symlinks + all_files: set[str] = set() source_root_resolved = source_root.resolve() for p in iter_java_source_files(source_root, ignore=ignore): - # Resolve the absolute path and compute relative path p_resolved = p.resolve() try: rel_path = p_resolved.relative_to(source_root_resolved).as_posix() except ValueError: - # Fallback to using the path as-is if it's not under source_root rel_path = p.as_posix() all_files.add(rel_path) tracker.update(all_files, source_root) tracker.save() + return len(all_files) + + +def _fallback_to_full(source_root: Path, kuzu_path: Path, verbose: bool, t_start: float) -> IncrementalResult: + """Fallback to full rebuild.""" + tables = GraphTables() + asts = pass1_parse(source_root, tables, verbose=verbose) + pass2_edges(tables, asts, verbose=verbose) + pass3_calls(tables, asts, verbose=verbose) + pass4_routes(tables, asts, source_root=source_root, verbose=verbose) + pass5_imperative_edges(tables, asts, source_root=source_root, verbose=verbose) + pass6_match_edges(tables, verbose=verbose) + write_kuzu(kuzu_path, tables, source_root=source_root, verbose=verbose) + + n_files = _init_hash_tracker(source_root, kuzu_path) return IncrementalResult( mode="full_fallback", files_changed=0, - files_added=len(all_files), + files_added=n_files, files_removed=0, dependents_reprocessed=0, elapsed_sec=time.time() - t_start, @@ -3926,11 +3752,12 @@ def write_kuzu( _populate_declares_rows(tables) _populate_overrides_rows(tables) t1 = time.time() - _write_edges(conn, tables) + _fbyid = _build_file_by_node_id(tables) + _write_edges(conn, tables, _fbyid) if verbose: _verbose_stderr_line(f"[graph] writing · edges written in {time.time() - t1:.2f}s") t2 = time.time() - _write_routes_and_exposes(conn, tables) + _write_routes_and_exposes(conn, tables, _fbyid) if verbose: _verbose_stderr_line(f"[graph] writing · routes/exposes written in {time.time() - t2:.2f}s") _write_meta(conn, tables, source_root) From f65eba895b7668f9fa94846ea4c1ab8f09bc6fbe Mon Sep 17 00:00:00 2001 From: Dmitry Teryaev Date: Sun, 7 Jun 2026 22:02:27 +0300 Subject: [PATCH 9/9] fix: phantom route writes, duplicate members, and minor issues from review round 2 - Write phantom Route nodes in _write_clients_producers_and_calls using MERGE (pass5 creates phantom routes for cross-service calls that were never persisted to Kuzu, silently dropping HTTP_CALLS/ASYNC_CALLS edges) - Remove redundant _load_existing_members before full pass1_parse in global pass 5-6 step (was creating duplicate stub members alongside full entries) - Use conn.close() + del instead of bare del for Kuzu handle cleanup on fallback (avoids relying on CPython ref-counting for file lock release) - Add FileNotFoundError handling in FileHashTracker.detect_changes for files that vanish between listing and hashing - Remove redundant inline import json in cli.py _cmd_increment - Update stale _delete_file_scope docstring to match implementation Co-Authored-By: Claude Opus 4.7 --- build_ast_graph.py | 41 +++++++++++++++++++++++++++++++--------- java_codebase_rag/cli.py | 1 - 2 files changed, 32 insertions(+), 10 deletions(-) diff --git a/build_ast_graph.py b/build_ast_graph.py index 8b342f98..f83787f6 100644 --- a/build_ast_graph.py +++ b/build_ast_graph.py @@ -462,7 +462,10 @@ def detect_changes(self, source_root: Path, ignore: LayeredIgnore) -> tuple[set[ # Detect added and changed files. for rel_path in current_files: abs_path = source_root / rel_path - file_hash = _hash_file(abs_path) + try: + file_hash = _hash_file(abs_path) + except FileNotFoundError: + continue stored_hash = self._hashes.get(rel_path) if stored_hash is None: added.add(rel_path) @@ -612,9 +615,9 @@ def _find_dependents(conn: kuzu.Connection, changed_node_ids: set[str]) -> set[s def _delete_file_scope(conn: kuzu.Connection, filenames: set[str]) -> None: """Delete all nodes and edges originating from the given files. - Skip phantom nodes (filename=""). For Symbol-to-Symbol and UNRESOLVED_AT - edge tables only. Client/Producer/Route edges are handled separately in - pass 5-6 global rebuild. + Skip phantom nodes (filename=""). Deletes ALL edge types in Phase 1, + then nodes in subsequent phases. Route/Client/Producer nodes use + DETACH DELETE as a safety net for any edges missed in Phase 1. Edges are deleted in batch across all filenames first to avoid Kuzu "has connected edges" errors when edges from one file point to nodes @@ -3442,11 +3445,16 @@ def incremental_rebuild( if version < 17: if verbose: _verbose_stderr_line(f"[increment] ontology version {version} < 17; falling back to full rebuild") + conn.close() del conn, db return _fallback_to_full(source_root, kuzu_path, verbose, t_start) except Exception as e: if verbose: _verbose_stderr_line(f"[increment] failed to read ontology version: {e}; falling back to full rebuild") + try: + conn.close() + except Exception: + pass del conn, db return _fallback_to_full(source_root, kuzu_path, verbose, t_start) @@ -3550,11 +3558,8 @@ def incremental_rebuild( if verbose: _verbose_stderr_line("[increment] running global passes 5-6") - # Load all members for pass 5 + # Rebuild full tables for global pass 5-6 (pass1 populates members from scratch) tables_for_global = GraphTables() - _load_existing_members(conn, tables_for_global) - - # Rebuild asts for global scope (need for pass5/6) global_asts = pass1_parse(source_root, tables_for_global, verbose=verbose) pass5_imperative_edges(tables_for_global, global_asts, source_root=source_root, verbose=verbose) @@ -3656,7 +3661,25 @@ def _fallback_to_full(source_root: Path, kuzu_path: Path, verbose: bool, t_start def _write_clients_producers_and_calls(conn: kuzu.Connection, tables: GraphTables) -> None: - """Write Client, Producer, and cross-service edges to Kuzu.""" + """Write Route, Client, Producer, and cross-service edges to Kuzu. + + Used by the incremental rebuild's global pass 5-6 step. Writes phantom + Route nodes (created by pass5 for cross-service calls) that wouldn't + otherwise exist in Kuzu. + """ + # Write phantom routes that don't already exist (pass5 creates these for cross-service calls) + for row in tables.routes_rows: + # MERGE to avoid duplicates with routes written during scoped step + conn.execute( + "MERGE (r:Route {id: $id}) " + "SET r.kind = $kind, r.framework = $framework, r.method = $method, " + "r.path = $path, r.path_template = $path_template, r.path_regex = $path_regex, " + "r.topic = $topic, r.broker = $broker, r.feign_name = $feign_name, r.feign_url = $feign_url, " + "r.microservice = $microservice, r.module = $module, r.filename = $filename, " + "r.start_line = $start_line, r.end_line = $end_line, r.resolved = $resolved", + asdict(row), + ) + # Build node_id lookup for members and types member_by_id = {m.node_id: m for m in tables.members} diff --git a/java_codebase_rag/cli.py b/java_codebase_rag/cli.py index 25cb6e31..4d86c2d7 100644 --- a/java_codebase_rag/cli.py +++ b/java_codebase_rag/cli.py @@ -356,7 +356,6 @@ def work() -> int: # Parse stdout to check for full_fallback mode # The incremental_rebuild function returns a JSON payload with mode field try: - import json result = json.loads(g.stdout.strip()) if result.get("mode") == "full_fallback": print(