Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions internal/pkg/object/command/ecs/ecs.go
Original file line number Diff line number Diff line change
Expand Up @@ -930,13 +930,5 @@ func (e *commandContext) HealthCheck(ctx context.Context, c *cluster.Cluster) er
return fmt.Errorf("ECS cluster %q is not active: %s", clusterName, status)
}

if clusterCtx.MaxTaskCount > 0 {
running := int(out.Clusters[0].RunningTasksCount)
pending := int(out.Clusters[0].PendingTasksCount)
if running+pending >= clusterCtx.MaxTaskCount {
return fmt.Errorf("ECS cluster %q at capacity: %d running + %d pending >= max %d", clusterName, running, pending, clusterCtx.MaxTaskCount)
}
}

return nil
}
37 changes: 30 additions & 7 deletions internal/pkg/object/command/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,12 +195,13 @@ func (s *commandContext) HealthCheck(ctx context.Context, c *cluster.Cluster) er
return err
}

// Find the "state" column index
stateIdx := -1
stateIdx, autoResumeIdx := -1, -1
for i, col := range cols {
if col == "state" {
switch col {
case "state":
stateIdx = i
break
case "auto_resume":
autoResumeIdx = i
}
}
if stateIdx < 0 {
Expand All @@ -220,12 +221,34 @@ func (s *commandContext) HealthCheck(ctx context.Context, c *cluster.Cluster) er
return err
}

state, _ := vals[stateIdx].(string)
if state != "STARTED" {
state := scanToString(vals[stateIdx])
switch state {
case "STARTED", "RESUMING":
return nil
case "SUSPENDED":
if autoResumeIdx >= 0 && scanToString(vals[autoResumeIdx]) == "true" {
return nil
}
return fmt.Errorf("snowflake warehouse %q is suspended and auto_resume is not enabled", clusterCtx.Warehouse)
default:
return fmt.Errorf("snowflake warehouse %q is not ready: state=%s", clusterCtx.Warehouse, state)
}
}

return nil
func scanToString(v any) string {
switch val := v.(type) {
case string:
return val
case []byte:
return string(val)
case bool:
if val {
return "true"
}
return "false"
default:
return fmt.Sprintf("%v", v)
}
}

func (s *commandContext) Cleanup(ctx context.Context, jobID string, c *cluster.Cluster) error {
Expand Down
34 changes: 0 additions & 34 deletions internal/pkg/object/command/trino/trino.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package trino

import (
"context"
"encoding/json"
"fmt"
"io"
"log"
Expand Down Expand Up @@ -125,39 +124,6 @@ func (t *commandContext) HealthCheck(ctx context.Context, c *cluster.Cluster) er
return fmt.Errorf("trino /v1/info returned status %d", resp.StatusCode)
}

return t.checkTrinoClusterCapacity(ctx, clusterCtx.Endpoint)
}

func (t *commandContext) checkTrinoClusterCapacity(ctx context.Context, endpoint string) error {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint+"/v1/cluster", nil)
if err != nil {
return err
}

resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer func() {
io.Copy(io.Discard, resp.Body)
resp.Body.Close()
}()

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("trino /v1/cluster returned status %d", resp.StatusCode)
}

var info struct {
ActiveWorkers int `json:"activeWorkers"`
}
if err := json.NewDecoder(resp.Body).Decode(&info); err != nil {
return fmt.Errorf("trino /v1/cluster: failed to decode response: %w", err)
}

if info.ActiveWorkers == 0 {
return fmt.Errorf("trino cluster has no active workers")
}

return nil
}

Expand Down
Loading