Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 95 additions & 0 deletions public/agent-skills/componentYamlFormat/SKILL.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
---
name: Component YAML Format
description: Reference for the Tangle component YAML specification format
---

# Tangle Component YAML Format

## Basic Structure

```yaml
name: Component Name
description: What this component does
metadata:
annotations:
cloud_pipelines.net: "true"

inputs:
- name: input_data
type: String
description: Description of input
- name: config
type: String
default: "default_value"
optional: true

outputs:
- name: output_data
type: String
description: Description of output

implementation:
container:
image: python:3.10
command:
- sh
- -c
- |
python3 -c "
# inline Python code
"
args:
- --input
- { inputPath: input_data }
- --output
- { outputPath: output_data }
- --config
- { inputValue: config }
```

## Types

Common input/output types:

- `String` — text data or file paths
- `Integer` — whole numbers
- `Float` — decimal numbers
- `Boolean` — true/false
- `JsonObject` — structured JSON data
- `JsonArray` — JSON arrays
- `URI` — file URIs or URLs
- `ApacheParquet` — Parquet files (artifact type)
- `CSV` — CSV files (artifact type)

## Graph Implementation (Pipelines)

Pipelines use `implementation.graph` instead of `implementation.container`:

```yaml
name: My Pipeline
implementation:
graph:
tasks:
task-name:
componentRef:
name: Component Name
spec: { ... }
arguments:
input_name: "{{inputs.pipeline_input}}"
other_input:
taskOutput:
taskId: other-task
outputName: output_name
outputValues:
pipeline_output:
taskOutput:
taskId: final-task
outputName: result
```

## Argument References

- `{{inputs.name}}` — reference a pipeline-level input
- `{{tasks.taskName.outputs.outputName}}` — reference a task output
- `{graphInput: {inputName: name}}` — object form of input reference
- `{taskOutput: {taskId: name, outputName: name}}` — object form of task output reference
34 changes: 34 additions & 0 deletions public/agent-skills/tangleBestPractices/SKILL.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
---
name: Tangle Pipeline Best Practices
description: Guidelines for building well-structured, maintainable ML pipelines in Tangle
---

# Tangle Pipeline Best Practices

## Pipeline Structure

1. **Logical Stages**: Organize pipelines into clear stages — data ingestion, preprocessing, training, evaluation, deployment.
2. **Subgraph Grouping**: Group related tasks into subgraphs. Each subgraph should represent one logical stage.
3. **Size Limits**: Keep subgraphs under 7 nodes. If a subgraph grows larger, split it.
4. **Naming**: Use descriptive, human-readable names for tasks. "Preprocess Training Data" not "preprocess_1".

## Component Selection

1. **Search First**: Always check the component registry before creating new components.
2. **Single Responsibility**: Prefer components that do one thing well over monolithic ones.
3. **Type Compatibility**: Verify input/output types match when connecting components.
4. **Reuse**: If a component exists that's close to what you need, prefer it over creating a new one.

## Data Flow

1. **No Cycles**: Pipeline graphs must be directed acyclic graphs (DAGs).
2. **Explicit Connections**: Always use bindings to connect data flow — avoid implicit dependencies.
3. **Pipeline Inputs**: Use pipeline-level inputs for configurable parameters that should be settable at run time.
4. **Pipeline Outputs**: Define pipeline-level outputs for results that need to be accessible after the run.

## Common Patterns

- **Fan-out**: One source feeding multiple processors (e.g., train/test split).
- **Fan-in**: Multiple sources feeding into one aggregator.
- **Chain**: Linear sequence of transforms (most common).
- **Conditional**: Use task-level `isEnabled` for optional processing steps.
52 changes: 52 additions & 0 deletions src/agent/agents/subagents/pipelineArchitect.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/**
* Pipeline-architect sub-agent — designs and builds new pipelines (or
* new stages within an existing pipeline) from a high-level user goal,
* applying edits through the CSOM tool surface inside an undo group.
*
* Skill content is awaited at agent-construction time via
* `session.skillsLoader.getSkill(id)`. The worker warms the loader at
* `init` (fire-and-forget) so by the time a turn starts the underlying
* promises are usually already resolved.
*/
import { Agent } from "@openai/agents";

import { requireOrchestratorModel } from "../../config";
import { attachObservabilityHooks } from "../../middleware/observability";
import architectPrompt from "../../prompts/architect.md?raw";
import type { AgentSession } from "../../session";
import { createCsomTools } from "../../tools/csomTools";
import { createRunTools } from "../../tools/runTools";

const REFERENCE_SKILL_IDS = [
"tangleBestPractices",
"componentYamlFormat",
] as const;

async function buildInstructions(session: AgentSession): Promise<string> {
const sections = (
await Promise.all(
REFERENCE_SKILL_IDS.map((id) => session.skillsLoader.getSkill(id)),
)
).filter((content) => content.length > 0);
if (sections.length === 0) return architectPrompt;
return `${architectPrompt}\n\n## Reference skills\n\n${sections.join("\n\n---\n\n")}`;
}

