Forge
Build Argo Workflows in Go. No YAML required.
Forge is a type-safe Go SDK for building Argo Workflows programmatically — the Go equivalent of Hera (Python).
Define workflows as Go structs, get compile-time safety, and let Forge handle the YAML serialization. All 198 upstream Argo Workflows examples round-trip through Forge models.
| Without Forge | With Forge |
|---|---|
| Hand-write YAML, pray for valid indentation | Type-safe Go structs with compile-time checks |
| Copy-paste templates between files | Compose and reuse templates as Go values |
| String-based parameter references | Expression builder with autocomplete |
| Discover errors at submit time | Catch mistakes before kubectl apply |
| No programmatic workflow generation | Generate workflows dynamically from code |
go get github.com/usetheodev/theo-forge

Requires Go 1.25+.
package main
import (
"fmt"
"log"
forge "github.com/usetheodev/theo-forge"
)
func main() {
w := &forge.Workflow{
GenerateName: "hello-",
Entrypoint: "main",
Templates: []forge.Templatable{
&forge.Container{
Name: "main",
Image: "alpine:3.18",
Command: []string{"echo"},
Args: []string{"hello world"},
},
},
}
yaml, err := w.ToYAML()
if err != nil {
log.Fatal(err)
}
fmt.Println(yaml)
}

Output:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: hello-
spec:
  entrypoint: main
  templates:
    - name: main
      container:
        image: alpine:3.18
        command:
          - echo
        args:
          - hello world

Build complex dependency graphs with a fluent API:
echoTpl := &forge.Container{
Name: "echo",
Image: "alpine:3.18",
Command: []string{"echo"},
Args: []string{forge.InputParam("msg")},
Inputs: []forge.Parameter{{Name: "msg"}},
}
dag := &forge.DAG{Name: "diamond"}
A := &forge.Task{Name: "A", Template: "echo", Arguments: []forge.Parameter{{Name: "msg", Value: forge.Ptr("Task A")}}}
B := &forge.Task{Name: "B", Template: "echo", Arguments: []forge.Parameter{{Name: "msg", Value: forge.Ptr("Task B")}}}
C := &forge.Task{Name: "C", Template: "echo", Arguments: []forge.Parameter{{Name: "msg", Value: forge.Ptr("Task C")}}}
D := &forge.Task{Name: "D", Template: "echo", Arguments: []forge.Parameter{{Name: "msg", Value: forge.Ptr("Task D")}}}
A.Then(B) // A → B
A.Then(C) // A → C
B.Then(D) // B → D
C.Then(D) // C → D
dag.AddTasks(A, B, C, D)
w := &forge.Workflow{
GenerateName: "diamond-",
Entrypoint: "diamond",
Templates: []forge.Templatable{echoTpl, dag},
}

    A
   / \
  B   C
   \ /
    D
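To make the dependency wiring in the diamond concrete, here is a minimal, standard-library-only sketch of how a fluent `Then` call could record edges. The `task` type, its `deps` field, and the `describe` helper are hypothetical stand-ins for illustration; they are not Forge's actual implementation, and the real `forge.Task.Then` may behave differently (for example in what it returns).

```go
package main

import (
	"fmt"
	"strings"
)

// task is a hypothetical stand-in for forge.Task; only the fields needed
// to illustrate dependency wiring are modeled.
type task struct {
	Name string
	deps []string // names of tasks that must finish first
}

// Then records that next depends on t and returns next so calls can chain.
func (t *task) Then(next *task) *task {
	next.deps = append(next.deps, t.Name)
	return next
}

// describe renders one line per task, listing its recorded dependencies.
func describe(tasks ...*task) string {
	var b strings.Builder
	for _, t := range tasks {
		fmt.Fprintf(&b, "%s: [%s]\n", t.Name, strings.Join(t.deps, ", "))
	}
	return b.String()
}

func main() {
	a, b, c, d := &task{Name: "A"}, &task{Name: "B"}, &task{Name: "C"}, &task{Name: "D"}
	a.Then(b)
	a.Then(c)
	b.Then(d)
	c.Then(d)
	fmt.Print(describe(a, b, c, d))
}
```

In this sketch D ends up with two dependencies (B and C), which is what gives the graph its diamond shape: D runs only after both branches finish.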
flip := &forge.Script{
Name: "flip-coin",
Image: "python:3.11-alpine",
Command: []string{"python"},
Source: `import random; print("heads" if random.randint(0,1) == 0 else "tails")`,
}
heads := &forge.Container{
Name: "heads", Image: "alpine:3.18",
Command: []string{"echo"}, Args: []string{"it was heads"},
}
tails := &forge.Container{
Name: "tails", Image: "alpine:3.18",
Command: []string{"echo"}, Args: []string{"it was tails"},
}
dag := &forge.DAG{Name: "coinflip"}
flipTask := &forge.Task{Name: "flip", Template: "flip-coin"}
headsTask := &forge.Task{Name: "heads", Template: "heads", When: `{{tasks.flip.outputs.result}} == "heads"`}
tailsTask := &forge.Task{Name: "tails", Template: "tails", When: `{{tasks.flip.outputs.result}} == "tails"`}
flipTask.Then(headsTask)
flipTask.Then(tailsTask)
dag.AddTasks(flipTask, headsTask, tailsTask)

Submit, list, and lint workflows against a running Argo server:
import "github.com/usetheodev/theo-forge/client"
svc := client.NewWorkflowsService(
"https://argo.example.com",
"my-token",
"default",
)
// Submit a workflow
result, err := svc.CreateWorkflow(ctx, w)
// List workflows
workflows, err := svc.ListWorkflows(ctx, "")
// Lint before submitting
linted, err := svc.LintWorkflow(ctx, w)

Build Argo expressions with type safety instead of raw strings:
import "github.com/usetheodev/theo-forge/expr"
// Reference task outputs
ref := expr.Tasks("my-task").Attr("outputs.result")
fmt.Println(ref.Tmpl()) // {{tasks.my-task.outputs.result}}
// Build conditionals
cond := expr.Steps("validate").Attr("outputs.result").Equals(expr.C("success"))

| Type | Description |
|---|---|
| Container | Docker container execution |
| Script | Inline scripts (Python, Bash, etc.) |
| DAG | Directed acyclic graph with Task nodes |
| Steps | Sequential/parallel step groups |
| ResourceTemplate | Kubernetes resource create/apply |
| HTTPTemplate | HTTP requests |
| Suspend | Pause workflow execution |
| ContainerSet | Multiple containers in a single pod |
| Type | Description |
|---|---|
| WorkflowTemplate | Namespace-scoped reusable templates |
| ClusterWorkflowTemplate | Cluster-scoped reusable templates |
| CronWorkflow | Scheduled workflow execution |
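Returning briefly to the expression builder: the idea of producing Argo template strings from typed values rather than hand-written text can be sketched with the standard library alone. The `ref` type and the `tasks`, `attr`, `tmpl`, and `equals` helpers below are hypothetical and only mimic the shape of the `expr` calls shown earlier; they make no claim about how the `expr` package is implemented.

```go
package main

import "fmt"

// ref is a hypothetical stand-in for an expression reference; it only
// tracks a dotted path such as tasks.flip.outputs.result.
type ref struct{ path string }

// tasks starts a reference rooted at tasks.<name>.
func tasks(name string) ref { return ref{path: "tasks." + name} }

// attr appends a dotted attribute path to the reference.
func (r ref) attr(a string) ref { return ref{path: r.path + "." + a} }

// tmpl wraps the path in Argo's {{...}} template delimiters.
func (r ref) tmpl() string { return "{{" + r.path + "}}" }

// equals renders a comparison against a quoted string literal, in the
// form used by the When fields of the coin-flip example.
func (r ref) equals(literal string) string {
	return fmt.Sprintf("%s == %q", r.tmpl(), literal)
}

func main() {
	result := tasks("flip").attr("outputs.result")
	fmt.Println(result.tmpl())          // {{tasks.flip.outputs.result}}
	fmt.Println(result.equals("heads")) // {{tasks.flip.outputs.result}} == "heads"
}
```

Because the task name and attribute path are assembled by functions, a misspelled delimiter or missing brace cannot slip in the way it can with a raw string.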
- Parameters — Named inputs/outputs with defaults and value references
- Artifacts — S3, GCS, HTTP, Git, Raw, Azure, OSS, HDFS
- Environment variables — Literals, Secrets, ConfigMaps
- Retry strategies with backoff
- Timeouts and active deadlines
- Resource requests/limits
- Node selectors and tolerations
- Volume mounting (EmptyDir, Secret, ConfigMap, PVC, etc.)
- Synchronization (mutex, semaphore)
- Lifecycle hooks and memoization
- Parallelism limits and TTL strategies
- Metrics and gauges
- OnExit handlers
- Pod and container security contexts
- Artifact garbage collection
Build workflows commonly share a single ReadWriteOnce PVC across DAG
steps (source checkout, cache, artifacts). When Kubernetes schedules
those steps on different nodes, the second step cannot attach the volume
until the first pod fully terminates and the volume detaches — a race
that surfaces as ~10% build failure under concurrent load.
Forge fixes this at the source. When Workflow.VolumeClaimTemplates is
non-empty AND Affinity is nil, Build() injects a canonical
podAffinity term that co-locates every pod of the workflow on the
same node (matches on workflows.argoproj.io/workflow with topology
kubernetes.io/hostname).
w := &forge.Workflow{
GenerateName: "build-",
Entrypoint: "main",
VolumeClaimTemplates: []forge.PVCVolume{{
BaseVolume: forge.BaseVolume{Name: "scratch", MountPath: "/scratch"},
Size: "1Gi",
AccessModes: []forge.AccessMode{forge.ReadWriteOnce},
}},
// Affinity is nil → Build() injects default podAffinity automatically.
Templates: []forge.Templatable{
&forge.Container{Name: "main", Image: "alpine:3.18"},
},
}

For GenerateName-only workflows, the label value is the Argo runtime
template variable {{workflow.name}}, which the Argo controller resolves
after it generates the final workflow name.
Opt-out — workflows that legitimately parallelize across nodes (no shared PVC semantics, ReadWriteMany volumes, etc.) can suppress the injection:
w := &forge.Workflow{
Name: "fan-out",
DisableDefaultAffinity: true, // skip default injection
VolumeClaimTemplates: [...],
}

A user-supplied Affinity always wins: the default is never injected
when Affinity != nil. Workflows without VolumeClaimTemplates are
unaffected (no PVC, no race).
The canonical term is exposed via forge.DefaultPodAffinityFor(w) for
inspection or programmatic reuse.
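The injection rule described in this section can be summarized as a small decision function. The sketch below uses hypothetical stand-in types (`workflow`, `affinity`) and helper names (`wantsDefaultAffinity`, `workflowLabelValue`) rather than the real Forge types. It also assumes that a workflow with an explicit Name is matched on that name directly, which the text above does not state.

```go
package main

import "fmt"

// affinity and workflow are hypothetical stand-ins for the forge types;
// only the fields that the injection rule reads are modeled.
type affinity struct{}

type workflow struct {
	Name                   string
	GenerateName           string
	VolumeClaimTemplates   []string
	Affinity               *affinity
	DisableDefaultAffinity bool
}

// wantsDefaultAffinity reports whether the default term would be injected:
// PVC templates present, no user-supplied affinity, and no opt-out.
func wantsDefaultAffinity(w workflow) bool {
	return len(w.VolumeClaimTemplates) > 0 && w.Affinity == nil && !w.DisableDefaultAffinity
}

// workflowLabelValue picks the value matched against the
// workflows.argoproj.io/workflow label. Using Name directly for named
// workflows is an assumption; the runtime variable for GenerateName-only
// workflows follows the README text.
func workflowLabelValue(w workflow) string {
	if w.Name != "" {
		return w.Name
	}
	return "{{workflow.name}}"
}

func main() {
	cases := []workflow{
		{GenerateName: "build-", VolumeClaimTemplates: []string{"scratch"}},
		{Name: "fan-out", VolumeClaimTemplates: []string{"scratch"}, DisableDefaultAffinity: true},
		{GenerateName: "pinned-", VolumeClaimTemplates: []string{"scratch"}, Affinity: &affinity{}},
		{GenerateName: "no-pvc-"},
	}
	for _, w := range cases {
		fmt.Printf("name=%q generateName=%q inject=%v label=%s\n",
			w.Name, w.GenerateName, wantsDefaultAffinity(w), workflowLabelValue(w))
	}
}
```

Only the first case satisfies all three conditions; the others are skipped because of the opt-out flag, a user-supplied affinity, and the absence of PVC templates, respectively.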
| Package | Description |
|---|---|
| theo-forge | Core builder API: main types and fluent constructors |
| theo-forge/model | Serializable types matching the Argo Workflows API schema |
| theo-forge/expr | Expression DSL for conditionals and parameter references |
| theo-forge/client | REST client for the Argo Workflows API |
| theo-forge/serialize | YAML/JSON serialization and file I/O helpers |
| theo-forge/validate | Resource unit validation (binary/decimal units) |
| theo-forge/config | Global configuration and hook management |
go test ./...
# With race detection
go test -race ./...
# Update golden files after intentional changes
go test ./... -update-golden

Contributions are welcome! Here's how to get started:
- Fork the repository
- Create a branch from develop (git checkout -b feat/my-feature develop)
- Write tests first; we follow TDD strictly
- Make your changes and ensure all tests pass (go test -race ./...)
- Lint your code (golangci-lint run)
- Open a Pull Request against develop
Please keep PRs focused — one feature or fix per PR.
- Discord: https://discord.usetheo.dev/
- X: https://x.com/usetheodev
- LinkedIn: https://linkedin.com/company/usetheodev