Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions java_codebase_rag/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,14 +393,12 @@ def index_dir_has_existing_artifacts(index_dir: Path) -> tuple[bool, list[str]]:
if ku.exists():
paths.append(str(ku.resolve()))
if index_dir.is_dir():
try:
import lancedb

db = lancedb.connect(str(index_dir.resolve()))
for name in db.table_names():
paths.append(str((index_dir / name).resolve()) + " (Lance table)")
except Exception:
pass
# Check for Lance tables via filesystem to avoid importing lancedb,
# which spawns a BackgroundEventLoop daemon thread that causes Kuzu
# C++ segfaults in the same process.
for child in index_dir.iterdir():
if child.is_dir() and (child / "data.lance").exists():
paths.append(str(child.resolve()) + " (Lance table)")
return bool(paths), paths


Expand Down
9 changes: 9 additions & 0 deletions kuzu_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,15 @@ def __init__(self, db_path: str) -> None:
self._conn = kuzu.Connection(self._db)
self._conn_lock = threading.Lock()

def close(self) -> None:
"""Release native Kuzu objects before interpreter shutdown GC."""
if self._conn is not None:
self._conn.close()
self._conn = None
if self._db is not None:
self._db.close()
self._db = None

@classmethod
def get(cls, db_path: str | None = None) -> "KuzuGraph":
resolved = resolve_kuzu_path(db_path)
Expand Down
18 changes: 17 additions & 1 deletion mcp_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,22 @@
from java_ontology import EDGE_SCHEMA, ResolveReason
from kuzu_queries import KuzuGraph, OVERRIDE_AXIS_COMPOSED_EDGE_TYPES
from mcp_hints import generate_hints, MCP_HINTS_STRUCTURED_FIELD_DESCRIPTION
from search_lancedb import TABLES, run_search

# Populated lazily by _init_search() on first call to search_v2().
# Tests monkeypatch mcp_v2.run_search; the None sentinel lets mock
# detection skip the real import.
run_search = None
TABLES = None


def _init_search() -> None:
global run_search, TABLES
if run_search is None:
from search_lancedb import TABLES as _T, run_search as _rs

run_search = _rs
TABLES = _T