export async function createPipelineArchitectAgent(
session: AgentSession,
): Promise<Agent> {
const csom = createCsomTools(session.bridge);
const runTools = createRunTools(session.bridge);
const agent = new Agent({
name: "pipeline-architect",
handoffDescription: `Design and build new pipelines (or new stages within an existing
pipeline) from a high-level goal. Can mutate the pipeline via CSOM tools and submit a run
after a successful build when the user asks. Asks the user for input when design choices
are ambiguous.`,
instructions: await buildInstructions(session),
tools: [...csom.allTools, runTools.submitPipelineRun],
model: requireOrchestratorModel(),
});
attachObservabilityHooks(agent, session.emitStatus);
return agent;
}
11 changes: 9 additions & 2 deletions src/agent/agents/tangleDispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import dispatcherPrompt from "../prompts/dispatcher.md?raw";
import type { AgentSession } from "../session";
import { createDebugAssistantAgent } from "./subagents/debugAssistant";
import { createGeneralHelpAgent } from "./subagents/generalHelp";
import { createPipelineArchitectAgent } from "./subagents/pipelineArchitect";
import { createPipelineRepairAgent } from "./subagents/pipelineRepair";

interface DispatcherInvokeParams {
Expand All @@ -39,9 +40,10 @@ export interface TangleDispatcher {
invoke(params: DispatcherInvokeParams): Promise<DispatcherInvokeResult>;
}

function createDispatcherAgent(session: AgentSession): Agent {
async function createDispatcherAgent(session: AgentSession): Promise<Agent> {
const generalHelp = createGeneralHelpAgent(session);
const pipelineRepair = createPipelineRepairAgent(session);
const pipelineArchitect = await createPipelineArchitectAgent(session);
const debugAssistant = createDebugAssistantAgent(session);

const agent = new Agent({
Expand All @@ -59,6 +61,11 @@ function createDispatcherAgent(session: AgentSession): Agent {
toolDescription:
"Ask the pipeline-repair specialist to inspect, validate, or fix the user's currently-open pipeline, or to apply a specific CSOM mutation directive. Can also submit a pipeline run after a successful fix when the user asked. Input: a clear directive. For open-ended repair use 'Validate and fix the current pipeline.'. For a targeted fix already identified by debug-assistant, pass the exact directive, e.g. 'Set the `label_column_name` input on [Train XGBoost model on CSV](entity://task-abc123) from \"unexistent\" to \"tips\".'. Add 'and resubmit the run' to the input only if the user explicitly asked to rerun.",
}),
pipelineArchitect.asTool({
toolName: "ask_pipeline_architect",
toolDescription:
"Ask the pipeline-architect specialist to design or build new pipeline structure — a whole pipeline from scratch, a new stage in an existing pipeline, or a multi-task subgraph. Can mutate the pipeline via CSOM tools and submit a run after a successful build when the user asked. NOT for fixing validation errors or single-task tweaks (use `ask_pipeline_repair`). Input: a clear design directive, e.g. 'Build a pipeline that loads a CSV, trains an XGBoost model on the `tips` column, and exposes the trained model as a pipeline output.'. Add 'and submit the run' to the input only if the user explicitly asked to run.",
}),
debugAssistant.asTool({
toolName: "ask_debug_assistant",
toolDescription:
Expand All @@ -85,7 +92,7 @@ export function createDispatcher(): TangleDispatcher {
async invoke(params) {
params.session.proxyClient.ensureConfigured(params.token);
const sessionMemory = getOrCreateSessionMemory(params.threadId);
const agent = createDispatcherAgent(params.session);
const agent = await createDispatcherAgent(params.session);
const result = await run(agent, params.message, {
session: sessionMemory,
});
Expand Down
10 changes: 10 additions & 0 deletions src/agent/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ export const config = aiAssistantConfig as {
orchestratorModel: string;
subagentModel: string;
embeddingModel: string;
skillsBaseUrl: string;
};

function requireProxyBaseUrl(): string {
Expand Down Expand Up @@ -63,6 +64,15 @@ export function requireEmbeddingModel(): string {
return config.embeddingModel;
}

export function requireSkillsBaseUrl(): string {
if (!config.skillsBaseUrl) {
throw new Error(
"AI assistant: skillsBaseUrl is empty. Set it in src/config/aiAssistantConfig.json.",
);
}
return config.skillsBaseUrl;
}

/**
* Read-only seam used by tools (e.g. `searchDocs`) that need the
* configured `OpenAI` client. `ProxyClient` implements this; tests can
Expand Down
1 change: 1 addition & 0 deletions src/agent/middleware/observability.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ const TOOL_STATUS_LABELS: Record<string, string> = {
// the dispatcher has no handoffs anymore.
ask_general_help: "Looking up information...",
ask_pipeline_repair: "Asking pipeline-repair...",
ask_pipeline_architect: "Designing pipeline...",
ask_debug_assistant: "Analyzing run failure...",
};

Expand Down
83 changes: 83 additions & 0 deletions src/agent/prompts/architect.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# Pipeline Architect — System Prompt

You are the **Pipeline Architect** specialist for Tangle Pipeline Studio. Your job is to design and build new pipelines — or new stages within an existing pipeline — from a high-level user goal. You translate intent into a concrete graph of tasks, bindings, inputs, and outputs.

## Your Workflow

1. Call `get_pipeline_state` first to understand whether the canvas is empty or already contains tasks. Note the existing tasks' `$id`, `name`, and component `inputs` / `outputs` so you can wire new structure into the right ports.
2. **Plan before mutating.** Sketch the target graph in your head (or as a short bulleted plan in your reply) before issuing any CSOM tool calls. Identify:
- The stages the user described (e.g. ingest, preprocess, train, evaluate).
- Which existing tasks (if any) cover those stages.
- The new tasks you need to add, the pipeline-level inputs the user must configure at run time, and the pipeline-level outputs that should be exposed.
3. **Build incrementally.** Add tasks and pipeline-level I/O first, then wire bindings, then set literal arguments. Call `validate_pipeline` after major edits and before finishing.
4. For ambiguous decisions (which dataset format? which evaluation metric? which output to expose?), ask the user one clear question instead of guessing. Do not invent component names, IDs, or input/output ports.
5. When the design is structurally sound, summarize what you built using the entity-link summary format below.

## Component-availability constraint (beta)

Component lookup is not yet wired into the architect (it lands in a later release alongside `search_components`). Until then:

- You can freely **add, rename, delete, connect, and configure** tasks whose components are already referenced somewhere in the current pipeline spec — those `componentRef` payloads are visible in `get_pipeline_state`.
- You cannot conjure brand-new components from thin air. If the user asks for a stage that needs a component the spec does not already reference, say so plainly, suggest the closest existing component, and ask whether the user wants to add the component manually first.
- For an empty canvas, ask the user which components they want to start from (or which template), rather than fabricating tasks the system cannot resolve.

## Subgraph design

When the pipeline grows beyond a handful of tasks, group related work into subgraphs (`create_subgraph`) so the canvas stays readable. Follow the guidance in the `## Reference skills` section below (when present): each subgraph should represent one logical stage, stay under ~7 inner tasks, and have a descriptive human-readable name. Never wrap a single task in a subgraph.

## Submitting runs

You have access to `submit_pipeline_run`, which submits the current pipeline to the backend. Use it ONLY when:

1. The dispatcher's `input` explicitly asked to run, rerun, submit, or "build it and run it" (typically the input ends with "and submit the run.").
2. You have completed your edits and the most recent `validate_pipeline` call returned no errors. If validation still has errors, do not submit; explain what is still broken so the user can resolve it.

`submit_pipeline_run` takes no arguments — it always submits whatever pipeline is currently open. After a successful submission, include the returned `runId` in your summary so the dispatcher can mention it to the user.

## CSOM Entity Model

- **Tasks** — nodes referencing components, each with `$id`, `name`, `componentRef`.
- **Inputs** — pipeline-level input ports with `$id`, `name`, `type`.
- **Outputs** — pipeline-level output ports with `$id`, `name`, `type`.
- **Bindings** — directed edges from source entity/port to target entity/port.

Every entity has a stable `$id`. Use these IDs when referencing entities in tool calls.

## Active subgraph context

`get_pipeline_state` may include an `activeSubgraphPath` field — a breadcrumb of subgraph task names from the root pipeline to whatever subgraph the user is currently viewing. Treat this as a hint about where the user's attention is, but remember: every CSOM mutation always applies to the root spec. When you build new structure, prefer extending the root pipeline (or an explicit subgraph the user named) rather than the nested view the user happens to be focused on.

## When to defer to another specialist

- Targeted edits to fix validation errors in an existing pipeline → defer to **pipeline-repair**.
- Diagnosing a failed pipeline run → defer to **debug-assistant**.
- General product / docs questions → defer to **general-help**.

You build new structure. Repair fixes existing structure. The dispatcher routes; if you find yourself about to make a single-task tweak to fix a validation error, stop and explain that pipeline-repair is the right specialist for that.

## Response Formatting

When referring to pipeline entities (tasks, inputs, outputs) in your response, use this markdown link format so the UI can render them as interactive chips:

```
[Entity Name](entity://$id)
```

Examples:

- "Added [Load CSV](entity://task-abc123) to ingest the training data."
- "Wired the model output to [trained_model](entity://output-xyz789)."

After applying changes, include a summary using entity links:

```
## Pipeline Built
- Ingest: [Load CSV](entity://task-abc)
- Preprocess: [Normalize Features](entity://task-def)
- Train: [Train XGBoost](entity://task-ghi)
- Exposed [trained_model](entity://output-xyz) for downstream consumers.
```

## Response Style

Be deliberate and transparent. State your plan first, then execute it, then summarize what you built. If you had to make an assumption (e.g. "I'm assuming the input CSV has a `label` column"), call it out so the user can correct you in the next turn.
Loading
Loading