From 419973aa84c62d64d9ce506839315833aa566573 Mon Sep 17 00:00:00 2001 From: Yash Shrivastava Date: Mon, 6 Apr 2026 15:23:19 +0530 Subject: [PATCH 01/16] add new health check endpoint with plugin interface --- internal/pkg/heimdall/health.go | 151 ++++++++++++++++++++++++++++++ internal/pkg/heimdall/heimdall.go | 1 + pkg/object/command/command.go | 1 + pkg/plugin/plugin.go | 4 + 4 files changed, 157 insertions(+) create mode 100644 internal/pkg/heimdall/health.go diff --git a/internal/pkg/heimdall/health.go b/internal/pkg/heimdall/health.go new file mode 100644 index 00000000..83d31c13 --- /dev/null +++ b/internal/pkg/heimdall/health.go @@ -0,0 +1,151 @@ +package heimdall + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "os" + "sync" + "time" + + "github.com/google/uuid" + + "github.com/patterninc/heimdall/pkg/object/cluster" + "github.com/patterninc/heimdall/pkg/object/command" + "github.com/patterninc/heimdall/pkg/object/job" + "github.com/patterninc/heimdall/pkg/object/status" + "github.com/patterninc/heimdall/pkg/plugin" +) + +const ( + healthCheckTimeout = 30 * time.Second + healthCheckUser = `heimdall-health` + healthStatusOK = `ok` + healthStatusError = `error` +) + +type healthCheckResult struct { + CommandID string `json:"command_id"` + ClusterID string `json:"cluster_id"` + Status string `json:"status"` + LatencyMs int64 `json:"latency_ms"` + Error string `json:"error,omitempty"` +} + +type healthChecksResponse struct { + Healthy bool `json:"healthy"` + Checks []healthCheckResult `json:"checks"` +} + +type healthPair struct { + cmd *command.Command + cluster *cluster.Cluster + handler plugin.Handler +} + +func (h *Heimdall) healthHandler(w http.ResponseWriter, r *http.Request) { + ctx, cancel := context.WithTimeout(r.Context(), healthCheckTimeout) + defer cancel() + + results := h.runHealthChecks(ctx, h.resolveHealthPairs()) + + healthy := true + for _, res := range results { + if res.Status == healthStatusError { + healthy = false + break + } + } + + resp := healthChecksResponse{Healthy: healthy, Checks: results} + data, _ := json.Marshal(resp) + + w.Header().Set(contentTypeKey, contentTypeJSON) + if healthy { + w.WriteHeader(http.StatusOK) + } else { + w.WriteHeader(http.StatusServiceUnavailable) + } + w.Write(data) +} + +func (h *Heimdall) resolveHealthPairs() []*healthPair { + var pairs []*healthPair + for _, cmd := range h.Commands { + if !cmd.HealthCheck || cmd.Status != status.Active { + continue + } + for _, cl := range h.Clusters { + if cl.Status != status.Active { + continue + } + if cl.Tags.Contains(cmd.ClusterTags) { + pairs = append(pairs, &healthPair{cmd, cl, h.commandHandlers[cmd.ID]}) + } + } + } + return pairs +} + +func (h *Heimdall) runHealthChecks(ctx context.Context, pairs []*healthPair) []healthCheckResult { + results := make([]healthCheckResult, len(pairs)) + var wg sync.WaitGroup + for i, pair := range pairs { + wg.Add(1) + go func(i int, pair *healthPair) { + defer wg.Done() + results[i] = h.checkPair(ctx, pair) + }(i, pair) + } + wg.Wait() + return results +} + +func (h *Heimdall) checkPair(ctx context.Context, pair *healthPair) healthCheckResult { + start := time.Now() + res := healthCheckResult{CommandID: pair.cmd.ID, ClusterID: pair.cluster.ID} + + var err error + if hc, ok := pair.handler.(plugin.HealthChecker); ok { + err = hc.HealthCheck(ctx, pair.cluster) + } else { + err = h.pluginProbe(ctx, pair.cluster, pair.handler) + } + + res.LatencyMs = time.Since(start).Milliseconds() + if err != nil { + res.Status = healthStatusError + res.Error = err.Error() + } else { + res.Status = healthStatusOK + } + return res +} + +func (h *Heimdall) pluginProbe(ctx context.Context, cl *cluster.Cluster, handler plugin.Handler) error { + j := &job.Job{} + j.ID = uuid.NewString() + j.User = healthCheckUser + + tmpDir, err := os.MkdirTemp("", "heimdall-health-*") + if err != nil { + return err + } + defer os.RemoveAll(tmpDir) + + runtime := &plugin.Runtime{ + WorkingDirectory: tmpDir, + ResultDirectory: tmpDir + separator + "result", + Version: h.Version, + UserAgent: fmt.Sprintf(formatUserAgent, h.Version), + } + + if err := runtime.Set(); err != nil { + return err + } + defer runtime.Stdout.Close() + defer runtime.Stderr.Close() + + return handler.Execute(ctx, runtime, j, cl) +} diff --git a/internal/pkg/heimdall/heimdall.go b/internal/pkg/heimdall/heimdall.go index 10ac68a5..36c303b0 100644 --- a/internal/pkg/heimdall/heimdall.go +++ b/internal/pkg/heimdall/heimdall.go @@ -172,6 +172,7 @@ func (h *Heimdall) Start() error { apiRouter := router.PathPrefix(defaultAPIPrefix).Subrouter() // job(s) endpoints... + apiRouter.Methods(methodGET).Path(`/health`).HandlerFunc(h.healthHandler) apiRouter.Methods(methodGET).PathPrefix(`/job/statuses`).HandlerFunc(payloadHandler(h.getJobStatuses)) apiRouter.Methods(methodGET).PathPrefix(`/job/{id}/status`).HandlerFunc(payloadHandler(h.getJobStatus)) apiRouter.Methods(methodPOST).PathPrefix(`/job/{id}/cancel`).HandlerFunc(payloadHandler(h.cancelJob)) diff --git a/pkg/object/command/command.go b/pkg/object/command/command.go index d31fadd3..aef7de8a 100644 --- a/pkg/object/command/command.go +++ b/pkg/object/command/command.go @@ -20,6 +20,7 @@ type Command struct { Plugin string `yaml:"plugin,omitempty" json:"plugin,omitempty"` IsSync bool `yaml:"is_sync,omitempty" json:"is_sync,omitempty"` ClusterTags *set.Set[string] `yaml:"cluster_tags,omitempty" json:"cluster_tags,omitempty"` + HealthCheck bool `yaml:"health_check,omitempty" json:"health_check,omitempty"` Handler plugin.Handler `yaml:"-" json:"-"` } diff --git a/pkg/plugin/plugin.go b/pkg/plugin/plugin.go index 2f72af2e..ef7c6910 100644 --- a/pkg/plugin/plugin.go +++ b/pkg/plugin/plugin.go @@ -11,3 +11,7 @@ type Handler interface { Execute(context.Context, *Runtime, *job.Job, *cluster.Cluster) error Cleanup(ctx context.Context, jobID string, c *cluster.Cluster) error } + +type HealthChecker interface { + HealthCheck(ctx context.Context, c *cluster.Cluster) error +} From b93cbd64e89b6410051573465091e63d7014018a Mon Sep 17 00:00:00 2001 From: Yash Shrivastava Date: Mon, 6 Apr 2026 15:26:59 +0530 Subject: [PATCH 02/16] change endpoint --- internal/pkg/heimdall/heimdall.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/pkg/heimdall/heimdall.go b/internal/pkg/heimdall/heimdall.go index 36c303b0..be23c386 100644 --- a/internal/pkg/heimdall/heimdall.go +++ b/internal/pkg/heimdall/heimdall.go @@ -172,7 +172,7 @@ func (h *Heimdall) Start() error { apiRouter := router.PathPrefix(defaultAPIPrefix).Subrouter() // job(s) endpoints... - apiRouter.Methods(methodGET).Path(`/health`).HandlerFunc(h.healthHandler) + apiRouter.Methods(methodGET).Path(`/command/health`).HandlerFunc(h.healthHandler) apiRouter.Methods(methodGET).PathPrefix(`/job/statuses`).HandlerFunc(payloadHandler(h.getJobStatuses)) apiRouter.Methods(methodGET).PathPrefix(`/job/{id}/status`).HandlerFunc(payloadHandler(h.getJobStatus)) apiRouter.Methods(methodPOST).PathPrefix(`/job/{id}/cancel`).HandlerFunc(payloadHandler(h.cancelJob)) From 2132855a303733ddf7b9dd391a6a9fba39fcc035 Mon Sep 17 00:00:00 2001 From: Yash Shrivastava Date: Mon, 6 Apr 2026 16:56:02 +0530 Subject: [PATCH 03/16] formatting and in memory changes --- configs/local.yaml | 1 + internal/pkg/heimdall/command_dal.go | 5 ++ internal/pkg/heimdall/heimdall.go | 2 +- internal/pkg/janitor/janitor.go | 6 +-- .../object/command/clickhouse/column_types.go | 49 +++++++++---------- .../pkg/object/command/postgres/postgres.go | 4 +- 6 files changed, 36 insertions(+), 31 deletions(-) diff --git a/configs/local.yaml b/configs/local.yaml index a61c8da9..f2a49bea 100644 --- a/configs/local.yaml +++ b/configs/local.yaml @@ -28,6 +28,7 @@ commands: version: 0.0.1 store_result_sync: false description: Test ping command + health_check: true tags: - type:ping cluster_tags: diff --git a/internal/pkg/heimdall/command_dal.go b/internal/pkg/heimdall/command_dal.go index 22748014..6c5a7be4 100644 --- a/internal/pkg/heimdall/command_dal.go +++ b/internal/pkg/heimdall/command_dal.go @@ -269,6 +269,11 @@ func (h *Heimdall) updateCommandStatus(ctx context.Context, c *command.Command) return nil, ErrUnknownCommandID } + // keep in-memory state in sync with the DB + if cmd, ok := h.Commands[c.ID]; ok { + cmd.Status = c.Status + } + updateCommandStatusMethod.CountSuccess() return h.getCommandStatus(ctx, c) diff --git a/internal/pkg/heimdall/heimdall.go b/internal/pkg/heimdall/heimdall.go index be23c386..f5adcf15 100644 --- a/internal/pkg/heimdall/heimdall.go +++ b/internal/pkg/heimdall/heimdall.go @@ -172,7 +172,7 @@ func (h *Heimdall) Start() error { apiRouter := router.PathPrefix(defaultAPIPrefix).Subrouter() // job(s) endpoints... - apiRouter.Methods(methodGET).Path(`/command/health`).HandlerFunc(h.healthHandler) + apiRouter.Methods(methodGET).PathPrefix(`/command/health`).HandlerFunc(h.healthHandler) apiRouter.Methods(methodGET).PathPrefix(`/job/statuses`).HandlerFunc(payloadHandler(h.getJobStatuses)) apiRouter.Methods(methodGET).PathPrefix(`/job/{id}/status`).HandlerFunc(payloadHandler(h.getJobStatus)) apiRouter.Methods(methodPOST).PathPrefix(`/job/{id}/cancel`).HandlerFunc(payloadHandler(h.cancelJob)) diff --git a/internal/pkg/janitor/janitor.go b/internal/pkg/janitor/janitor.go index a2698882..543deef7 100644 --- a/internal/pkg/janitor/janitor.go +++ b/internal/pkg/janitor/janitor.go @@ -21,10 +21,10 @@ type Janitor struct { Keepalive int `yaml:"keepalive,omitempty" json:"keepalive,omitempty"` StaleJob int `yaml:"stale_job,omitempty" json:"stale_job,omitempty"` FinishedJobRetentionDays int `yaml:"finished_job_retention_days,omitempty" json:"finished_job_retention_days,omitempty"` - CleanInterval int `yaml:"clean_interval,omitempty" json:"clean_interval,omitempty"` + CleanInterval int `yaml:"clean_interval,omitempty" json:"clean_interval,omitempty"` db *database.Database - commandHandlers map[string]plugin.Handler - clusters cluster.Clusters + commandHandlers map[string]plugin.Handler + clusters cluster.Clusters } func (j *Janitor) Start(d *database.Database, commandHandlers map[string]plugin.Handler, clusters cluster.Clusters) error { diff --git a/internal/pkg/object/command/clickhouse/column_types.go b/internal/pkg/object/command/clickhouse/column_types.go index 79b05a98..aafd5c79 100644 --- a/internal/pkg/object/command/clickhouse/column_types.go +++ b/internal/pkg/object/command/clickhouse/column_types.go @@ -148,32 +148,31 @@ func handleDecimal(nullable bool) (any, func() any) { } } func handleTuple(nullable bool) (any, func() any) { - if nullable { - var p *any - return &p, func() any { - if p == nil || *p == nil { - return nil - } - return *p - } - } - var v any - return &v, func() any { return v } + if nullable { + var p *any + return &p, func() any { + if p == nil || *p == nil { + return nil + } + return *p + } + } + var v any + return &v, func() any { return v } } - func handleArray(nullable bool) (any, func() any) { - if nullable { - var p *any - return &p, func() any { - if p == nil || *p == nil { - return nil - } - return *p - } - } - var v any - return &v, func() any { return v } + if nullable { + var p *any + return &p, func() any { + if p == nil || *p == nil { + return nil + } + return *p + } + } + var v any + return &v, func() any { return v } } func handleDefault(nullable bool) (any, func() any) { @@ -207,8 +206,8 @@ func unwrapCHType(t string) (base string, nullable bool) { return "Array", nullable } if strings.HasPrefix(s, "Tuple(") { - return "Tuple", nullable - } + return "Tuple", nullable + } // Decimal(N,S) normalize to "Decimal" if isDecimal(s) { diff --git a/internal/pkg/object/command/postgres/postgres.go b/internal/pkg/object/command/postgres/postgres.go index 56dbd87c..034a13c5 100644 --- a/internal/pkg/object/command/postgres/postgres.go +++ b/internal/pkg/object/command/postgres/postgres.go @@ -156,7 +156,7 @@ func splitAndTrimQueries(query string) []string { return queries } -func (p *postgresCommandContext)Cleanup(ctx context.Context, jobID string, c *cluster.Cluster) error{ +func (p *postgresCommandContext) Cleanup(ctx context.Context, jobID string, c *cluster.Cluster) error { // implement me return nil -} \ No newline at end of file +} From cb3438cd7cabf622320354e18c75560b444ecb36 Mon Sep 17 00:00:00 2001 From: Yash Shrivastava Date: Mon, 6 Apr 2026 17:07:47 +0530 Subject: [PATCH 04/16] remove in memory changes and use DB for status --- internal/pkg/heimdall/command_dal.go | 5 ----- internal/pkg/heimdall/health.go | 23 ++++++++++++++++------- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/internal/pkg/heimdall/command_dal.go b/internal/pkg/heimdall/command_dal.go index 6c5a7be4..22748014 100644 --- a/internal/pkg/heimdall/command_dal.go +++ b/internal/pkg/heimdall/command_dal.go @@ -269,11 +269,6 @@ func (h *Heimdall) updateCommandStatus(ctx context.Context, c *command.Command) return nil, ErrUnknownCommandID } - // keep in-memory state in sync with the DB - if cmd, ok := h.Commands[c.ID]; ok { - cmd.Status = c.Status - } - updateCommandStatusMethod.CountSuccess() return h.getCommandStatus(ctx, c) diff --git a/internal/pkg/heimdall/health.go b/internal/pkg/heimdall/health.go index 83d31c13..ca888b53 100644 --- a/internal/pkg/heimdall/health.go +++ b/internal/pkg/heimdall/health.go @@ -48,7 +48,7 @@ func (h *Heimdall) healthHandler(w http.ResponseWriter, r *http.Request) { ctx, cancel := context.WithTimeout(r.Context(), healthCheckTimeout) defer cancel() - results := h.runHealthChecks(ctx, h.resolveHealthPairs()) + results := h.runHealthChecks(ctx, h.resolveHealthPairs(ctx)) healthy := true for _, res := range results { @@ -70,18 +70,27 @@ func (h *Heimdall) healthHandler(w http.ResponseWriter, r *http.Request) { w.Write(data) } -func (h *Heimdall) resolveHealthPairs() []*healthPair { +func (h *Heimdall) resolveHealthPairs(ctx context.Context) []*healthPair { var pairs []*healthPair for _, cmd := range h.Commands { - if !cmd.HealthCheck || cmd.Status != status.Active { + if !cmd.HealthCheck { continue } - for _, cl := range h.Clusters { - if cl.Status != status.Active { + // check DB for command status to avoid unnecessary health checks for inactive commands + dbCmd, err := h.getCommandStatus(ctx, cmd) + if err != nil { + continue + } + if dbCmd.(*command.Command).Status != status.Active { + continue + } + // find active clusters matching command's cluster tags + for _, cluster := range h.Clusters { + if cluster.Status != status.Active { continue } - if cl.Tags.Contains(cmd.ClusterTags) { - pairs = append(pairs, &healthPair{cmd, cl, h.commandHandlers[cmd.ID]}) + if cluster.Tags.Contains(cmd.ClusterTags) { + pairs = append(pairs, &healthPair{cmd, cluster, h.commandHandlers[cmd.ID]}) } } } From a550c567401606a1ca50071456fcdefd8f6250ef Mon Sep 17 00:00:00 2001 From: Yash Shrivastava Date: Mon, 6 Apr 2026 18:33:10 +0530 Subject: [PATCH 05/16] add per command endpoint --- internal/pkg/heimdall/health.go | 75 +++++++++++++++++++++++-------- internal/pkg/heimdall/heimdall.go | 3 +- 2 files changed, 58 insertions(+), 20 deletions(-) diff --git a/internal/pkg/heimdall/health.go b/internal/pkg/heimdall/health.go index ca888b53..c9372fb3 100644 --- a/internal/pkg/heimdall/health.go +++ b/internal/pkg/heimdall/health.go @@ -10,6 +10,7 @@ import ( "time" "github.com/google/uuid" + "github.com/gorilla/mux" "github.com/patterninc/heimdall/pkg/object/cluster" "github.com/patterninc/heimdall/pkg/object/command" @@ -47,8 +48,29 @@ type healthPair struct { func (h *Heimdall) healthHandler(w http.ResponseWriter, r *http.Request) { ctx, cancel := context.WithTimeout(r.Context(), healthCheckTimeout) defer cancel() + h.writeHealthResponse(w, ctx, nil) +} - results := h.runHealthChecks(ctx, h.resolveHealthPairs(ctx)) +func (h *Heimdall) commandHealthHandler(w http.ResponseWriter, r *http.Request) { + ctx, cancel := context.WithTimeout(r.Context(), healthCheckTimeout) + defer cancel() + + commandID := mux.Vars(r)[idKey] + cmd, found := h.Commands[commandID] + if !found { + writeAPIError(w, ErrUnknownCommandID, nil) + return + } + if !cmd.HealthCheck { + writeAPIError(w, fmt.Errorf("command %s has not opted into health checks", commandID), nil) + return + } + + h.writeHealthResponse(w, ctx, &commandID) +} + +func (h *Heimdall) writeHealthResponse(w http.ResponseWriter, ctx context.Context, commandID *string) { + results := h.runHealthChecks(ctx, h.resolveHealthPairs(ctx, commandID)) healthy := true for _, res := range results { @@ -70,28 +92,43 @@ func (h *Heimdall) healthHandler(w http.ResponseWriter, r *http.Request) { w.Write(data) } -func (h *Heimdall) resolveHealthPairs(ctx context.Context) []*healthPair { +func (h *Heimdall) resolveHealthPairs(ctx context.Context, commandID *string) []*healthPair { var pairs []*healthPair - for _, cmd := range h.Commands { - if !cmd.HealthCheck { - continue - } - // check DB for command status to avoid unnecessary health checks for inactive commands - dbCmd, err := h.getCommandStatus(ctx, cmd) - if err != nil { - continue + if commandID != nil { + cmd, found := h.Commands[*commandID] + if !found { + return pairs } - if dbCmd.(*command.Command).Status != status.Active { + return h.resolveHealthPairsForCommand(ctx, cmd) + } + for _, cmd := range h.Commands { + pairs = append(pairs, h.resolveHealthPairsForCommand(ctx, cmd)...) + } + return pairs +} + +func (h *Heimdall) resolveHealthPairsForCommand(ctx context.Context, cmd *command.Command) []*healthPair { + var pairs []*healthPair + if cmd == nil { + return pairs + } + if !cmd.HealthCheck { + return pairs + } + // check DB for command status to avoid unnecessary health checks for inactive commands + dbCmd, err := h.getCommandStatus(ctx, cmd) + if err != nil { + return pairs + } + if dbCmd.(*command.Command).Status != status.Active { + return pairs + } + for _, cl := range h.Clusters { + if cl.Status != status.Active { continue } - // find active clusters matching command's cluster tags - for _, cluster := range h.Clusters { - if cluster.Status != status.Active { - continue - } - if cluster.Tags.Contains(cmd.ClusterTags) { - pairs = append(pairs, &healthPair{cmd, cluster, h.commandHandlers[cmd.ID]}) - } + if cl.Tags.Contains(cmd.ClusterTags) { + pairs = append(pairs, &healthPair{cmd, cl, h.commandHandlers[cmd.ID]}) } } return pairs diff --git a/internal/pkg/heimdall/heimdall.go b/internal/pkg/heimdall/heimdall.go index f5adcf15..e158960d 100644 --- a/internal/pkg/heimdall/heimdall.go +++ b/internal/pkg/heimdall/heimdall.go @@ -172,7 +172,6 @@ func (h *Heimdall) Start() error { apiRouter := router.PathPrefix(defaultAPIPrefix).Subrouter() // job(s) endpoints... - apiRouter.Methods(methodGET).PathPrefix(`/command/health`).HandlerFunc(h.healthHandler) apiRouter.Methods(methodGET).PathPrefix(`/job/statuses`).HandlerFunc(payloadHandler(h.getJobStatuses)) apiRouter.Methods(methodGET).PathPrefix(`/job/{id}/status`).HandlerFunc(payloadHandler(h.getJobStatus)) apiRouter.Methods(methodPOST).PathPrefix(`/job/{id}/cancel`).HandlerFunc(payloadHandler(h.cancelJob)) @@ -183,6 +182,8 @@ func (h *Heimdall) Start() error { apiRouter.Methods(methodGET).PathPrefix(`/command/statuses`).HandlerFunc(payloadHandler(h.getCommandStatuses)) apiRouter.Methods(methodGET).PathPrefix(`/command/{id}/status`).HandlerFunc(payloadHandler(h.getCommandStatus)) apiRouter.Methods(methodPUT).PathPrefix(`/command/{id}/status`).HandlerFunc(payloadHandler(h.updateCommandStatus)) + apiRouter.Methods(methodGET).PathPrefix(`/command/health`).HandlerFunc(h.healthHandler) + apiRouter.Methods(methodGET).PathPrefix(`/command/{id}/health`).HandlerFunc(h.commandHealthHandler) apiRouter.Methods(methodPUT).PathPrefix(`/command/{id}`).HandlerFunc(payloadHandler(h.submitCommand)) apiRouter.Methods(methodGET).PathPrefix(`/command/{id}`).HandlerFunc(payloadHandler(h.getCommand)) apiRouter.Methods(methodGET).PathPrefix(`/commands`).HandlerFunc(payloadHandler(h.getCommands)) From c1afd8635ff9c1652c8b7dd056c5795f5185b1fa Mon Sep 17 00:00:00 2001 From: Yash Shrivastava Date: Mon, 6 Apr 2026 20:08:12 +0530 Subject: [PATCH 06/16] add semaphore --- internal/pkg/heimdall/health.go | 43 +++++++++++++++++++++------------ 1 file changed, 28 insertions(+), 15 deletions(-) diff --git a/internal/pkg/heimdall/health.go b/internal/pkg/heimdall/health.go index c9372fb3..b3f37e86 100644 --- a/internal/pkg/heimdall/health.go +++ b/internal/pkg/heimdall/health.go @@ -20,10 +20,11 @@ import ( ) const ( - healthCheckTimeout = 30 * time.Second - healthCheckUser = `heimdall-health` - healthStatusOK = `ok` - healthStatusError = `error` + healthCheckTimeout = 30 * time.Second + healthCheckUser = `heimdall-health` + healthStatusOK = `ok` + healthStatusError = `error` + healthCheckConcurrency = 10 ) type healthCheckResult struct { @@ -70,7 +71,12 @@ func (h *Heimdall) commandHealthHandler(w http.ResponseWriter, r *http.Request) } func (h *Heimdall) writeHealthResponse(w http.ResponseWriter, ctx context.Context, commandID *string) { - results := h.runHealthChecks(ctx, h.resolveHealthPairs(ctx, commandID)) + pairs, err := h.resolveHealthPairs(ctx, commandID) + if err != nil { + writeAPIError(w, fmt.Errorf("error resolving health check pairs: %w", err), nil) + return + } + results := h.runHealthChecks(ctx, pairs) healthy := true for _, res := range results { @@ -92,36 +98,40 @@ func (h *Heimdall) writeHealthResponse(w http.ResponseWriter, ctx context.Contex w.Write(data) } -func (h *Heimdall) resolveHealthPairs(ctx context.Context, commandID *string) []*healthPair { +func (h *Heimdall) resolveHealthPairs(ctx context.Context, commandID *string) ([]*healthPair, error) { var pairs []*healthPair if commandID != nil { cmd, found := h.Commands[*commandID] if !found { - return pairs + return pairs, nil } return h.resolveHealthPairsForCommand(ctx, cmd) } for _, cmd := range h.Commands { - pairs = append(pairs, h.resolveHealthPairsForCommand(ctx, cmd)...) + cmdPairs, err := h.resolveHealthPairsForCommand(ctx, cmd) + if err != nil { + return pairs, err + } + pairs = append(pairs, cmdPairs...) } - return pairs + return pairs, nil } -func (h *Heimdall) resolveHealthPairsForCommand(ctx context.Context, cmd *command.Command) []*healthPair { +func (h *Heimdall) resolveHealthPairsForCommand(ctx context.Context, cmd *command.Command) ([]*healthPair, error) { var pairs []*healthPair if cmd == nil { - return pairs + return pairs, nil } if !cmd.HealthCheck { - return pairs + return pairs, nil } // check DB for command status to avoid unnecessary health checks for inactive commands dbCmd, err := h.getCommandStatus(ctx, cmd) if err != nil { - return pairs + return pairs, err } if dbCmd.(*command.Command).Status != status.Active { - return pairs + return pairs, nil } for _, cl := range h.Clusters { if cl.Status != status.Active { @@ -131,16 +141,19 @@ func (h *Heimdall) resolveHealthPairsForCommand(ctx context.Context, cmd *comman pairs = append(pairs, &healthPair{cmd, cl, h.commandHandlers[cmd.ID]}) } } - return pairs + return pairs, nil } func (h *Heimdall) runHealthChecks(ctx context.Context, pairs []*healthPair) []healthCheckResult { results := make([]healthCheckResult, len(pairs)) + sem := make(chan struct{}, healthCheckConcurrency) var wg sync.WaitGroup for i, pair := range pairs { wg.Add(1) go func(i int, pair *healthPair) { defer wg.Done() + sem <- struct{}{} + defer func() { <-sem }() results[i] = h.checkPair(ctx, pair) }(i, pair) } From 765d5972e65acc2660d91dcfb24d729888e2e57c Mon Sep 17 00:00:00 2001 From: Yash Shrivastava Date: Tue, 28 Apr 2026 12:13:33 +0530 Subject: [PATCH 07/16] shift to cluter healthchecks --- internal/pkg/heimdall/health.go | 176 +++++++++--------------------- internal/pkg/heimdall/heimdall.go | 3 +- pkg/object/cluster/cluster.go | 1 + pkg/object/command/command.go | 1 - 4 files changed, 52 insertions(+), 129 deletions(-) diff --git a/internal/pkg/heimdall/health.go b/internal/pkg/heimdall/health.go index b3f37e86..e23a5551 100644 --- a/internal/pkg/heimdall/health.go +++ b/internal/pkg/heimdall/health.go @@ -3,36 +3,36 @@ package heimdall import ( "context" "encoding/json" - "fmt" "net/http" - "os" "sync" "time" - "github.com/google/uuid" - "github.com/gorilla/mux" - "github.com/patterninc/heimdall/pkg/object/cluster" - "github.com/patterninc/heimdall/pkg/object/command" - "github.com/patterninc/heimdall/pkg/object/job" "github.com/patterninc/heimdall/pkg/object/status" "github.com/patterninc/heimdall/pkg/plugin" ) const ( healthCheckTimeout = 30 * time.Second - healthCheckUser = `heimdall-health` + healthCheckConcurrency = 10 healthStatusOK = `ok` healthStatusError = `error` - healthCheckConcurrency = 10 + healthStatusUnchecked = `unchecked` ) +type clusterProbe struct { + cluster *cluster.Cluster + handler plugin.Handler + pluginName string +} + type healthCheckResult struct { - CommandID string `json:"command_id"` - ClusterID string `json:"cluster_id"` - Status string `json:"status"` - LatencyMs int64 `json:"latency_ms"` - Error string `json:"error,omitempty"` + ClusterID string `json:"cluster_id"` + ClusterName string `json:"cluster_name"` + Plugin string `json:"plugin"` + Status string `json:"status"` + LatencyMs int64 `json:"latency_ms"` + Error string `json:"error,omitempty"` } type healthChecksResponse struct { @@ -40,43 +40,12 @@ type healthChecksResponse struct { Checks []healthCheckResult `json:"checks"` } -type healthPair struct { - cmd *command.Command - cluster *cluster.Cluster - handler plugin.Handler -} - func (h *Heimdall) healthHandler(w http.ResponseWriter, r *http.Request) { ctx, cancel := context.WithTimeout(r.Context(), healthCheckTimeout) defer cancel() - h.writeHealthResponse(w, ctx, nil) -} -func (h *Heimdall) commandHealthHandler(w http.ResponseWriter, r *http.Request) { - ctx, cancel := context.WithTimeout(r.Context(), healthCheckTimeout) - defer cancel() - - commandID := mux.Vars(r)[idKey] - cmd, found := h.Commands[commandID] - if !found { - writeAPIError(w, ErrUnknownCommandID, nil) - return - } - if !cmd.HealthCheck { - writeAPIError(w, fmt.Errorf("command %s has not opted into health checks", commandID), nil) - return - } - - h.writeHealthResponse(w, ctx, &commandID) -} - -func (h *Heimdall) writeHealthResponse(w http.ResponseWriter, ctx context.Context, commandID *string) { - pairs, err := h.resolveHealthPairs(ctx, commandID) - if err != nil { - writeAPIError(w, fmt.Errorf("error resolving health check pairs: %w", err), nil) - return - } - results := h.runHealthChecks(ctx, pairs) + probes := h.resolveClusterProbes() + results := h.runHealthChecks(ctx, probes) healthy := true for _, res := range results { @@ -98,80 +67,62 @@ func (h *Heimdall) writeHealthResponse(w http.ResponseWriter, ctx context.Contex w.Write(data) } -func (h *Heimdall) resolveHealthPairs(ctx context.Context, commandID *string) ([]*healthPair, error) { - var pairs []*healthPair - if commandID != nil { - cmd, found := h.Commands[*commandID] - if !found { - return pairs, nil - } - return h.resolveHealthPairsForCommand(ctx, cmd) - } - for _, cmd := range h.Commands { - cmdPairs, err := h.resolveHealthPairsForCommand(ctx, cmd) - if err != nil { - return pairs, err - } - pairs = append(pairs, cmdPairs...) - } - return pairs, nil -} - -func (h *Heimdall) resolveHealthPairsForCommand(ctx context.Context, cmd *command.Command) ([]*healthPair, error) { - var pairs []*healthPair - if cmd == nil { - return pairs, nil - } - if !cmd.HealthCheck { - return pairs, nil - } - // check DB for command status to avoid unnecessary health checks for inactive commands - dbCmd, err := h.getCommandStatus(ctx, cmd) - if err != nil { - return pairs, err - } - if dbCmd.(*command.Command).Status != status.Active { - return pairs, nil - } +func (h *Heimdall) resolveClusterProbes() []*clusterProbe { + var probes []*clusterProbe for _, cl := range h.Clusters { - if cl.Status != status.Active { + if cl.Status != status.Active || !cl.HealthCheck { continue } - if cl.Tags.Contains(cmd.ClusterTags) { - pairs = append(pairs, &healthPair{cmd, cl, h.commandHandlers[cmd.ID]}) + for _, cmd := range h.Commands { + if cmd.Status != status.Active { + continue + } + if cl.Tags.Contains(cmd.ClusterTags) { + probes = append(probes, &clusterProbe{ + cluster: cl, + handler: h.commandHandlers[cmd.ID], + pluginName: cmd.Plugin, + }) + break + } } } - return pairs, nil + return probes } -func (h *Heimdall) runHealthChecks(ctx context.Context, pairs []*healthPair) []healthCheckResult { - results := make([]healthCheckResult, len(pairs)) +func (h *Heimdall) runHealthChecks(ctx context.Context, probes []*clusterProbe) []healthCheckResult { + results := make([]healthCheckResult, len(probes)) sem := make(chan struct{}, healthCheckConcurrency) var wg sync.WaitGroup - for i, pair := range pairs { + for i, probe := range probes { wg.Add(1) - go func(i int, pair *healthPair) { + go func(i int, probe *clusterProbe) { defer wg.Done() sem <- struct{}{} defer func() { <-sem }() - results[i] = h.checkPair(ctx, pair) - }(i, pair) + results[i] = h.checkCluster(ctx, probe) + }(i, probe) } wg.Wait() return results } -func (h *Heimdall) checkPair(ctx context.Context, pair *healthPair) healthCheckResult { +func (h *Heimdall) checkCluster(ctx context.Context, probe *clusterProbe) healthCheckResult { start := time.Now() - res := healthCheckResult{CommandID: pair.cmd.ID, ClusterID: pair.cluster.ID} + res := healthCheckResult{ + ClusterID: probe.cluster.ID, + ClusterName: probe.cluster.Name, + Plugin: probe.pluginName, + } - var err error - if hc, ok := pair.handler.(plugin.HealthChecker); ok { - err = hc.HealthCheck(ctx, pair.cluster) - } else { - err = h.pluginProbe(ctx, pair.cluster, pair.handler) + hc, ok := probe.handler.(plugin.HealthChecker) + if !ok { + res.Status = healthStatusUnchecked + res.LatencyMs = time.Since(start).Milliseconds() + return res } + err := hc.HealthCheck(ctx, probe.cluster) res.LatencyMs = time.Since(start).Milliseconds() if err != nil { res.Status = healthStatusError @@ -181,30 +132,3 @@ func (h *Heimdall) checkPair(ctx context.Context, pair *healthPair) healthCheckR } return res } - -func (h *Heimdall) pluginProbe(ctx context.Context, cl *cluster.Cluster, handler plugin.Handler) error { - j := &job.Job{} - j.ID = uuid.NewString() - j.User = healthCheckUser - - tmpDir, err := os.MkdirTemp("", "heimdall-health-*") - if err != nil { - return err - } - defer os.RemoveAll(tmpDir) - - runtime := &plugin.Runtime{ - WorkingDirectory: tmpDir, - ResultDirectory: tmpDir + separator + "result", - Version: h.Version, - UserAgent: fmt.Sprintf(formatUserAgent, h.Version), - } - - if err := runtime.Set(); err != nil { - return err - } - defer runtime.Stdout.Close() - defer runtime.Stderr.Close() - - return handler.Execute(ctx, runtime, j, cl) -} diff --git a/internal/pkg/heimdall/heimdall.go b/internal/pkg/heimdall/heimdall.go index e158960d..889c1e19 100644 --- a/internal/pkg/heimdall/heimdall.go +++ b/internal/pkg/heimdall/heimdall.go @@ -182,8 +182,6 @@ func (h *Heimdall) Start() error { apiRouter.Methods(methodGET).PathPrefix(`/command/statuses`).HandlerFunc(payloadHandler(h.getCommandStatuses)) apiRouter.Methods(methodGET).PathPrefix(`/command/{id}/status`).HandlerFunc(payloadHandler(h.getCommandStatus)) apiRouter.Methods(methodPUT).PathPrefix(`/command/{id}/status`).HandlerFunc(payloadHandler(h.updateCommandStatus)) - apiRouter.Methods(methodGET).PathPrefix(`/command/health`).HandlerFunc(h.healthHandler) - apiRouter.Methods(methodGET).PathPrefix(`/command/{id}/health`).HandlerFunc(h.commandHealthHandler) apiRouter.Methods(methodPUT).PathPrefix(`/command/{id}`).HandlerFunc(payloadHandler(h.submitCommand)) apiRouter.Methods(methodGET).PathPrefix(`/command/{id}`).HandlerFunc(payloadHandler(h.getCommand)) apiRouter.Methods(methodGET).PathPrefix(`/commands`).HandlerFunc(payloadHandler(h.getCommands)) @@ -193,6 +191,7 @@ func (h *Heimdall) Start() error { apiRouter.Methods(methodPUT).PathPrefix(`/cluster/{id}`).HandlerFunc(payloadHandler(h.submitCluster)) apiRouter.Methods(methodGET).PathPrefix(`/cluster/{id}`).HandlerFunc(payloadHandler(h.getCluster)) apiRouter.Methods(methodGET).PathPrefix(`/clusters`).HandlerFunc(payloadHandler(h.getClusters)) + apiRouter.Methods(methodGET).PathPrefix(`/health`).HandlerFunc(h.healthHandler) // metrics endpoint - proxy to metrics service router.Path(`/metrics`).HandlerFunc(metricsRouteHandler) diff --git a/pkg/object/cluster/cluster.go b/pkg/object/cluster/cluster.go index f4d7aec1..a31fc647 100644 --- a/pkg/object/cluster/cluster.go +++ b/pkg/object/cluster/cluster.go @@ -15,6 +15,7 @@ var ( type Cluster struct { object.Object `yaml:",inline" json:",inline"` Status status.Status `yaml:"status,omitempty" json:"status,omitempty"` + HealthCheck bool `yaml:"health_check,omitempty" json:"health_check,omitempty"` RBACNames []string `yaml:"rbacs,omitempty" json:"rbacs,omitempty"` RBACs []rbac.RBAC `yaml:"-" json:"-"` } diff --git a/pkg/object/command/command.go b/pkg/object/command/command.go index aef7de8a..d31fadd3 100644 --- a/pkg/object/command/command.go +++ b/pkg/object/command/command.go @@ -20,7 +20,6 @@ type Command struct { Plugin string `yaml:"plugin,omitempty" json:"plugin,omitempty"` IsSync bool `yaml:"is_sync,omitempty" json:"is_sync,omitempty"` ClusterTags *set.Set[string] `yaml:"cluster_tags,omitempty" json:"cluster_tags,omitempty"` - HealthCheck bool `yaml:"health_check,omitempty" json:"health_check,omitempty"` Handler plugin.Handler `yaml:"-" json:"-"` } From 816f69cff780b10261c80e555d7db3668e44951d Mon Sep 17 00:00:00 2001 From: Yash Shrivastava Date: Tue, 28 Apr 2026 12:25:24 +0530 Subject: [PATCH 08/16] add ping-shell healthcheck --- configs/local.yaml | 2 +- internal/pkg/object/command/ping/ping.go | 5 +++++ internal/pkg/object/command/shell/shell.go | 5 +++++ 3 files changed, 11 insertions(+), 1 deletion(-) diff --git a/configs/local.yaml b/configs/local.yaml index f2a49bea..5d0123ab 100644 --- a/configs/local.yaml +++ b/configs/local.yaml @@ -28,7 +28,6 @@ commands: version: 0.0.1 store_result_sync: false description: Test ping command - health_check: true tags: - type:ping cluster_tags: @@ -40,6 +39,7 @@ clusters: status: active version: 0.0.1 description: Just a localhost + health_check: true tags: - type:localhost - data:local \ No newline at end of file diff --git a/internal/pkg/object/command/ping/ping.go b/internal/pkg/object/command/ping/ping.go index dd7efcc0..3dcd8b00 100644 --- a/internal/pkg/object/command/ping/ping.go +++ b/internal/pkg/object/command/ping/ping.go @@ -32,6 +32,11 @@ func (p *commandContext) Execute(ctx context.Context, _ *plugin.Runtime, j *job. } +// HealthCheck implements the plugin.HealthChecker interface +func (p *commandContext) HealthCheck(_ context.Context, _ *cluster.Cluster) error { + return nil +} + // Cleanup implements the plugin.Handler interface func (p *commandContext) Cleanup(ctx context.Context, jobID string, c *cluster.Cluster) error { // TODO: Implement cleanup if needed diff --git a/internal/pkg/object/command/shell/shell.go b/internal/pkg/object/command/shell/shell.go index 603fca8c..e6f19ff6 100644 --- a/internal/pkg/object/command/shell/shell.go +++ b/internal/pkg/object/command/shell/shell.go @@ -110,6 +110,11 @@ func (s *commandContext) Execute(ctx context.Context, r *plugin.Runtime, j *job. } +// HealthCheck implements the plugin.HealthChecker interface +func (s *commandContext) HealthCheck(_ context.Context, _ *cluster.Cluster) error { + return nil +} + func (s *commandContext) Cleanup(ctx context.Context, jobID string, c *cluster.Cluster) error { // Implement cleanup if needed return nil From 1f5f93af6c00b10e23364827b19c0d92178abd50 Mon Sep 17 00:00:00 2001 From: Yash Shrivastava Date: Tue, 28 Apr 2026 12:44:55 +0530 Subject: [PATCH 09/16] add for other plugins --- .../object/command/clickhouse/clickhouse.go | 25 +++++++++++ internal/pkg/object/command/dynamo/dynamo.go | 39 ++++++++++++++++++ internal/pkg/object/command/ecs/ecs.go | 29 +++++++++++++ internal/pkg/object/command/glue/glue.go | 16 ++++++++ .../pkg/object/command/snowflake/snowflake.go | 41 +++++++++++++++++++ internal/pkg/object/command/spark/spark.go | 41 +++++++++++++++++++ .../pkg/object/command/sparkeks/sparkeks.go | 26 ++++++++++++ internal/pkg/object/command/trino/trino.go | 28 +++++++++++++ 8 files changed, 245 insertions(+) diff --git a/internal/pkg/object/command/clickhouse/clickhouse.go b/internal/pkg/object/command/clickhouse/clickhouse.go index 1fe327e8..cf44f3bd 100644 --- a/internal/pkg/object/command/clickhouse/clickhouse.go +++ b/internal/pkg/object/command/clickhouse/clickhouse.go @@ -184,6 +184,31 @@ func collectResults(rows driver.Rows) (*result.Result, error) { return out, nil } +// HealthCheck implements the plugin.HealthChecker interface +func (cmd *commandContext) HealthCheck(ctx context.Context, c *cluster.Cluster) error { + clusterCtx := &clusterContext{} + if c.Context != nil { + if err := c.Context.Unmarshal(clusterCtx); err != nil { + return err + } + } + + conn, err := clickhouse.Open(&clickhouse.Options{ + Addr: clusterCtx.Endpoints, + Auth: clickhouse.Auth{ + Database: clusterCtx.Database, + Username: cmd.Username, + Password: cmd.Password, + }, + }) + if err != nil { + return err + } + defer conn.Close() + + return conn.Ping(ctx) +} + func (cmd *commandContext) Cleanup(ctx context.Context, jobID string, c *cluster.Cluster) error { // No cleanup needed. CLickhouse queries should always be synchronous. return nil diff --git a/internal/pkg/object/command/dynamo/dynamo.go b/internal/pkg/object/command/dynamo/dynamo.go index f8c640ca..b2619d30 100644 --- a/internal/pkg/object/command/dynamo/dynamo.go +++ b/internal/pkg/object/command/dynamo/dynamo.go @@ -143,6 +143,45 @@ func (d *commandContext) Execute(ctx context.Context, r *plugin.Runtime, j *job. } +// HealthCheck implements the plugin.HealthChecker interface +func (d *commandContext) HealthCheck(ctx context.Context, c *cluster.Cluster) error { + clusterCtx := &clusterContext{} + if c.Context != nil { + if err := c.Context.Unmarshal(clusterCtx); err != nil { + return err + } + } + + awsConfig, err := config.LoadDefaultConfig(ctx) + if err != nil { + return err + } + + assumeRoleOptions := func(_ *dynamodb.Options) {} + if clusterCtx.RoleARN != nil { + stsSvc := sts.NewFromConfig(awsConfig) + out, err := stsSvc.AssumeRole(ctx, &sts.AssumeRoleInput{ + RoleArn: clusterCtx.RoleARN, + RoleSessionName: assumeRoleSession, + }) + if err != nil { + return err + } + assumeRoleOptions = func(o *dynamodb.Options) { + o.Credentials = credentials.NewStaticCredentialsProvider( + *out.Credentials.AccessKeyId, + *out.Credentials.SecretAccessKey, + *out.Credentials.SessionToken, + ) + } + } + + svc := dynamodb.NewFromConfig(awsConfig, assumeRoleOptions) + maxResults := int32(1) + _, err = svc.ListTables(ctx, &dynamodb.ListTablesInput{Limit: &maxResults}) + return err +} + func (d *commandContext) Cleanup(ctx context.Context, jobID string, c *cluster.Cluster) error { // TODO: Implement cleanup if needed return nil diff --git a/internal/pkg/object/command/ecs/ecs.go b/internal/pkg/object/command/ecs/ecs.go index b2750b7c..a1ef4d50 100644 --- a/internal/pkg/object/command/ecs/ecs.go +++ b/internal/pkg/object/command/ecs/ecs.go @@ -852,3 +852,32 @@ func isThrottlingError(err error) bool { } return false } + +// HealthCheck implements the plugin.HealthChecker interface +func (e *commandContext) HealthCheck(ctx context.Context, c *cluster.Cluster) error { + clusterCtx := &clusterContext{} + if c.Context != nil { + if err := c.Context.Unmarshal(clusterCtx); err != nil { + return err + } + } + + cfg, err := config.LoadDefaultConfig(ctx) + if err != nil { + return err + } + + ecsClient := ecs.NewFromConfig(cfg) + out, err := ecsClient.DescribeClusters(ctx, &ecs.DescribeClustersInput{ + Clusters: []string{clusterCtx.ClusterName}, + }) + if err != nil { + return err + } + + if len(out.Clusters) == 0 { + return fmt.Errorf("ECS cluster %q not found", clusterCtx.ClusterName) + } + + return nil +} diff --git a/internal/pkg/object/command/glue/glue.go b/internal/pkg/object/command/glue/glue.go index de150531..81a10b5d 100644 --- a/internal/pkg/object/command/glue/glue.go +++ b/internal/pkg/object/command/glue/glue.go @@ -3,6 +3,9 @@ package glue import ( "context" + awssdk "github.com/aws/aws-sdk-go-v2/aws" + awsconfig "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/glue" "github.com/patterninc/heimdall/internal/pkg/aws" heimdallContext "github.com/patterninc/heimdall/pkg/context" "github.com/patterninc/heimdall/pkg/object/cluster" @@ -56,6 +59,19 @@ func (g *commandContext) Execute(ctx context.Context, _ *plugin.Runtime, j *job. } +// HealthCheck implements the plugin.HealthChecker interface +func (g *commandContext) HealthCheck(ctx context.Context, _ *cluster.Cluster) error { + cfg, err := awsconfig.LoadDefaultConfig(ctx) + if err != nil { + return err + } + + glueClient := glue.NewFromConfig(cfg) + maxResults := awssdk.Int32(1) + _, err = glueClient.GetDatabases(ctx, &glue.GetDatabasesInput{MaxResults: maxResults}) + return err +} + // Cleanup implements the plugin.Handler interface func (g *commandContext) Cleanup(ctx context.Context, jobID string, c *cluster.Cluster) error { // TODO: Implement cleanup if needed diff --git a/internal/pkg/object/command/snowflake/snowflake.go b/internal/pkg/object/command/snowflake/snowflake.go index 19d87cc0..8d59c760 100644 --- a/internal/pkg/object/command/snowflake/snowflake.go +++ b/internal/pkg/object/command/snowflake/snowflake.go @@ -146,6 +146,47 @@ func (s *commandContext) Execute(ctx context.Context, r *plugin.Runtime, j *job. } +// HealthCheck implements the plugin.HealthChecker interface +func (s *commandContext) HealthCheck(ctx context.Context, c *cluster.Cluster) error { + clusterCtx := &clusterContext{} + if c.Context != nil { + if err := c.Context.Unmarshal(clusterCtx); err != nil { + return err + } + } + + privateKeyBytes, err := os.ReadFile(clusterCtx.PrivateKey) + if err != nil { + return err + } + + privateKey, err := parsePrivateKey(privateKeyBytes) + if err != nil { + return err + } + + dsn, err := sf.DSN(&sf.Config{ + Account: clusterCtx.Account, + User: clusterCtx.User, + Database: clusterCtx.Database, + Warehouse: clusterCtx.Warehouse, + Role: s.Role, + Authenticator: sf.AuthTypeJwt, + PrivateKey: privateKey, + }) + if err != nil { + return err + } + + db, err := sql.Open(snowflakeDriverName, dsn) + if err != nil { + return err + } + defer db.Close() + + return db.PingContext(ctx) +} + func (s *commandContext) Cleanup(ctx context.Context, jobID string, c *cluster.Cluster) error { // TODO: Implement cleanup if needed return nil diff --git a/internal/pkg/object/command/spark/spark.go b/internal/pkg/object/command/spark/spark.go index 9f8faae5..3de19ef0 100644 --- a/internal/pkg/object/command/spark/spark.go +++ b/internal/pkg/object/command/spark/spark.go @@ -261,6 +261,47 @@ timeoutLoop: } +// HealthCheck implements the plugin.HealthChecker interface +func (s *commandContext) HealthCheck(ctx context.Context, c *cluster.Cluster) error { + clusterCtx := &clusterContext{} + if c.Context != nil { + if err := c.Context.Unmarshal(clusterCtx); err != nil { + return err + } + } + + awsConfig, err := config.LoadDefaultConfig(ctx) + if err != nil { + return err + } + + assumeRoleOptions := func(_ *emrcontainers.Options) {} + if clusterCtx.RoleARN != nil { + stsSvc := sts.NewFromConfig(awsConfig) + out, err := stsSvc.AssumeRole(ctx, &sts.AssumeRoleInput{ + RoleArn: clusterCtx.RoleARN, + RoleSessionName: assumeRoleSession, + }) + if err != nil { + return err + } + assumeRoleOptions = func(o *emrcontainers.Options) { + o.Credentials = credentials.NewStaticCredentialsProvider( + *out.Credentials.AccessKeyId, + *out.Credentials.SecretAccessKey, + *out.Credentials.SessionToken, + ) + } + } + + emrClient := emrcontainers.NewFromConfig(awsConfig, assumeRoleOptions) + maxResults := int32(1) + _, err = emrClient.ListVirtualClusters(ctx, &emrcontainers.ListVirtualClustersInput{ + MaxResults: &maxResults, + }) + return err +} + func (s *commandContext) Cleanup(ctx context.Context, jobID string, c *cluster.Cluster) error { // TODO: Implement cleanup if needed return nil diff --git a/internal/pkg/object/command/sparkeks/sparkeks.go b/internal/pkg/object/command/sparkeks/sparkeks.go index 658f0cbd..ec208acc 100644 --- a/internal/pkg/object/command/sparkeks/sparkeks.go +++ b/internal/pkg/object/command/sparkeks/sparkeks.go @@ -192,6 +192,32 @@ func (s *commandContext) Execute(ctx context.Context, r *plugin.Runtime, j *job. return nil } +// HealthCheck implements the plugin.HealthChecker interface +func (s *commandContext) HealthCheck(ctx context.Context, c *cluster.Cluster) error { + clusterCtx := &clusterContext{} + if c != nil && c.Context != nil { + if err := c.Context.Unmarshal(clusterCtx); err != nil { + return err + } + } + + if clusterCtx.RoleARN == nil { + return nil + } + + awsCfg, err := awsconfig.LoadDefaultConfig(ctx) + if err != nil { + return err + } + + stsSvc := sts.NewFromConfig(awsCfg) + _, err = stsSvc.AssumeRole(ctx, &sts.AssumeRoleInput{ + RoleArn: clusterCtx.RoleARN, + RoleSessionName: aws.String("heimdall-health"), + }) + return err +} + func (s *commandContext) Cleanup(ctx context.Context, jobID string, c *cluster.Cluster) error { // get app name and namespace from job id and command context diff --git a/internal/pkg/object/command/trino/trino.go b/internal/pkg/object/command/trino/trino.go index 88a11b6b..45b24971 100644 --- a/internal/pkg/object/command/trino/trino.go +++ b/internal/pkg/object/command/trino/trino.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log" + "net/http" "time" "github.com/hladush/go-telemetry/pkg/telemetry" @@ -95,6 +96,33 @@ func (t *commandContext) Execute(ctx context.Context, r *plugin.Runtime, j *job. } +// HealthCheck implements the plugin.HealthChecker interface +func (t *commandContext) HealthCheck(ctx context.Context, c *cluster.Cluster) error { + clusterCtx := &clusterContext{} + if c.Context != nil { + if err := c.Context.Unmarshal(clusterCtx); err != nil { + return err + } + } + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, clusterCtx.Endpoint+"/v1/info", nil) + if err != nil { + return err + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("trino /v1/info returned status %d", resp.StatusCode) + } + + return nil +} + func (t *commandContext) Cleanup(ctx context.Context, jobID string, c *cluster.Cluster) error { // TODO: Implement cleanup if needed return nil From 561263079e9060e045c156477fd757db4d25ff46 Mon Sep 17 00:00:00 2001 From: Yash Shrivastava Date: Thu, 28 May 2026 16:26:10 +0530 Subject: [PATCH 10/16] chore: add per cluster health-check --- go.mod | 10 +++--- go.sum | 12 +++++++ internal/pkg/heimdall/health.go | 52 +++++++++++++++++++++++++++++-- internal/pkg/heimdall/heimdall.go | 3 +- 4 files changed, 69 insertions(+), 8 deletions(-) diff --git a/go.mod b/go.mod index d139202c..f7a2c254 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.24.6 require ( github.com/ClickHouse/clickhouse-go/v2 v2.40.3 github.com/antlr4-go/antlr/v4 v4.13.1 - github.com/aws/aws-sdk-go-v2 v1.39.6 + github.com/aws/aws-sdk-go-v2 v1.41.7 github.com/aws/aws-sdk-go-v2/config v1.30.3 github.com/aws/aws-sdk-go-v2/credentials v1.18.3 github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.18.3 @@ -43,13 +43,15 @@ require ( github.com/andybalholm/brotli v1.2.0 // indirect github.com/apache/arrow-go/v18 v18.4.0 // indirect github.com/apache/thrift v0.22.0 // indirect + github.com/aws/aws-lambda-go v1.54.0 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.3 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.2 // indirect - github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.13 // indirect - github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.13 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.23 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.23 // indirect github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 // indirect github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.2 // indirect github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.58.7 // indirect + github.com/aws/aws-sdk-go-v2/service/codedeploy v1.35.15 // indirect github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.0 // indirect github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.8.2 // indirect github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.11.2 // indirect @@ -57,7 +59,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.2 // indirect github.com/aws/aws-sdk-go-v2/service/sso v1.27.0 // indirect github.com/aws/aws-sdk-go-v2/service/ssooidc v1.32.0 // indirect - github.com/aws/smithy-go v1.23.2 // indirect + github.com/aws/smithy-go v1.25.1 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/danieljoos/wincred v1.2.2 // indirect diff --git a/go.sum b/go.sum index 3db4b588..99e6c574 100644 --- a/go.sum +++ b/go.sum @@ -28,10 +28,14 @@ github.com/apache/arrow-go/v18 v18.4.0 h1:/RvkGqH517iY8bZKc4FD5/kkdwXJGjxf28JIXb github.com/apache/arrow-go/v18 v18.4.0/go.mod h1:Aawvwhj8x2jURIzD9Moy72cF0FyJXOpkYpdmGRHcw14= github.com/apache/thrift v0.22.0 h1:r7mTJdj51TMDe6RtcmNdQxgn9XcyfGDOzegMDRg47uc= github.com/apache/thrift v0.22.0/go.mod h1:1e7J/O1Ae6ZQMTYdy9xa3w9k+XHWPfRvdPyJeynQ+/g= +github.com/aws/aws-lambda-go v1.54.0 h1:EGYpdyRGF88xszqlGcBewz811mJeRS+maNlLZXFheII= +github.com/aws/aws-lambda-go v1.54.0/go.mod h1:dpMpZgvWx5vuQJfBt0zqBha60q7Dd7RfgJv23DymV8A= github.com/aws/aws-sdk-go-v2 v1.38.0 h1:UCRQ5mlqcFk9HJDIqENSLR3wiG1VTWlyUfLDEvY7RxU= github.com/aws/aws-sdk-go-v2 v1.38.0/go.mod h1:9Q0OoGQoboYIAJyslFyF1f5K1Ryddop8gqMhWx/n4Wg= github.com/aws/aws-sdk-go-v2 v1.39.6 h1:2JrPCVgWJm7bm83BDwY5z8ietmeJUbh3O2ACnn+Xsqk= github.com/aws/aws-sdk-go-v2 v1.39.6/go.mod h1:c9pm7VwuW0UPxAEYGyTmyurVcNrbF6Rt/wixFqDhcjE= +github.com/aws/aws-sdk-go-v2 v1.41.7 h1:DWpAJt66FmnnaRIOT/8ASTucrvuDPZASqhhLey6tLY8= +github.com/aws/aws-sdk-go-v2 v1.41.7/go.mod h1:4LAfZOPHNVNQEckOACQx60Y8pSRjIkNZQz1w92xpMJc= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.0 h1:6GMWV6CNpA/6fbFHnoAjrv4+LGfyTqZz2LtCHnspgDg= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.0/go.mod h1:/mXlTIVG9jbxkqDnr5UQNQxW1HRYxeGklkM9vAFeabg= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.3 h1:DHctwEM8P8iTXFxC/QK0MRjwEpWQeM9yzidCRjldUz0= @@ -48,16 +52,22 @@ github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.3 h1:o9RnO+YZ4X+kt5Z7Nv github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.3/go.mod h1:+6aLJzOG1fvMOyzIySYjOFjcguGvVRL68R+uoRencN4= github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.13 h1:a+8/MLcWlIxo1lF9xaGt3J/u3yOZx+CdSveSNwjhD40= github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.13/go.mod h1:oGnKwIYZ4XttyU2JWxFrwvhF6YKiK/9/wmE3v3Iu9K8= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.23 h1:GpT/TrnBYuE5gan2cZbTtvP+JlHsutdmlV2YfEyNde0= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.23/go.mod h1:xYWD6BS9ywC5bS3sz9Xh04whO/hzK2plt2Zkyrp4JuA= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.3 h1:joyyUFhiTQQmVK6ImzNU9TQSNRNeD9kOklqTzyk5v6s= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.3/go.mod h1:+vNIyZQP3b3B1tSLI0lxvrU9cfM7gpdRXMFfm67ZcPc= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.13 h1:HBSI2kDkMdWz4ZM7FjwE7e/pWDEZ+nR95x8Ztet1ooY= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.13/go.mod h1:YE94ZoDArI7awZqJzBAZ3PDD2zSfuP7w6P2knOzIn8M= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.23 h1:bpd8vxhlQi2r1hiueOw02f/duEPTMK59Q4QMAoTTtTo= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.23/go.mod h1:15DfR2nw+CRHIk0tqNyifu3G1YdAOy68RftkhMDDwYk= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 h1:bIqFDwgGXXN1Kpp99pDOdKMTTb5d2KyU5X/BZxjOkRo= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3/go.mod h1:H5O/EsxDWyU+LP/V8i5sm8cxoZgc2fdNR9bxlOFrQTo= github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.2 h1:sBpc8Ph6CpfZsEdkz/8bfg8WhKlWMCms5iWj6W/AW2U= github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.2/go.mod h1:Z2lDojZB+92Wo6EKiZZmJid9pPrDJW2NNIXSlaEfVlU= github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.58.7 h1:Yj4NvoEEdSxA90x/uCBskzeF3OxZr72Yaf64n0fIVR4= github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.58.7/go.mod h1:9/Q0/HtqBTLMksFse42wZjUq0jJrUuo4XlnXy/uSoeg= +github.com/aws/aws-sdk-go-v2/service/codedeploy v1.35.15 h1:iFcroRD6TUmNLCoAMdPJGV28h1K4yULxNludMiA/LwA= +github.com/aws/aws-sdk-go-v2/service/codedeploy v1.35.15/go.mod h1:r8xU4gaQ0oQNlfptj6Lzyv2Ityc2o0Sp3Gl3bX8KVE0= github.com/aws/aws-sdk-go-v2/service/dynamodb v1.46.0 h1:b7F96mjkzsqymMSGhuCqBQTZFx3mhTMa6IoG6SoVvC8= github.com/aws/aws-sdk-go-v2/service/dynamodb v1.46.0/go.mod h1:F8Rqs4FVGBTUzx3wbFm7HB/mgIA4Tc6/x0yQmjoB+/w= github.com/aws/aws-sdk-go-v2/service/ecs v1.62.0 h1:E5/BzpoN6fc/xWtKiFPUJBW6nW3KFINCz6so7v/fQ8E= @@ -88,6 +98,8 @@ github.com/aws/smithy-go v1.22.5 h1:P9ATCXPMb2mPjYBgueqJNCA5S9UfktsW0tTxi+a7eqw= github.com/aws/smithy-go v1.22.5/go.mod h1:t1ufH5HMublsJYulve2RKmHDC15xu1f26kHCp/HgceI= github.com/aws/smithy-go v1.23.2 h1:Crv0eatJUQhaManss33hS5r40CG3ZFH+21XSkqMrIUM= github.com/aws/smithy-go v1.23.2/go.mod h1:LEj2LM3rBRQJxPZTB4KuzZkaZYnZPnvgIhb4pu07mx0= +github.com/aws/smithy-go v1.25.1 h1:J8ERsGSU7d+aCmdQur5Txg6bVoYelvQJgtZehD12GkI= +github.com/aws/smithy-go v1.25.1/go.mod h1:YE2RhdIuDbA5E5bTdciG9KrW3+TiEONeUWCqxX9i1Fc= github.com/babourine/x v1.0.4 h1:74LI/hDs87c3cQe5MAIFrtrRwhrOK1ayIZIMOEps4QU= github.com/babourine/x v1.0.4/go.mod h1:GWvPKlJfBjuVP5+fZ6L0aQ0ELxNmHsldrfokYagtq8Q= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= diff --git a/internal/pkg/heimdall/health.go b/internal/pkg/heimdall/health.go index e23a5551..bb6208bb 100644 --- a/internal/pkg/heimdall/health.go +++ b/internal/pkg/heimdall/health.go @@ -3,6 +3,7 @@ package heimdall import ( "context" "encoding/json" + "fmt" "net/http" "sync" "time" @@ -40,7 +41,13 @@ type healthChecksResponse struct { Checks []healthCheckResult `json:"checks"` } -func (h *Heimdall) healthHandler(w http.ResponseWriter, r *http.Request) { +type clusterHealthRequest struct { + ID string `json:"id"` +} + +// getClustersHealthz handles GET /clusters/healthz +// Returns 200 when all probes pass, 503 when any fail. +func (h *Heimdall) getClustersHealthz(w http.ResponseWriter, r *http.Request) { ctx, cancel := context.WithTimeout(r.Context(), healthCheckTimeout) defer cancel() @@ -55,9 +62,8 @@ func (h *Heimdall) healthHandler(w http.ResponseWriter, r *http.Request) { } } - resp := healthChecksResponse{Healthy: healthy, Checks: results} + resp := &healthChecksResponse{Healthy: healthy, Checks: results} data, _ := json.Marshal(resp) - w.Header().Set(contentTypeKey, contentTypeJSON) if healthy { w.WriteHeader(http.StatusOK) @@ -67,6 +73,46 @@ func (h *Heimdall) healthHandler(w http.ResponseWriter, r *http.Request) { w.Write(data) } +// getClusterHealth handles GET /cluster/{id}/health — runs health check for one cluster. +func (h *Heimdall) getClusterHealth(ctx context.Context, req *clusterHealthRequest) (any, error) { + ctx, cancel := context.WithTimeout(ctx, healthCheckTimeout) + defer cancel() + + cl, found := h.Clusters[req.ID] + if !found { + return nil, fmt.Errorf("%w: %s", ErrUnknownClusterID, req.ID) + } + + probe := h.resolveProbeForCluster(cl) + if probe == nil { + return &healthChecksResponse{Healthy: true, Checks: []healthCheckResult{}}, nil + } + + result := h.checkCluster(ctx, probe) + healthy := result.Status != healthStatusError + + return &healthChecksResponse{Healthy: healthy, Checks: []healthCheckResult{result}}, nil +} + +func (h *Heimdall) resolveProbeForCluster(cl *cluster.Cluster) *clusterProbe { + if cl.Status != status.Active { + return nil + } + for _, cmd := range h.Commands { + if cmd.Status != status.Active { + continue + } + if cl.Tags.Contains(cmd.ClusterTags) { + return &clusterProbe{ + cluster: cl, + handler: h.commandHandlers[cmd.ID], + pluginName: cmd.Plugin, + } + } + } + return nil +} + func (h *Heimdall) resolveClusterProbes() []*clusterProbe { var probes []*clusterProbe for _, cl := range h.Clusters { diff --git a/internal/pkg/heimdall/heimdall.go b/internal/pkg/heimdall/heimdall.go index 889c1e19..7d5a4295 100644 --- a/internal/pkg/heimdall/heimdall.go +++ b/internal/pkg/heimdall/heimdall.go @@ -191,7 +191,8 @@ func (h *Heimdall) Start() error { apiRouter.Methods(methodPUT).PathPrefix(`/cluster/{id}`).HandlerFunc(payloadHandler(h.submitCluster)) apiRouter.Methods(methodGET).PathPrefix(`/cluster/{id}`).HandlerFunc(payloadHandler(h.getCluster)) apiRouter.Methods(methodGET).PathPrefix(`/clusters`).HandlerFunc(payloadHandler(h.getClusters)) - apiRouter.Methods(methodGET).PathPrefix(`/health`).HandlerFunc(h.healthHandler) + apiRouter.Methods(methodGET).PathPrefix(`/clusters/healthz`).HandlerFunc(h.getClustersHealthz) + apiRouter.Methods(methodGET).PathPrefix(`/cluster/{id}/health`).HandlerFunc(payloadHandler(h.getClusterHealth)) // metrics endpoint - proxy to metrics service router.Path(`/metrics`).HandlerFunc(metricsRouteHandler) From 48d2056aed27e0d8126b0d3c8170c07dc9de9ce9 Mon Sep 17 00:00:00 2001 From: Yash Shrivastava Date: Thu, 11 Jun 2026 15:40:13 +0530 Subject: [PATCH 11/16] enforce check --- internal/pkg/heimdall/health.go | 6 +++--- internal/pkg/heimdall/heimdall.go | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/internal/pkg/heimdall/health.go b/internal/pkg/heimdall/health.go index bb6208bb..258846fd 100644 --- a/internal/pkg/heimdall/health.go +++ b/internal/pkg/heimdall/health.go @@ -45,9 +45,9 @@ type clusterHealthRequest struct { ID string `json:"id"` } -// getClustersHealthz handles GET /clusters/healthz +// getClustersHealth handles GET /clusters/health — runs health checks for all clusters and returns aggregated result. // Returns 200 when all probes pass, 503 when any fail. -func (h *Heimdall) getClustersHealthz(w http.ResponseWriter, r *http.Request) { +func (h *Heimdall) getClustersHealth(w http.ResponseWriter, r *http.Request) { ctx, cancel := context.WithTimeout(r.Context(), healthCheckTimeout) defer cancel() @@ -95,7 +95,7 @@ func (h *Heimdall) getClusterHealth(ctx context.Context, req *clusterHealthReque } func (h *Heimdall) resolveProbeForCluster(cl *cluster.Cluster) *clusterProbe { - if cl.Status != status.Active { + if cl.Status != status.Active || !cl.HealthCheck { return nil } for _, cmd := range h.Commands { diff --git a/internal/pkg/heimdall/heimdall.go b/internal/pkg/heimdall/heimdall.go index 7d5a4295..51e3e753 100644 --- a/internal/pkg/heimdall/heimdall.go +++ b/internal/pkg/heimdall/heimdall.go @@ -186,13 +186,13 @@ func (h *Heimdall) Start() error { apiRouter.Methods(methodGET).PathPrefix(`/command/{id}`).HandlerFunc(payloadHandler(h.getCommand)) apiRouter.Methods(methodGET).PathPrefix(`/commands`).HandlerFunc(payloadHandler(h.getCommands)) apiRouter.Methods(methodGET).PathPrefix(`/cluster/statuses`).HandlerFunc(payloadHandler(h.getClusterStatuses)) + apiRouter.Methods(methodGET).PathPrefix(`/clusters/health`).HandlerFunc(h.getClustersHealth) + apiRouter.Methods(methodGET).PathPrefix(`/cluster/{id}/health`).HandlerFunc(payloadHandler(h.getClusterHealth)) apiRouter.Methods(methodGET).PathPrefix(`/cluster/{id}/status`).HandlerFunc(payloadHandler(h.getClusterStatus)) apiRouter.Methods(methodPUT).PathPrefix(`/cluster/{id}/status`).HandlerFunc(payloadHandler(h.updateClusterStatus)) apiRouter.Methods(methodPUT).PathPrefix(`/cluster/{id}`).HandlerFunc(payloadHandler(h.submitCluster)) apiRouter.Methods(methodGET).PathPrefix(`/cluster/{id}`).HandlerFunc(payloadHandler(h.getCluster)) apiRouter.Methods(methodGET).PathPrefix(`/clusters`).HandlerFunc(payloadHandler(h.getClusters)) - apiRouter.Methods(methodGET).PathPrefix(`/clusters/healthz`).HandlerFunc(h.getClustersHealthz) - apiRouter.Methods(methodGET).PathPrefix(`/cluster/{id}/health`).HandlerFunc(payloadHandler(h.getClusterHealth)) // metrics endpoint - proxy to metrics service router.Path(`/metrics`).HandlerFunc(metricsRouteHandler) From 0a34779f3062ae2431e470226e4ab51d5b491a15 Mon Sep 17 00:00:00 2001 From: Yash Shrivastava Date: Thu, 11 Jun 2026 15:46:57 +0530 Subject: [PATCH 12/16] make timeout configurable --- configs/local.yaml | 3 +++ internal/pkg/heimdall/health.go | 15 +++++++++++++-- internal/pkg/heimdall/heimdall.go | 1 + 3 files changed, 17 insertions(+), 2 deletions(-) diff --git a/configs/local.yaml b/configs/local.yaml index 5d0123ab..e5042686 100644 --- a/configs/local.yaml +++ b/configs/local.yaml @@ -14,6 +14,9 @@ plugin_directory: ./plugins janitor: finished_job_retention_days: 14 +health_check: + timeout_seconds: 300 + # auth plugin auth: plugin: ./plugins/auth_header.so diff --git a/internal/pkg/heimdall/health.go b/internal/pkg/heimdall/health.go index 258846fd..a9ca016a 100644 --- a/internal/pkg/heimdall/health.go +++ b/internal/pkg/heimdall/health.go @@ -21,6 +21,10 @@ const ( healthStatusUnchecked = `unchecked` ) +type healthCheckConfig struct { + TimeoutSeconds int `yaml:"timeout_seconds,omitempty" json:"timeout_seconds,omitempty"` +} + type clusterProbe struct { cluster *cluster.Cluster handler plugin.Handler @@ -48,7 +52,7 @@ type clusterHealthRequest struct { // getClustersHealth handles GET /clusters/health — runs health checks for all clusters and returns aggregated result. // Returns 200 when all probes pass, 503 when any fail. func (h *Heimdall) getClustersHealth(w http.ResponseWriter, r *http.Request) { - ctx, cancel := context.WithTimeout(r.Context(), healthCheckTimeout) + ctx, cancel := context.WithTimeout(r.Context(), h.HealthCheck.timeout()) defer cancel() probes := h.resolveClusterProbes() @@ -75,7 +79,7 @@ func (h *Heimdall) getClustersHealth(w http.ResponseWriter, r *http.Request) { // getClusterHealth handles GET /cluster/{id}/health — runs health check for one cluster. func (h *Heimdall) getClusterHealth(ctx context.Context, req *clusterHealthRequest) (any, error) { - ctx, cancel := context.WithTimeout(ctx, healthCheckTimeout) + ctx, cancel := context.WithTimeout(ctx, h.HealthCheck.timeout()) defer cancel() cl, found := h.Clusters[req.ID] @@ -178,3 +182,10 @@ func (h *Heimdall) checkCluster(ctx context.Context, probe *clusterProbe) health } return res } + +func (c *healthCheckConfig) timeout() time.Duration { + if c == nil || c.TimeoutSeconds <= 0 { + return healthCheckTimeout + } + return time.Duration(c.TimeoutSeconds) * time.Second +} diff --git a/internal/pkg/heimdall/heimdall.go b/internal/pkg/heimdall/heimdall.go index 51e3e753..5f2da7dd 100644 --- a/internal/pkg/heimdall/heimdall.go +++ b/internal/pkg/heimdall/heimdall.go @@ -53,6 +53,7 @@ type Heimdall struct { Pool *pool.Pool[*job.Job] `yaml:"pool,omitempty" json:"pool,omitempty"` Auth *auth.Auth `yaml:"auth,omitempty" json:"auth,omitempty"` Janitor *janitor.Janitor `yaml:"janitor,omitempty" json:"janitor,omitempty"` + HealthCheck *healthCheckConfig `yaml:"health_check,omitempty" json:"health_check,omitempty"` Version string `yaml:"-" json:"-"` agentName string commandHandlers map[string]plugin.Handler From 38ea50762247ff9274c21fe7aa55a6b19b1e37b3 Mon Sep 17 00:00:00 2001 From: Yash Shrivastava Date: Thu, 11 Jun 2026 16:27:21 +0530 Subject: [PATCH 13/16] update docs --- README.md | 20 ++++++++++++++++++ internal/pkg/heimdall/health.go | 6 +++++- internal/pkg/object/command/ecs/ecs.go | 15 ++++++++++++- .../pkg/object/command/postgres/postgres.go | 21 +++++++++++++++++++ .../pkg/object/command/sparkeks/sparkeks.go | 2 +- internal/pkg/object/command/trino/trino.go | 6 +++++- 6 files changed, 66 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 9548feef..d6b981b2 100644 --- a/README.md +++ b/README.md @@ -19,6 +19,7 @@ Originally inspired by [Netflix Genie](https://github.com/Netflix/genie), Heimda * 🔐 **Secure orchestration without credential leakage** * 🧠 **Dynamic routing based on command / cluster criteria** * 📦 **Configurable or self-registering clusters** +* 🩺 **Cluster health checks** with per-plugin probes and configurable timeout --- @@ -96,9 +97,11 @@ Heimdall supports a growing set of pluggable command types: | `dynamo` | [DynamoDB read operation](https://github.com/patterninc/heimdall/blob/main/plugins/dynamo/README.md) | Sync or Async | | `snowflake` | [Query execution in Snowflake](https://github.com/patterninc/heimdall/blob/main/plugins/snowflake/README.md) | Async | | `spark` | [SparkSQL query execution on EMR on EKS](https://github.com/patterninc/heimdall/blob/main/plugins/spark/README.md) | Async | +| `sparkeks` | [SparkSQL query execution on EKS](https://github.com/patterninc/heimdall/blob/main/plugins/sparkeks/README.md) | Async | | `trino` | [Query execution in Trino](https://github.com/patterninc/heimdall/blob/main/plugins/trino/README.md) | Async | | `clickhouse` | [Query execution in Clickhouse](https://github.com/patterninc/heimdall/blob/main/plugins/clickhouse/README.md) | Sync | | `ecs fargate` | [Task Deployment in ECS Fargate](https://github.com/patterninc/heimdall/blob/main/plugins/ecs/README.md) | Async | +| `postgres` | [PostgreSQL query execution](https://github.com/patterninc/heimdall/blob/main/plugins/postgres/README.md) | Sync or Async | --- @@ -181,8 +184,25 @@ Commands may also restrict invocation via `allowed_callers` — a list of anchor | `GET /api/v1/cluster//status` | Check cluster status | | `PUT /api/v1/cluster//status` | Set cluster status | | `GET /api/v1/clusters` | List configured clusters | +| `GET /api/v1/clusters/health` | Health check all opted-in clusters | +| `GET /api/v1/cluster//health` | Health check a single cluster | +--- + +## 🩺 Cluster Health Checks + +Opt any cluster into health probing by setting `health_check: true` in its config. Each plugin performs a lightweight connectivity check against its backend (see the Health Check column in the plugins table above). + +Configure the global probe timeout (default 30s): + +```yaml +health_check: + timeout_seconds: 10 +``` + +`GET /api/v1/clusters/health` returns 200 if all probes pass, 503 if any fail. `status` per check: `ok`, `error`, or `unchecked`. Failing or misconfigured health checks have no effect on job execution or startup. + --- ## 👥 Credits diff --git a/internal/pkg/heimdall/health.go b/internal/pkg/heimdall/health.go index a9ca016a..19f11b06 100644 --- a/internal/pkg/heimdall/health.go +++ b/internal/pkg/heimdall/health.go @@ -87,9 +87,13 @@ func (h *Heimdall) getClusterHealth(ctx context.Context, req *clusterHealthReque return nil, fmt.Errorf("%w: %s", ErrUnknownClusterID, req.ID) } + if !cl.HealthCheck { + return nil, fmt.Errorf("health check is not enabled for cluster %s", req.ID) + } + probe := h.resolveProbeForCluster(cl) if probe == nil { - return &healthChecksResponse{Healthy: true, Checks: []healthCheckResult{}}, nil + return nil, fmt.Errorf("no active command matched cluster %s", req.ID) } result := h.checkCluster(ctx, probe) diff --git a/internal/pkg/object/command/ecs/ecs.go b/internal/pkg/object/command/ecs/ecs.go index 2ac7f5fb..685a7810 100644 --- a/internal/pkg/object/command/ecs/ecs.go +++ b/internal/pkg/object/command/ecs/ecs.go @@ -913,8 +913,21 @@ func (e *commandContext) HealthCheck(ctx context.Context, c *cluster.Cluster) er return err } + clusterName := clusterCtx.ClusterName + if clusterName == "" { + clusterName = "default" + } + if len(out.Clusters) == 0 { - return fmt.Errorf("ECS cluster %q not found", clusterCtx.ClusterName) + return fmt.Errorf("ECS cluster %q not found", clusterName) + } + + if out.Clusters[0].Status == nil || *out.Clusters[0].Status != "ACTIVE" { + status := "" + if out.Clusters[0].Status != nil { + status = *out.Clusters[0].Status + } + return fmt.Errorf("ECS cluster %q is not active: %s", clusterName, status) } return nil diff --git a/internal/pkg/object/command/postgres/postgres.go b/internal/pkg/object/command/postgres/postgres.go index 034a13c5..2d6af278 100644 --- a/internal/pkg/object/command/postgres/postgres.go +++ b/internal/pkg/object/command/postgres/postgres.go @@ -156,6 +156,27 @@ func splitAndTrimQueries(query string) []string { return queries } +func (p *postgresCommandContext) HealthCheck(_ context.Context, c *cluster.Cluster) error { + clusterContext, err := validateClusterContext(c) + if err != nil { + return err + } + + db := &database.Database{ConnectionString: clusterContext.ConnectionString} + sess, err := db.NewSession(false) + if err != nil { + return err + } + defer sess.Close() + + rows, err := sess.Query("SELECT 1") + if err != nil { + return err + } + rows.Close() + return nil +} + func (p *postgresCommandContext) Cleanup(ctx context.Context, jobID string, c *cluster.Cluster) error { // implement me return nil diff --git a/internal/pkg/object/command/sparkeks/sparkeks.go b/internal/pkg/object/command/sparkeks/sparkeks.go index ec208acc..0bd957a0 100644 --- a/internal/pkg/object/command/sparkeks/sparkeks.go +++ b/internal/pkg/object/command/sparkeks/sparkeks.go @@ -195,7 +195,7 @@ func (s *commandContext) Execute(ctx context.Context, r *plugin.Runtime, j *job. // HealthCheck implements the plugin.HealthChecker interface func (s *commandContext) HealthCheck(ctx context.Context, c *cluster.Cluster) error { clusterCtx := &clusterContext{} - if c != nil && c.Context != nil { + if c.Context != nil { if err := c.Context.Unmarshal(clusterCtx); err != nil { return err } diff --git a/internal/pkg/object/command/trino/trino.go b/internal/pkg/object/command/trino/trino.go index 45b24971..236095fc 100644 --- a/internal/pkg/object/command/trino/trino.go +++ b/internal/pkg/object/command/trino/trino.go @@ -3,6 +3,7 @@ package trino import ( "context" "fmt" + "io" "log" "net/http" "time" @@ -114,7 +115,10 @@ func (t *commandContext) HealthCheck(ctx context.Context, c *cluster.Cluster) er if err != nil { return err } - defer resp.Body.Close() + defer func() { + io.Copy(io.Discard, resp.Body) + resp.Body.Close() + }() if resp.StatusCode != http.StatusOK { return fmt.Errorf("trino /v1/info returned status %d", resp.StatusCode) From a333d047c8cdc28f41445fe45bf8b0d2e1d3335f Mon Sep 17 00:00:00 2001 From: Yash Shrivastava Date: Fri, 12 Jun 2026 15:53:05 +0530 Subject: [PATCH 14/16] update health endpoints --- internal/pkg/heimdall/health.go | 6 ++- internal/pkg/object/command/ecs/ecs.go | 8 +++ .../pkg/object/command/snowflake/snowflake.go | 43 +++++++++++++++- internal/pkg/object/command/spark/spark.go | 11 +++- .../pkg/object/command/sparkeks/sparkeks.go | 50 ++++++++++++++++--- internal/pkg/object/command/trino/trino.go | 34 +++++++++++++ 6 files changed, 142 insertions(+), 10 deletions(-) diff --git a/internal/pkg/heimdall/health.go b/internal/pkg/heimdall/health.go index 19f11b06..7a8910b8 100644 --- a/internal/pkg/heimdall/health.go +++ b/internal/pkg/heimdall/health.go @@ -67,7 +67,11 @@ func (h *Heimdall) getClustersHealth(w http.ResponseWriter, r *http.Request) { } resp := &healthChecksResponse{Healthy: healthy, Checks: results} - data, _ := json.Marshal(resp) + data, err := json.Marshal(resp) + if err != nil { + writeAPIError(w, err, nil) + return + } w.Header().Set(contentTypeKey, contentTypeJSON) if healthy { w.WriteHeader(http.StatusOK) diff --git a/internal/pkg/object/command/ecs/ecs.go b/internal/pkg/object/command/ecs/ecs.go index 685a7810..b1aeb575 100644 --- a/internal/pkg/object/command/ecs/ecs.go +++ b/internal/pkg/object/command/ecs/ecs.go @@ -930,5 +930,13 @@ func (e *commandContext) HealthCheck(ctx context.Context, c *cluster.Cluster) er return fmt.Errorf("ECS cluster %q is not active: %s", clusterName, status) } + if clusterCtx.MaxTaskCount > 0 { + running := int(out.Clusters[0].RunningTasksCount) + pending := int(out.Clusters[0].PendingTasksCount) + if running+pending >= clusterCtx.MaxTaskCount { + return fmt.Errorf("ECS cluster %q at capacity: %d running + %d pending >= max %d", clusterName, running, pending, clusterCtx.MaxTaskCount) + } + } + return nil } diff --git a/internal/pkg/object/command/snowflake/snowflake.go b/internal/pkg/object/command/snowflake/snowflake.go index 8d59c760..a41dd9ea 100644 --- a/internal/pkg/object/command/snowflake/snowflake.go +++ b/internal/pkg/object/command/snowflake/snowflake.go @@ -184,7 +184,48 @@ func (s *commandContext) HealthCheck(ctx context.Context, c *cluster.Cluster) er } defer db.Close() - return db.PingContext(ctx) + rows, err := db.QueryContext(ctx, fmt.Sprintf("SHOW WAREHOUSES LIKE '%s'", clusterCtx.Warehouse)) + if err != nil { + return fmt.Errorf("snowflake SHOW WAREHOUSES failed: %w", err) + } + defer rows.Close() + + cols, err := rows.Columns() + if err != nil { + return err + } + + // Find the "state" column index + stateIdx := -1 + for i, col := range cols { + if col == "state" { + stateIdx = i + break + } + } + if stateIdx < 0 { + return fmt.Errorf("snowflake SHOW WAREHOUSES: state column not found") + } + + if !rows.Next() { + return fmt.Errorf("snowflake warehouse %q not found", clusterCtx.Warehouse) + } + + vals := make([]any, len(cols)) + ptrs := make([]any, len(cols)) + for i := range vals { + ptrs[i] = &vals[i] + } + if err := rows.Scan(ptrs...); err != nil { + return err + } + + state, _ := vals[stateIdx].(string) + if state != "STARTED" { + return fmt.Errorf("snowflake warehouse %q is not ready: state=%s", clusterCtx.Warehouse, state) + } + + return nil } func (s *commandContext) Cleanup(ctx context.Context, jobID string, c *cluster.Cluster) error { diff --git a/internal/pkg/object/command/spark/spark.go b/internal/pkg/object/command/spark/spark.go index 3de19ef0..299d0c89 100644 --- a/internal/pkg/object/command/spark/spark.go +++ b/internal/pkg/object/command/spark/spark.go @@ -296,10 +296,17 @@ func (s *commandContext) HealthCheck(ctx context.Context, c *cluster.Cluster) er emrClient := emrcontainers.NewFromConfig(awsConfig, assumeRoleOptions) maxResults := int32(1) - _, err = emrClient.ListVirtualClusters(ctx, &emrcontainers.ListVirtualClustersInput{ + out, err := emrClient.ListVirtualClusters(ctx, &emrcontainers.ListVirtualClustersInput{ MaxResults: &maxResults, + States: []types.VirtualClusterState{types.VirtualClusterStateRunning}, }) - return err + if err != nil { + return err + } + if len(out.VirtualClusters) == 0 { + return fmt.Errorf("no running EMR virtual clusters found") + } + return nil } func (s *commandContext) Cleanup(ctx context.Context, jobID string, c *cluster.Cluster) error { diff --git a/internal/pkg/object/command/sparkeks/sparkeks.go b/internal/pkg/object/command/sparkeks/sparkeks.go index 0bd957a0..4e44ab03 100644 --- a/internal/pkg/object/command/sparkeks/sparkeks.go +++ b/internal/pkg/object/command/sparkeks/sparkeks.go @@ -15,7 +15,9 @@ import ( "github.com/aws/aws-sdk-go-v2/aws" awsconfig "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/feature/s3/manager" + "github.com/aws/aws-sdk-go-v2/service/eks" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/sts" @@ -201,7 +203,7 @@ func (s *commandContext) HealthCheck(ctx context.Context, c *cluster.Cluster) er } } - if clusterCtx.RoleARN == nil { + if clusterCtx.KubernetesClusterName == nil { return nil } @@ -210,12 +212,48 @@ func (s *commandContext) HealthCheck(ctx context.Context, c *cluster.Cluster) er return err } - stsSvc := sts.NewFromConfig(awsCfg) - _, err = stsSvc.AssumeRole(ctx, &sts.AssumeRoleInput{ - RoleArn: clusterCtx.RoleARN, - RoleSessionName: aws.String("heimdall-health"), + eksOptions := []func(*eks.Options){} + if clusterCtx.RoleARN != nil { + stsSvc := sts.NewFromConfig(awsCfg) + out, err := stsSvc.AssumeRole(ctx, &sts.AssumeRoleInput{ + RoleArn: clusterCtx.RoleARN, + RoleSessionName: aws.String("heimdall-health"), + }) + if err != nil { + return err + } + eksOptions = append(eksOptions, func(o *eks.Options) { + o.Credentials = credentials.NewStaticCredentialsProvider( + *out.Credentials.AccessKeyId, + *out.Credentials.SecretAccessKey, + *out.Credentials.SessionToken, + ) + }) + } + + if clusterCtx.Region != nil { + eksOptions = append(eksOptions, func(o *eks.Options) { + o.Region = *clusterCtx.Region + }) + } + + eksClient := eks.NewFromConfig(awsCfg, eksOptions...) + out, err := eksClient.DescribeCluster(ctx, &eks.DescribeClusterInput{ + Name: clusterCtx.KubernetesClusterName, }) - return err + if err != nil { + return err + } + + if out.Cluster == nil || string(out.Cluster.Status) != "ACTIVE" { + status := "" + if out.Cluster != nil { + status = string(out.Cluster.Status) + } + return fmt.Errorf("EKS cluster %q is not active: %s", *clusterCtx.KubernetesClusterName, status) + } + + return nil } func (s *commandContext) Cleanup(ctx context.Context, jobID string, c *cluster.Cluster) error { diff --git a/internal/pkg/object/command/trino/trino.go b/internal/pkg/object/command/trino/trino.go index 236095fc..8781204f 100644 --- a/internal/pkg/object/command/trino/trino.go +++ b/internal/pkg/object/command/trino/trino.go @@ -2,6 +2,7 @@ package trino import ( "context" + "encoding/json" "fmt" "io" "log" @@ -124,6 +125,39 @@ func (t *commandContext) HealthCheck(ctx context.Context, c *cluster.Cluster) er return fmt.Errorf("trino /v1/info returned status %d", resp.StatusCode) } + return t.checkTrinoClusterCapacity(ctx, clusterCtx.Endpoint) +} + +func (t *commandContext) checkTrinoClusterCapacity(ctx context.Context, endpoint string) error { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint+"/v1/cluster", nil) + if err != nil { + return err + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + defer func() { + io.Copy(io.Discard, resp.Body) + resp.Body.Close() + }() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("trino /v1/cluster returned status %d", resp.StatusCode) + } + + var info struct { + ActiveWorkers int `json:"activeWorkers"` + } + if err := json.NewDecoder(resp.Body).Decode(&info); err != nil { + return fmt.Errorf("trino /v1/cluster: failed to decode response: %w", err) + } + + if info.ActiveWorkers == 0 { + return fmt.Errorf("trino cluster has no active workers") + } + return nil } From b8cada90b17c6db9366addff7fd1314bf900c4fc Mon Sep 17 00:00:00 2001 From: Yash Shrivastava Date: Fri, 12 Jun 2026 16:36:22 +0530 Subject: [PATCH 15/16] update go mod --- go.mod | 19 ++++++++++--------- go.sum | 28 ++++++++++------------------ 2 files changed, 20 insertions(+), 27 deletions(-) diff --git a/go.mod b/go.mod index f7a2c254..2eedecd0 100644 --- a/go.mod +++ b/go.mod @@ -5,23 +5,29 @@ go 1.24.6 require ( github.com/ClickHouse/clickhouse-go/v2 v2.40.3 github.com/antlr4-go/antlr/v4 v4.13.1 - github.com/aws/aws-sdk-go-v2 v1.41.7 + github.com/aws/aws-lambda-go v1.54.0 + github.com/aws/aws-sdk-go-v2 v1.42.0 github.com/aws/aws-sdk-go-v2/config v1.30.3 github.com/aws/aws-sdk-go-v2/credentials v1.18.3 github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.18.3 + github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.58.7 + github.com/aws/aws-sdk-go-v2/service/codedeploy v1.35.15 github.com/aws/aws-sdk-go-v2/service/dynamodb v1.46.0 github.com/aws/aws-sdk-go-v2/service/ecs v1.62.0 github.com/aws/aws-sdk-go-v2/service/emrcontainers v1.37.0 github.com/aws/aws-sdk-go-v2/service/glue v1.123.0 github.com/aws/aws-sdk-go-v2/service/s3 v1.86.0 github.com/aws/aws-sdk-go-v2/service/sts v1.36.0 + github.com/aws/smithy-go v1.27.1 github.com/babourine/x v1.0.4 + github.com/go-faster/errors v0.7.1 github.com/google/uuid v1.6.0 github.com/gorilla/mux v1.8.1 github.com/hladush/go-telemetry v0.0.0-20250803202644-50ee1a0a57cc github.com/kubeflow/spark-operator/v2 v2.3.0 github.com/lib/pq v1.10.9 github.com/linkedin/goavro v2.1.0+incompatible + github.com/pkg/errors v0.9.1 github.com/shopspring/decimal v1.4.0 github.com/snowflakedb/gosnowflake v1.15.0 github.com/stretchr/testify v1.11.1 @@ -43,15 +49,13 @@ require ( github.com/andybalholm/brotli v1.2.0 // indirect github.com/apache/arrow-go/v18 v18.4.0 // indirect github.com/apache/thrift v0.22.0 // indirect - github.com/aws/aws-lambda-go v1.54.0 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.3 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.2 // indirect - github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.23 // indirect - github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.23 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.29 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.29 // indirect github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 // indirect github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.2 // indirect - github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.58.7 // indirect - github.com/aws/aws-sdk-go-v2/service/codedeploy v1.35.15 // indirect + github.com/aws/aws-sdk-go-v2/service/eks v1.85.0 // indirect github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.0 // indirect github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.8.2 // indirect github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.11.2 // indirect @@ -59,7 +63,6 @@ require ( github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.2 // indirect github.com/aws/aws-sdk-go-v2/service/sso v1.27.0 // indirect github.com/aws/aws-sdk-go-v2/service/ssooidc v1.32.0 // indirect - github.com/aws/smithy-go v1.25.1 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/danieljoos/wincred v1.2.2 // indirect @@ -69,7 +72,6 @@ require ( github.com/fxamacker/cbor/v2 v2.7.0 // indirect github.com/gabriel-vasile/mimetype v1.4.9 // indirect github.com/go-faster/city v1.0.1 // indirect - github.com/go-faster/errors v0.7.1 // indirect github.com/go-logr/logr v1.4.3 // indirect github.com/go-openapi/jsonpointer v0.21.0 // indirect github.com/go-openapi/jsonreference v0.21.0 // indirect @@ -98,7 +100,6 @@ require ( github.com/paulmach/orb v0.11.1 // indirect github.com/pierrec/lz4/v4 v4.1.22 // indirect github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect - github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/client_golang v1.22.0 // indirect github.com/prometheus/client_model v0.6.1 // indirect diff --git a/go.sum b/go.sum index 99e6c574..e6dd9e63 100644 --- a/go.sum +++ b/go.sum @@ -30,14 +30,10 @@ github.com/apache/thrift v0.22.0 h1:r7mTJdj51TMDe6RtcmNdQxgn9XcyfGDOzegMDRg47uc= github.com/apache/thrift v0.22.0/go.mod h1:1e7J/O1Ae6ZQMTYdy9xa3w9k+XHWPfRvdPyJeynQ+/g= github.com/aws/aws-lambda-go v1.54.0 h1:EGYpdyRGF88xszqlGcBewz811mJeRS+maNlLZXFheII= github.com/aws/aws-lambda-go v1.54.0/go.mod h1:dpMpZgvWx5vuQJfBt0zqBha60q7Dd7RfgJv23DymV8A= -github.com/aws/aws-sdk-go-v2 v1.38.0 h1:UCRQ5mlqcFk9HJDIqENSLR3wiG1VTWlyUfLDEvY7RxU= -github.com/aws/aws-sdk-go-v2 v1.38.0/go.mod h1:9Q0OoGQoboYIAJyslFyF1f5K1Ryddop8gqMhWx/n4Wg= -github.com/aws/aws-sdk-go-v2 v1.39.6 h1:2JrPCVgWJm7bm83BDwY5z8ietmeJUbh3O2ACnn+Xsqk= -github.com/aws/aws-sdk-go-v2 v1.39.6/go.mod h1:c9pm7VwuW0UPxAEYGyTmyurVcNrbF6Rt/wixFqDhcjE= github.com/aws/aws-sdk-go-v2 v1.41.7 h1:DWpAJt66FmnnaRIOT/8ASTucrvuDPZASqhhLey6tLY8= github.com/aws/aws-sdk-go-v2 v1.41.7/go.mod h1:4LAfZOPHNVNQEckOACQx60Y8pSRjIkNZQz1w92xpMJc= -github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.0 h1:6GMWV6CNpA/6fbFHnoAjrv4+LGfyTqZz2LtCHnspgDg= -github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.0/go.mod h1:/mXlTIVG9jbxkqDnr5UQNQxW1HRYxeGklkM9vAFeabg= +github.com/aws/aws-sdk-go-v2 v1.42.0 h1:XvXMJTkFQtpBKIWZnmr9ZEOc2InWM2yldjXEJ/bymhA= +github.com/aws/aws-sdk-go-v2 v1.42.0/go.mod h1:27+ACypSLljLAEKsCYOmrjKh83vuTRkuAe9Uv/3A4bg= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.3 h1:DHctwEM8P8iTXFxC/QK0MRjwEpWQeM9yzidCRjldUz0= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.3/go.mod h1:xdCzcZEtnSTKVDOmUZs4l/j3pSV6rpo1WXl5ugNsL8Y= github.com/aws/aws-sdk-go-v2/config v1.30.3 h1:utupeVnE3bmB221W08P0Moz1lDI3OwYa2fBtUhl7TCc= @@ -48,18 +44,14 @@ github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.2 h1:nRniHAvjFJGUCl04F3WaAj7 github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.2/go.mod h1:eJDFKAMHHUvv4a0Zfa7bQb//wFNUXGrbFpYRCHe2kD0= github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.18.3 h1:Nb2pUE30lySKPGdkiIJ1SZgHsjiebOiRNI7R9NA1WtM= github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.18.3/go.mod h1:BO5EKulvhBF1NXwui8lfnuDPBQQU5807yvWASZ/5n6k= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.3 h1:o9RnO+YZ4X+kt5Z7Nvcishlz0nksIt2PIzDglLMP0vA= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.3/go.mod h1:+6aLJzOG1fvMOyzIySYjOFjcguGvVRL68R+uoRencN4= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.13 h1:a+8/MLcWlIxo1lF9xaGt3J/u3yOZx+CdSveSNwjhD40= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.13/go.mod h1:oGnKwIYZ4XttyU2JWxFrwvhF6YKiK/9/wmE3v3Iu9K8= github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.23 h1:GpT/TrnBYuE5gan2cZbTtvP+JlHsutdmlV2YfEyNde0= github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.23/go.mod h1:xYWD6BS9ywC5bS3sz9Xh04whO/hzK2plt2Zkyrp4JuA= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.3 h1:joyyUFhiTQQmVK6ImzNU9TQSNRNeD9kOklqTzyk5v6s= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.3/go.mod h1:+vNIyZQP3b3B1tSLI0lxvrU9cfM7gpdRXMFfm67ZcPc= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.13 h1:HBSI2kDkMdWz4ZM7FjwE7e/pWDEZ+nR95x8Ztet1ooY= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.13/go.mod h1:YE94ZoDArI7awZqJzBAZ3PDD2zSfuP7w6P2knOzIn8M= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.29 h1:f3vKqSo13fhTYb+JEcXwXefZQE26I1FB5eTSniU67ko= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.29/go.mod h1:MzoLFUArKGpGD+ukmPiTPG1X5x4o6M2kq4v2dr1FiEc= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.23 h1:bpd8vxhlQi2r1hiueOw02f/duEPTMK59Q4QMAoTTtTo= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.23/go.mod h1:15DfR2nw+CRHIk0tqNyifu3G1YdAOy68RftkhMDDwYk= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.29 h1:RdwIf/CuUsvJX3RgJagbOyotl/cxoLY4xviKuE7p2GY= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.29/go.mod h1:71wt8W2EgswdZy9Mf9KNnzxZ3TiZlv4caKghPktDOkA= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 h1:bIqFDwgGXXN1Kpp99pDOdKMTTb5d2KyU5X/BZxjOkRo= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3/go.mod h1:H5O/EsxDWyU+LP/V8i5sm8cxoZgc2fdNR9bxlOFrQTo= github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.2 h1:sBpc8Ph6CpfZsEdkz/8bfg8WhKlWMCms5iWj6W/AW2U= @@ -72,6 +64,8 @@ github.com/aws/aws-sdk-go-v2/service/dynamodb v1.46.0 h1:b7F96mjkzsqymMSGhuCqBQT github.com/aws/aws-sdk-go-v2/service/dynamodb v1.46.0/go.mod h1:F8Rqs4FVGBTUzx3wbFm7HB/mgIA4Tc6/x0yQmjoB+/w= github.com/aws/aws-sdk-go-v2/service/ecs v1.62.0 h1:E5/BzpoN6fc/xWtKiFPUJBW6nW3KFINCz6so7v/fQ8E= github.com/aws/aws-sdk-go-v2/service/ecs v1.62.0/go.mod h1:UrdK8ip8HSwnESeuXhte4vlRVv0GIOpC92LR1+2m+zA= +github.com/aws/aws-sdk-go-v2/service/eks v1.85.0 h1:PX1X82jI4OEpsB7zrEuU0/V0Tq0+ylEJEu/CjhorV1c= +github.com/aws/aws-sdk-go-v2/service/eks v1.85.0/go.mod h1:rbIASs+SfCDUXx2EdfMkNpDGptlW8hvMZ9AawRiUBqE= github.com/aws/aws-sdk-go-v2/service/emrcontainers v1.37.0 h1:g36yievH3ecx+76/79+LIal5FqIXT+2J3Ztynayj9d4= github.com/aws/aws-sdk-go-v2/service/emrcontainers v1.37.0/go.mod h1:7J9Pidm1n/FvnKMcCUzi8eWNDzzqR6w3sbjhXrMXaxk= github.com/aws/aws-sdk-go-v2/service/glue v1.123.0 h1:oI5uX+NPxqkIP1Qy8XTlTiG3blhDDI0h1OtrOs14KZw= @@ -94,12 +88,10 @@ github.com/aws/aws-sdk-go-v2/service/ssooidc v1.32.0 h1:ywQF2N4VjqX+Psw+jLjMmUL2 github.com/aws/aws-sdk-go-v2/service/ssooidc v1.32.0/go.mod h1:Z+qv5Q6b7sWiclvbJyPSOT1BRVU9wfSUPaqQzZ1Xg3E= github.com/aws/aws-sdk-go-v2/service/sts v1.36.0 h1:bRP/a9llXSSgDPk7Rqn5GD/DQCGo6uk95plBFKoXt2M= github.com/aws/aws-sdk-go-v2/service/sts v1.36.0/go.mod h1:tgBsFzxwl65BWkuJ/x2EUs59bD4SfYKgikvFDJi1S58= -github.com/aws/smithy-go v1.22.5 h1:P9ATCXPMb2mPjYBgueqJNCA5S9UfktsW0tTxi+a7eqw= -github.com/aws/smithy-go v1.22.5/go.mod h1:t1ufH5HMublsJYulve2RKmHDC15xu1f26kHCp/HgceI= -github.com/aws/smithy-go v1.23.2 h1:Crv0eatJUQhaManss33hS5r40CG3ZFH+21XSkqMrIUM= -github.com/aws/smithy-go v1.23.2/go.mod h1:LEj2LM3rBRQJxPZTB4KuzZkaZYnZPnvgIhb4pu07mx0= github.com/aws/smithy-go v1.25.1 h1:J8ERsGSU7d+aCmdQur5Txg6bVoYelvQJgtZehD12GkI= github.com/aws/smithy-go v1.25.1/go.mod h1:YE2RhdIuDbA5E5bTdciG9KrW3+TiEONeUWCqxX9i1Fc= +github.com/aws/smithy-go v1.27.1 h1:4T340VFndXtADGF52gYa1POyL7s9E4Z1OeZ1hCscIw8= +github.com/aws/smithy-go v1.27.1/go.mod h1:YE2RhdIuDbA5E5bTdciG9KrW3+TiEONeUWCqxX9i1Fc= github.com/babourine/x v1.0.4 h1:74LI/hDs87c3cQe5MAIFrtrRwhrOK1ayIZIMOEps4QU= github.com/babourine/x v1.0.4/go.mod h1:GWvPKlJfBjuVP5+fZ6L0aQ0ELxNmHsldrfokYagtq8Q= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= From a121e26624473ca802c7caed03e9baea99ea46b7 Mon Sep 17 00:00:00 2001 From: Yash Shrivastava Date: Mon, 15 Jun 2026 12:15:40 +0530 Subject: [PATCH 16/16] address wiz comment and bump pkg --- go.mod | 4 ++-- go.sum | 4 ++++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/go.mod b/go.mod index 2eedecd0..f84f5645 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/aws/aws-sdk-go-v2/config v1.30.3 github.com/aws/aws-sdk-go-v2/credentials v1.18.3 github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.18.3 - github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.58.7 + github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.65.0 github.com/aws/aws-sdk-go-v2/service/codedeploy v1.35.15 github.com/aws/aws-sdk-go-v2/service/dynamodb v1.46.0 github.com/aws/aws-sdk-go-v2/service/ecs v1.62.0 @@ -49,7 +49,7 @@ require ( github.com/andybalholm/brotli v1.2.0 // indirect github.com/apache/arrow-go/v18 v18.4.0 // indirect github.com/apache/thrift v0.22.0 // indirect - github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.3 // indirect + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.8 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.2 // indirect github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.29 // indirect github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.29 // indirect diff --git a/go.sum b/go.sum index e6dd9e63..9e99560b 100644 --- a/go.sum +++ b/go.sum @@ -36,6 +36,8 @@ github.com/aws/aws-sdk-go-v2 v1.42.0 h1:XvXMJTkFQtpBKIWZnmr9ZEOc2InWM2yldjXEJ/by github.com/aws/aws-sdk-go-v2 v1.42.0/go.mod h1:27+ACypSLljLAEKsCYOmrjKh83vuTRkuAe9Uv/3A4bg= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.3 h1:DHctwEM8P8iTXFxC/QK0MRjwEpWQeM9yzidCRjldUz0= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.3/go.mod h1:xdCzcZEtnSTKVDOmUZs4l/j3pSV6rpo1WXl5ugNsL8Y= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.8 h1:eBMB84YGghSocM7PsjmmPffTa+1FBUeNvGvFou6V/4o= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.8/go.mod h1:lyw7GFp3qENLh7kwzf7iMzAxDn+NzjXEAGjKS2UOKqI= github.com/aws/aws-sdk-go-v2/config v1.30.3 h1:utupeVnE3bmB221W08P0Moz1lDI3OwYa2fBtUhl7TCc= github.com/aws/aws-sdk-go-v2/config v1.30.3/go.mod h1:NDGwOEBdpyZwLPlQkpKIO7frf18BW8PaCmAM9iUxQmI= github.com/aws/aws-sdk-go-v2/credentials v1.18.3 h1:ptfyXmv+ooxzFwyuBth0yqABcjVIkjDL0iTYZBSbum8= @@ -58,6 +60,8 @@ github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.2 h1:sBpc8Ph6CpfZsEdkz/8bfg8WhKlW github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.2/go.mod h1:Z2lDojZB+92Wo6EKiZZmJid9pPrDJW2NNIXSlaEfVlU= github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.58.7 h1:Yj4NvoEEdSxA90x/uCBskzeF3OxZr72Yaf64n0fIVR4= github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.58.7/go.mod h1:9/Q0/HtqBTLMksFse42wZjUq0jJrUuo4XlnXy/uSoeg= +github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.65.0 h1:3yaFbUbuLfN8n1q01wZtQtHRzUDc/jm0VvniMY0IPE8= +github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.65.0/go.mod h1:PobeppEnIjw4pcgjFryNDZCTH7AiqZw0yb5r98Gvf9c= github.com/aws/aws-sdk-go-v2/service/codedeploy v1.35.15 h1:iFcroRD6TUmNLCoAMdPJGV28h1K4yULxNludMiA/LwA= github.com/aws/aws-sdk-go-v2/service/codedeploy v1.35.15/go.mod h1:r8xU4gaQ0oQNlfptj6Lzyv2Ityc2o0Sp3Gl3bX8KVE0= github.com/aws/aws-sdk-go-v2/service/dynamodb v1.46.0 h1:b7F96mjkzsqymMSGhuCqBQTZFx3mhTMa6IoG6SoVvC8=