[BREAKING] feat: route data retrieval through lumid-data-app via type:lumid (sql/s3/agent)#67

Open: timzsu wants to merge 3 commits into main from zsu/lumid-data-app

timzsu (Collaborator) commented Jun 4, 2026

Purpose

Unify FlowMesh's data retrieval on a single connector that talks to lumid-data-app over HTTP. A new type: lumid retrieval (with mode: sql | s3 | agent) covers all three patterns through one LumidDataConnector, so the Postgres and S3 connectors can be retired later. Replaces the old type: agent path (which imported the lumid_data SDK).

Results are never returned inline: each lumid node materializes to object storage and the executor surfaces a pointer (materialized_uri under /blobs/) plus the access_chain.
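As a rough illustration of the pointer-only contract, the sketch below models a result that carries a materialized_uri and an access_chain and checks that the URI path sits under /blobs/. The class name `PointerResult`, the helper `is_blob_pointer`, the list type of `access_chain`, and the example host are assumptions made for illustration; they are not taken from the FlowMesh code.

```python
from dataclasses import dataclass, field
from urllib.parse import urlparse


@dataclass
class PointerResult:
    """Hypothetical stand-in for what the executor surfaces: a pointer, no rows."""

    materialized_uri: str
    # Assumption: access_chain is modeled here as a plain list of entries.
    access_chain: list = field(default_factory=list)


def is_blob_pointer(uri: str) -> bool:
    """Return True when the URI's path component lives under /blobs/."""
    return urlparse(uri).path.startswith("/blobs/")


# lumid.example is a placeholder host, not a real deployment.
result = PointerResult("https://lumid.example/blobs/demo/out.parquet")
print(is_blob_pointer(result.materialized_uri))
```

A downstream node would then fetch the object through the pointer rather than expect inline rows.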

Changes

  • src/worker/connectors/lumid_data_connector.py (new): LumidDataConnector (httpx) with three modes (sql → POST /retrieve, agent → POST /agent/v1 with buffered SSE, object → GET /blobs/<key>), plus list_objects via GET /blobs. Forwards lumid_data_token as the bearer token.
  • src/worker/connectors/agent_connector.py (deleted) and __init__.py: AgentConnector → LumidDataConnector.
  • src/worker/executors/data_retrieval_executor.py — new _run_lumid (sql/s3/agent), _run_agent removed; lumid_data_token is a required field on lumid nodes.
  • pyproject.toml, uv.lock, src/worker/requirements/requirements.txt — drop the lumid-data-sdk dependency (relocked + regenerated).
  • examples/templates/data_retrieval.yaml (new) — one example covering all three modes + a downstream node consuming access_chain; replaces data_retrieval_agent.yaml and data_retrieval_then_inference.yaml (removed).
  • tests/worker/test_lumid_data_connector.py (replaces test_agent_connector.py) — respx-mocked connector + executor-branch coverage.
  • docs/EXECUTORS.md, docs/WORKFLOWS.md, docs/ENV.md — document type: lumid, the required lumid_data_token, and remove the obsolete LUMID_DATA_TOKEN worker env.
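For readers unfamiliar with "buffered SSE", the sketch below shows one way a fully received Server-Sent Events body could be split into event payloads. It follows the generic SSE framing (`data:` lines, blank line ends an event, lines starting with `:` are comments). The function name `parse_sse` is hypothetical, and nothing here is claimed to match how LumidDataConnector actually parses the /agent/v1 stream.

```python
def parse_sse(body: str) -> list[str]:
    """Split a buffered SSE response body into one payload string per event."""
    events: list[str] = []
    data: list[str] = []
    for line in body.splitlines():
        if line == "":
            # A blank line terminates the current event.
            if data:
                events.append("\n".join(data))
                data = []
        elif line.startswith("data:"):
            value = line[len("data:"):]
            # A single leading space after the colon is not part of the value.
            if value.startswith(" "):
                value = value[1:]
            data.append(value)
        # Comment lines (":...") and other fields (event:, id:) are ignored here.
    if data:
        # Leniency for buffered bodies: keep a trailing event with no blank line.
        events.append("\n".join(data))
    return events


print(parse_sse("data: a\ndata: b\n\n: ping\ndata: c\n\n"))
```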

type: sql and type: s3 (the connection-string connectors) are unchanged — they will retire after the lumid path becomes stable.

Design

lumid-data-app exposes one auth-gated surface: POST /retrieve ({sql} or {plan}), GET /blobs?prefix=&delimiter= / GET /blobs/<key>, and POST /agent/v1. The connector maps each retrieval mode onto it and forwards lumid_data_token as the bearer — all systems share lum.id auth, so this is the caller's PAT (or a local dev key from lumid-data-app's LUMID_API_KEYS). The token is a required per-node spec field; FlowMesh does not inject or default it. Responses are pointer-based (no inline rows), matching the RetrievalResult contract.
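The mapping described above can be sketched as a pure function that turns a retrieval mode into a request description. The endpoints and the bearer header come from this description; the `RequestPlan` type, the `plan_request` name, the `prompt` body key for agent mode, and the assumption that `mode: s3` corresponds to the connector's object path are illustrative guesses, not the connector's real interface.

```python
from dataclasses import dataclass
from typing import Any, Optional
from urllib.parse import quote


@dataclass
class RequestPlan:
    method: str
    path: str
    headers: dict
    json: Optional[dict] = None


def _require(name: str, value: Any) -> Any:
    if not value:
        raise ValueError(f"{name} is required")
    return value


def plan_request(mode: str, token: str, *, sql: Optional[str] = None,
                 key: Optional[str] = None,
                 prompt: Optional[str] = None) -> RequestPlan:
    """Describe the HTTP call for one lumid retrieval mode (no network I/O)."""
    # The token is a required per-node field; nothing is injected or defaulted.
    headers = {"Authorization": f"Bearer {_require('lumid_data_token', token)}"}
    if mode == "sql":
        return RequestPlan("POST", "/retrieve", headers, {"sql": _require("sql", sql)})
    if mode == "agent":
        # Assumption: the agent request body shape is not given in this PR.
        return RequestPlan("POST", "/agent/v1", headers,
                           {"prompt": _require("prompt", prompt)})
    if mode == "s3":
        # Keep "/" so nested keys stay readable; escape everything else.
        return RequestPlan("GET", "/blobs/" + quote(_require("key", key), safe="/"),
                           headers)
    raise ValueError(f"unknown lumid mode: {mode!r}")


plan = plan_request("sql", "dev-key", sql="SELECT 1")
print(plan.method, plan.path)
```

Keeping the mapping free of I/O makes it easy to unit-test separately from the httpx client that would send the request.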

Test Plan

Full-stack e2e against a live lumid-data-app with demo data:

export FLOWMESH_BASE_URL=xxx
export LUMID_PAT=xxx
export LUMID_BASE_URL=xxx
export NEWS_KEY=$(curl -fsS -H "Authorization: Bearer $LUMID_PAT" \
   "$LUMID_BASE_URL/blobs?prefix=demo/unstructured/news-html/&limit=50" \
   | python3 -c "import sys,json;print([o['key'] for o in json.load(sys.stdin)['objects'] if o['key'].endswith('.html')][0])")
envsubst '${LUMID_BASE_URL} ${LUMID_PAT} ${NEWS_KEY}' < examples/templates/data_retrieval.yaml > /tmp/wf.yaml
WFL=$(uv run flowmesh workflow submit /tmp/wf.yaml | python3 -c "import sys,json;print(json.load(sys.stdin)['workflow_id'])")
uv run flowmesh workflow watch "$WFL"
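The NEWS_KEY step above picks the first .html key from the GET /blobs listing with an inline python3 one-liner. A slightly more defensive version could look like the sketch below; the response shape (`objects` entries with a `key` field) is taken from that one-liner, while the function name `first_html_key` and the sample keys are made up for illustration.

```python
def first_html_key(listing: dict) -> str:
    """Return the first object key ending in .html from a /blobs listing."""
    for obj in listing.get("objects", []):
        key = obj.get("key", "")
        if key.endswith(".html"):
            return key
    # The one-liner would raise IndexError here; this message is easier to act on.
    raise LookupError("no .html object found under the requested prefix")


# Sample listing with invented keys, shaped like the parsed JSON response.
sample = {
    "objects": [
        {"key": "demo/unstructured/news-html/index.json"},
        {"key": "demo/unstructured/news-html/story-1.html"},
    ]
}
print(first_html_key(sample))
```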

Test Result

  • Full-stack e2e (sql + s3 + agent) validated end-to-end against a live lumid-data-app.

Migration (breaking)

  • type: agent is removed → use type: lumid with mode: agent.
  • Every type: lumid node now requires lumid_data_token (set to your lum.id PAT, or a local dev key).
  • The lumid-data-sdk dependency is gone; the worker reaches lumid-data-app over HTTP.
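The first two migration steps could be applied to existing workflow definitions with a small helper like the one below. It assumes a retrieval node is available as a flat dict with a `type` key after the YAML is loaded; that structure, the `query` field in the example, and the name `migrate_node` are assumptions, since the node schema is not shown in this PR.

```python
def migrate_node(node: dict, token: str) -> dict:
    """Return a copy of a retrieval node updated for the type: lumid path."""
    out = dict(node)
    if out.get("type") == "agent":
        # type: agent is removed; the replacement is type: lumid with mode: agent.
        out["type"] = "lumid"
        out["mode"] = "agent"
    if out.get("type") == "lumid" and not out.get("lumid_data_token"):
        # Every lumid node must carry its own token (a lum.id PAT or local dev key).
        out["lumid_data_token"] = token
    return out


old = {"type": "agent", "query": "latest filings"}  # "query" is a made-up field
print(migrate_node(old, "my-pat"))
```

Nodes with type: sql or type: s3 pass through untouched, consistent with those connectors being unchanged in this PR.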

Pre-submission Checklist
  • I have read the contribution guidelines.
  • I have run pre-commit run --all-files and fixed any issues.
  • I have added or updated tests covering my changes (if applicable).
  • I have verified that uv run pytest tests/ passes locally.
  • If I changed shared schemas or proto definitions, I have checked downstream compatibility across Server and Worker.
  • If I changed the SDK or CLI, I have verified the affected packages work.
  • If this is a breaking change, I have prefixed the PR title with [BREAKING] and described migration steps above.
  • I have updated documentation or config examples if user-facing behavior changed.

timzsu added 3 commits June 4, 2026 19:42
Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
@timzsu timzsu marked this pull request as ready for review June 4, 2026 16:29
@timzsu timzsu requested a review from kaiitunnz as a code owner June 4, 2026 16:29