diff --git a/internal/pkg/object/command/ecs/ecs.go b/internal/pkg/object/command/ecs/ecs.go index b1aeb575..685a7810 100644 --- a/internal/pkg/object/command/ecs/ecs.go +++ b/internal/pkg/object/command/ecs/ecs.go @@ -930,13 +930,5 @@ func (e *commandContext) HealthCheck(ctx context.Context, c *cluster.Cluster) er return fmt.Errorf("ECS cluster %q is not active: %s", clusterName, status) } - if clusterCtx.MaxTaskCount > 0 { - running := int(out.Clusters[0].RunningTasksCount) - pending := int(out.Clusters[0].PendingTasksCount) - if running+pending >= clusterCtx.MaxTaskCount { - return fmt.Errorf("ECS cluster %q at capacity: %d running + %d pending >= max %d", clusterName, running, pending, clusterCtx.MaxTaskCount) - } - } - return nil } diff --git a/internal/pkg/object/command/snowflake/snowflake.go b/internal/pkg/object/command/snowflake/snowflake.go index a41dd9ea..94512ba1 100644 --- a/internal/pkg/object/command/snowflake/snowflake.go +++ b/internal/pkg/object/command/snowflake/snowflake.go @@ -195,12 +195,13 @@ func (s *commandContext) HealthCheck(ctx context.Context, c *cluster.Cluster) er return err } - // Find the "state" column index - stateIdx := -1 + stateIdx, autoResumeIdx := -1, -1 for i, col := range cols { - if col == "state" { + switch col { + case "state": stateIdx = i - break + case "auto_resume": + autoResumeIdx = i } } if stateIdx < 0 { @@ -220,12 +221,34 @@ func (s *commandContext) HealthCheck(ctx context.Context, c *cluster.Cluster) er return err } - state, _ := vals[stateIdx].(string) - if state != "STARTED" { + state := scanToString(vals[stateIdx]) + switch state { + case "STARTED", "RESUMING": + return nil + case "SUSPENDED": + if autoResumeIdx >= 0 && scanToString(vals[autoResumeIdx]) == "true" { + return nil + } + return fmt.Errorf("snowflake warehouse %q is suspended and auto_resume is not enabled", clusterCtx.Warehouse) + default: return fmt.Errorf("snowflake warehouse %q is not ready: state=%s", clusterCtx.Warehouse, state) } +} - return nil +func scanToString(v any) string { + switch val := v.(type) { + case string: + return val + case []byte: + return string(val) + case bool: + if val { + return "true" + } + return "false" + default: + return fmt.Sprintf("%v", v) + } } func (s *commandContext) Cleanup(ctx context.Context, jobID string, c *cluster.Cluster) error { diff --git a/internal/pkg/object/command/trino/trino.go b/internal/pkg/object/command/trino/trino.go index 8781204f..236095fc 100644 --- a/internal/pkg/object/command/trino/trino.go +++ b/internal/pkg/object/command/trino/trino.go @@ -2,7 +2,6 @@ package trino import ( "context" - "encoding/json" "fmt" "io" "log" @@ -125,39 +124,6 @@ func (t *commandContext) HealthCheck(ctx context.Context, c *cluster.Cluster) er return fmt.Errorf("trino /v1/info returned status %d", resp.StatusCode) } - return t.checkTrinoClusterCapacity(ctx, clusterCtx.Endpoint) -} - -func (t *commandContext) checkTrinoClusterCapacity(ctx context.Context, endpoint string) error { - req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint+"/v1/cluster", nil) - if err != nil { - return err - } - - resp, err := http.DefaultClient.Do(req) - if err != nil { - return err - } - defer func() { - io.Copy(io.Discard, resp.Body) - resp.Body.Close() - }() - - if resp.StatusCode != http.StatusOK { - return fmt.Errorf("trino /v1/cluster returned status %d", resp.StatusCode) - } - - var info struct { - ActiveWorkers int `json:"activeWorkers"` - } - if err := json.NewDecoder(resp.Body).Decode(&info); err != nil { - return fmt.Errorf("trino /v1/cluster: failed to decode response: %w", err) - } - - if info.ActiveWorkers == 0 { - return fmt.Errorf("trino cluster has no active workers") - } - return nil }