# Module-level flag set by server.py at startup from resolved config.
_hints_enabled: bool = True
Expand Down Expand Up @@ -921,6 +936,7 @@ def search_v2(
uri_path = Path(uri)
if not uri.startswith(("s3://", "gs://", "az://")) and uri_path.exists():
uri = str(uri_path.resolve())
_init_search()
table_keys = list(TABLES) if table == "all" else [table]
rows = run_search(
query,
Expand Down
13 changes: 9 additions & 4 deletions search_lancedb.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from collections.abc import Callable
from pathlib import Path

import lancedb
import numpy as np
from sentence_transformers import SentenceTransformer

Expand Down Expand Up @@ -48,13 +47,19 @@
_SCHEMA_LOCK = threading.Lock()


def _connect_lancedb(uri: str):
import lancedb

return lancedb.connect(uri)


def _table_columns(uri: str, lance_table_name: str, db_obj: object | None = None) -> set[str]:
key = (uri, lance_table_name)
with _SCHEMA_LOCK:
cached = _SCHEMA_CACHE.get(key)
if cached is not None:
return cached
db = db_obj if db_obj is not None else lancedb.connect(uri)
db = db_obj if db_obj is not None else _connect_lancedb(uri)
tbl = db.open_table(lance_table_name)
cols = {f.name for f in tbl.schema}
with _SCHEMA_LOCK:
Expand Down Expand Up @@ -428,7 +433,7 @@ def ensure_text_fts_index(uri: str, lance_table_name: str) -> None:
with _FTS_LOCK:
if key in _FTS_READY:
return
db = lancedb.connect(uri)
db = _connect_lancedb(uri)
tbl = db.open_table(lance_table_name)
try:
tbl.create_fts_index("text", replace=False)
Expand Down Expand Up @@ -842,7 +847,7 @@ def run_search(
query_vec = _query_vector(model, query)
fts_for_hybrid = effective_fts if effective_fts is not None else query

db = lancedb.connect(uri)
db = _connect_lancedb(uri)
need = max(limit + offset, 1)

extra_java = _build_extra_predicates(
Expand Down
5 changes: 4 additions & 1 deletion server.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
from kuzu_queries import KuzuGraph, resolve_kuzu_path
from mcp.server.fastmcp import FastMCP
from pydantic import BaseModel, Field
from search_lancedb import TABLES
# TABLES imported lazily in list_code_index_tables_payload() to avoid
# spawning the LanceDB background event-loop thread on module import.

_COCOINDEX_TARGET = "java_index_flow_lancedb.py:JavaCodeIndexLance"
_INSTRUCTIONS = (
Expand Down Expand Up @@ -236,6 +237,8 @@ def _graph_meta_output() -> GraphMetaOutput:


def list_code_index_tables_payload() -> IndexInfoOutput:
from search_lancedb import TABLES

return IndexInfoOutput(
lancedb_uri=_resolve_lancedb_uri(),
embedding_model=resolved_sbert_model_for_process_env(SBERT_MODEL),
Expand Down
3 changes: 0 additions & 3 deletions tests/bank-chat-system/chat-core/.vscode/settings.json

This file was deleted.

15 changes: 12 additions & 3 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
if TYPE_CHECKING:
from build_ast_graph import GraphTables


BUNDLE_DIR = Path(__file__).resolve().parent.parent
TESTS_DIR = Path(__file__).resolve().parent
CORPUS_ROOT = TESTS_DIR / "bank-chat-system"
Expand Down Expand Up @@ -95,7 +96,11 @@ def kuzu_graph(mcp_env, kuzu_db_path: Path):

KuzuGraph._instance = None
KuzuGraph._instance_path = None
return KuzuGraph.get(str(kuzu_db_path))
graph = KuzuGraph.get(str(kuzu_db_path))
yield graph
graph.close()
KuzuGraph._instance = None
KuzuGraph._instance_path = None


@pytest.fixture(scope="session")
Expand Down Expand Up @@ -134,7 +139,9 @@ def kuzu_graph_route_extraction_smoke(kuzu_db_path_route_extraction_smoke: Path)
"""Read-only ``KuzuGraph`` for ``route_extraction_smoke`` (own DB path; not ``KuzuGraph.get``)."""
from kuzu_queries import KuzuGraph

return KuzuGraph(str(kuzu_db_path_route_extraction_smoke))
graph = KuzuGraph(str(kuzu_db_path_route_extraction_smoke))
yield graph
graph.close()


@pytest.fixture(scope="session")
Expand All @@ -161,7 +168,9 @@ def kuzu_db_path_fqn_collision_smoke(tmp_path_factory) -> Path:
def kuzu_graph_fqn_collision_smoke(kuzu_db_path_fqn_collision_smoke: Path):
from kuzu_queries import KuzuGraph

return KuzuGraph(str(kuzu_db_path_fqn_collision_smoke))
graph = KuzuGraph(str(kuzu_db_path_fqn_collision_smoke))
yield graph
graph.close()


@pytest.fixture(scope="session")
Expand Down
13 changes: 13 additions & 0 deletions tests/test_brownfield_overrides.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from __future__ import annotations

import io
import os
from contextlib import redirect_stderr
from pathlib import Path

Expand Down Expand Up @@ -483,6 +484,10 @@ def test_fqn_fires_with_enrich_chunk_lance_path(tmp_path: Path) -> None:
assert c.role == "SERVICE"


@pytest.mark.skipif(
os.environ.get("JAVA_CODEBASE_RAG_RUN_HEAVY", "").strip() != "1",
reason="imports cocoindex/lancedb which spawns a background thread that causes Kuzu segfaults",
)
def test_tier1_java_lance_chunk_capabilities_list_type_matches_other_lists() -> None:
"""Pre-flight tier 1: `capabilities` uses the same Arrow list<string> as other list cols."""
import java_index_flow_lancedb as java_lance
Expand All @@ -509,6 +514,10 @@ def lance_anno(ftype: object) -> object:
assert l_cap == l_ann == l_sym


@pytest.mark.skipif(
os.environ.get("JAVA_CODEBASE_RAG_RUN_HEAVY", "").strip() != "1",
reason="imports cocoindex/lancedb which spawns a background thread that causes Kuzu segfaults",
)
def test_tier2_lance_row_carries_enrich_capabilities_without_lancedb() -> None:
"""Pre-flight tier 2: `JavaLanceChunk` row would carry the same `capabilities` as `enrich_chunk` (CocoIndex wiring)."""
import numpy as np
Expand Down Expand Up @@ -558,6 +567,10 @@ def test_tier2_lance_row_carries_enrich_capabilities_without_lancedb() -> None:
assert "MESSAGE_LISTENER" in row.capabilities


@pytest.mark.skipif(
os.environ.get("JAVA_CODEBASE_RAG_RUN_HEAVY", "").strip() != "1",
reason="imports lancedb which spawns a background thread that causes Kuzu segfaults",
)
def test_lance_table_round_trips_list_capabilities(tmp_path: Path) -> None:
"""Lance can store and read list<string> `capabilities` (CocoIndex write path).

Expand Down
4 changes: 4 additions & 0 deletions tests/test_java_codebase_rag_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,10 @@ def test_increment_first_run_falls_back_to_full(



@pytest.mark.skipif(
os.environ.get("JAVA_CODEBASE_RAG_RUN_HEAVY", "").strip() != "1",
reason="imports lancedb which spawns background thread that causes Kuzu segfaults",
)
@pytest.mark.skipif(not _cocoindex_available(), reason="cocoindex not installed in venv")
def test_increment_updates_lance_after_touch_java_file(corpus_root: Path, tmp_path: Path) -> None:
import lancedb # noqa: PLC0415
Expand Down
5 changes: 4 additions & 1 deletion tests/test_kuzu_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,8 @@ def test_trace_flow_empty_seeds_returns_empty(kuzu_graph) -> None:

def _open_stale_ontology_graph(tmp_path: Path, ontology_version: int) -> Path:
db_path = tmp_path / f"stale_ontology_{ontology_version}.kuzu"
conn = kuzu.Connection(kuzu.Database(str(db_path)))
db = kuzu.Database(str(db_path))
conn = kuzu.Connection(db, num_threads=1)
conn.execute(
"CREATE NODE TABLE GraphMeta("
"key STRING PRIMARY KEY, "
Expand All @@ -387,6 +388,8 @@ def _open_stale_ontology_graph(tmp_path: Path, ontology_version: int) -> Path:
"source_root: '', counts_json: '{}', parse_errors: 0})",
{"k": "graph", "ov": ontology_version},
)
conn.close()
db.close()
return db_path


Expand Down
1 change: 1 addition & 0 deletions tests/test_lancedb_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
reason="set JAVA_CODEBASE_RAG_RUN_HEAVY=1 to run the cocoindex + LanceDB end-to-end test",
),
pytest.mark.lance_e2e,
pytest.mark.asyncio,
]

CAPABILITY_SMOKE_ROOT = Path(__file__).resolve().parent / "fixtures" / "capability_smoke"
Expand Down
5 changes: 5 additions & 0 deletions tests/test_mcp_tools.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
"""Tool-surface assertions for the v2 MCP API."""
from __future__ import annotations

import pytest

# Mark all async tests in this module to use asyncio
pytestmark = pytest.mark.asyncio


def _enum_sets(node: object) -> list[set[str]]:
found: list[set[str]] = []
Expand Down
3 changes: 3 additions & 0 deletions tests/test_mcp_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

from java_ontology import VALID_RESOLVE_REASONS

# Mark all async tests in this module to use asyncio
pytestmark = pytest.mark.asyncio

from mcp_v2 import (
Edge,
NodeFilter,
Expand Down
8 changes: 8 additions & 0 deletions tests/test_search_lancedb_capability.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
"""`run_search` with `capability=` — exercises Lance `array_has` + vector path (no CocoIndex)."""
from __future__ import annotations

import os
import uuid

import pytest

from sentence_transformers import SentenceTransformer

from ast_java import ONTOLOGY_VERSION
from index_common import SBERT_MODEL
from search_lancedb import TABLES, _query_vector, run_search

pytestmark = pytest.mark.skipif(
os.environ.get("JAVA_CODEBASE_RAG_RUN_HEAVY", "").strip() != "1",
reason="imports lancedb at runtime (spawns background thread that causes Kuzu segfaults)",
)


def _one_java_row_built_for_capability_filter(
*,
Expand Down
Loading