From 1bf61907546416068febf148884f527055499c8f Mon Sep 17 00:00:00 2001 From: Yash Shrivastava Date: Wed, 17 Jun 2026 17:14:24 +0530 Subject: [PATCH 1/2] fix: healthchecks for ecs, snowflake and trino --- internal/pkg/object/command/ecs/ecs.go | 8 ----- .../pkg/object/command/snowflake/snowflake.go | 24 +++++++++---- internal/pkg/object/command/trino/trino.go | 34 ------------------- 3 files changed, 17 insertions(+), 49 deletions(-) diff --git a/internal/pkg/object/command/ecs/ecs.go b/internal/pkg/object/command/ecs/ecs.go index b1aeb575..685a7810 100644 --- a/internal/pkg/object/command/ecs/ecs.go +++ b/internal/pkg/object/command/ecs/ecs.go @@ -930,13 +930,5 @@ func (e *commandContext) HealthCheck(ctx context.Context, c *cluster.Cluster) er return fmt.Errorf("ECS cluster %q is not active: %s", clusterName, status) } - if clusterCtx.MaxTaskCount > 0 { - running := int(out.Clusters[0].RunningTasksCount) - pending := int(out.Clusters[0].PendingTasksCount) - if running+pending >= clusterCtx.MaxTaskCount { - return fmt.Errorf("ECS cluster %q at capacity: %d running + %d pending >= max %d", clusterName, running, pending, clusterCtx.MaxTaskCount) - } - } - return nil } diff --git a/internal/pkg/object/command/snowflake/snowflake.go b/internal/pkg/object/command/snowflake/snowflake.go index a41dd9ea..8a2dc1b9 100644 --- a/internal/pkg/object/command/snowflake/snowflake.go +++ b/internal/pkg/object/command/snowflake/snowflake.go @@ -195,12 +195,13 @@ func (s *commandContext) HealthCheck(ctx context.Context, c *cluster.Cluster) er return err } - // Find the "state" column index - stateIdx := -1 + stateIdx, autoResumeIdx := -1, -1 for i, col := range cols { - if col == "state" { + switch col { + case "state": stateIdx = i - break + case "auto_resume": + autoResumeIdx = i } } if stateIdx < 0 { @@ -221,11 +222,20 @@ func (s *commandContext) HealthCheck(ctx context.Context, c *cluster.Cluster) er } state, _ := vals[stateIdx].(string) - if state != "STARTED" { + switch state { + case "STARTED": + return nil + case "SUSPENDED": + if autoResumeIdx >= 0 { + autoResume, _ := vals[autoResumeIdx].(string) + if autoResume == "true" { + return nil + } + } + return fmt.Errorf("snowflake warehouse %q is suspended with auto_resume disabled", clusterCtx.Warehouse) + default: return fmt.Errorf("snowflake warehouse %q is not ready: state=%s", clusterCtx.Warehouse, state) } - - return nil } func (s *commandContext) Cleanup(ctx context.Context, jobID string, c *cluster.Cluster) error { diff --git a/internal/pkg/object/command/trino/trino.go b/internal/pkg/object/command/trino/trino.go index 8781204f..236095fc 100644 --- a/internal/pkg/object/command/trino/trino.go +++ b/internal/pkg/object/command/trino/trino.go @@ -2,7 +2,6 @@ package trino import ( "context" - "encoding/json" "fmt" "io" "log" @@ -125,39 +124,6 @@ func (t *commandContext) HealthCheck(ctx context.Context, c *cluster.Cluster) er return fmt.Errorf("trino /v1/info returned status %d", resp.StatusCode) } - return t.checkTrinoClusterCapacity(ctx, clusterCtx.Endpoint) -} - -func (t *commandContext) checkTrinoClusterCapacity(ctx context.Context, endpoint string) error { - req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint+"/v1/cluster", nil) - if err != nil { - return err - } - - resp, err := http.DefaultClient.Do(req) - if err != nil { - return err - } - defer func() { - io.Copy(io.Discard, resp.Body) - resp.Body.Close() - }() - - if resp.StatusCode != http.StatusOK { - return fmt.Errorf("trino /v1/cluster returned status %d", resp.StatusCode) - } - - var info struct { - ActiveWorkers int `json:"activeWorkers"` - } - if err := json.NewDecoder(resp.Body).Decode(&info); err != nil { - return fmt.Errorf("trino /v1/cluster: failed to decode response: %w", err) - } - - if info.ActiveWorkers == 0 { - return fmt.Errorf("trino cluster has no active workers") - } - return nil } From a995b7894ea56a4b13bfc2a5c57048fe43417065 Mon Sep 17 00:00:00 2001 From: Yash Shrivastava Date: Wed, 17 Jun 2026 17:59:56 +0530 Subject: [PATCH 2/2] address review comments --- .../pkg/object/command/snowflake/snowflake.go | 29 ++++++++++++++----- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/internal/pkg/object/command/snowflake/snowflake.go b/internal/pkg/object/command/snowflake/snowflake.go index 8a2dc1b9..94512ba1 100644 --- a/internal/pkg/object/command/snowflake/snowflake.go +++ b/internal/pkg/object/command/snowflake/snowflake.go @@ -221,23 +221,36 @@ func (s *commandContext) HealthCheck(ctx context.Context, c *cluster.Cluster) er return err } - state, _ := vals[stateIdx].(string) + state := scanToString(vals[stateIdx]) switch state { - case "STARTED": + case "STARTED", "RESUMING": return nil case "SUSPENDED": - if autoResumeIdx >= 0 { - autoResume, _ := vals[autoResumeIdx].(string) - if autoResume == "true" { - return nil - } + if autoResumeIdx >= 0 && scanToString(vals[autoResumeIdx]) == "true" { + return nil } - return fmt.Errorf("snowflake warehouse %q is suspended with auto_resume disabled", clusterCtx.Warehouse) + return fmt.Errorf("snowflake warehouse %q is suspended and auto_resume is not enabled", clusterCtx.Warehouse) default: return fmt.Errorf("snowflake warehouse %q is not ready: state=%s", clusterCtx.Warehouse, state) } } +func scanToString(v any) string { + switch val := v.(type) { + case string: + return val + case []byte: + return string(val) + case bool: + if val { + return "true" + } + return "false" + default: + return fmt.Sprintf("%v", v) + } +} + func (s *commandContext) Cleanup(ctx context.Context, jobID string, c *cluster.Cluster) error { // TODO: Implement cleanup if needed return nil