-
Notifications
You must be signed in to change notification settings - Fork 364
Feat--enforce-retention #3615
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Feat--enforce-retention #3615
Changes from all commits
a494030
9b03999
13d6314
8265df2
81e8c50
7c50d13
7e55d06
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,8 +9,10 @@ import ( | |
| "github.com/jackc/pgx/v5/pgtype" | ||
| "github.com/labstack/echo/v4" | ||
|
|
||
| v1handlers "github.com/hatchet-dev/hatchet/api/v1/server/handlers/v1" | ||
| "github.com/hatchet-dev/hatchet/api/v1/server/oas/gen" | ||
| "github.com/hatchet-dev/hatchet/api/v1/server/oas/transformers/v1" | ||
| "github.com/hatchet-dev/hatchet/pkg/analytics" | ||
| "github.com/hatchet-dev/hatchet/pkg/repository/sqlcv1" | ||
| ) | ||
|
|
||
|
|
@@ -22,6 +24,16 @@ func (t *V1EventsService) V1EventList(ctx echo.Context, request gen.V1EventListR | |
| offset := int64(0) | ||
| since := time.Now().Add(-time.Hour * 24) | ||
|
|
||
| if request.Params.Since != nil { | ||
| since = *request.Params.Since | ||
| } | ||
|
|
||
| if v1handlers.IsBeforeRetention(since, tenant.DataRetentionPeriod) { | ||
| t.config.Analytics.Count(ctx.Request().Context(), analytics.Event, analytics.List, analytics.Properties{ | ||
| "outside_retention": true, | ||
| }) | ||
| } | ||
|
Comment on lines
+31
to
+35
|
||
|
|
||
| if request.Params.Limit != nil { | ||
| limit = *request.Params.Limit | ||
| } | ||
|
|
@@ -30,10 +42,6 @@ func (t *V1EventsService) V1EventList(ctx echo.Context, request gen.V1EventListR | |
| offset = *request.Params.Offset | ||
| } | ||
|
|
||
| if request.Params.Since != nil { | ||
| since = *request.Params.Since | ||
| } | ||
|
|
||
| opts := sqlcv1.ListEventsParams{ | ||
| Tenantid: tenantId, | ||
| Limit: pgtype.Int8{ | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,13 +6,13 @@ import ( | |
| "github.com/google/uuid" | ||
| "github.com/labstack/echo/v4" | ||
|
|
||
| v1handlers "github.com/hatchet-dev/hatchet/api/v1/server/handlers/v1" | ||
| "github.com/hatchet-dev/hatchet/api/v1/server/oas/gen" | ||
| transformers "github.com/hatchet-dev/hatchet/api/v1/server/oas/transformers/v1" | ||
| "github.com/hatchet-dev/hatchet/pkg/analytics" | ||
| v1 "github.com/hatchet-dev/hatchet/pkg/repository" | ||
| "github.com/hatchet-dev/hatchet/pkg/repository/sqlcv1" | ||
| "github.com/hatchet-dev/hatchet/pkg/telemetry" | ||
|
|
||
| transformers "github.com/hatchet-dev/hatchet/api/v1/server/oas/transformers/v1" | ||
| ) | ||
|
|
||
| func (t *LogsService) V1TenantLogLineList(ctx echo.Context, request gen.V1TenantLogLineListRequestObject) (gen.V1TenantLogLineListResponseObject, error) { | ||
|
|
@@ -47,6 +47,12 @@ func (t *LogsService) V1TenantLogLineList(ctx echo.Context, request gen.V1Tenant | |
| since = request.Params.Since | ||
| } | ||
|
|
||
| if since != nil && v1handlers.IsBeforeRetention(*since, tenant.DataRetentionPeriod) { | ||
| t.config.Analytics.Count(ctx.Request().Context(), analytics.Log, analytics.List, analytics.Properties{ | ||
| "outside_retention": true, | ||
| }) | ||
| } | ||
|
Comment on lines
+50
to
+54
|
||
|
|
||
| if request.Params.Until != nil { | ||
| until = request.Params.Until | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,24 @@ | ||
| package v1 | ||
|
|
||
| import "time" | ||
|
|
||
| const defaultRetention = 720 * time.Hour | ||
|
|
||
| func retentionBoundary(retentionPeriod string) time.Time { | ||
| retention, err := time.ParseDuration(retentionPeriod) | ||
| if err != nil || retention <= 0 { | ||
| retention = defaultRetention | ||
| } | ||
|
|
||
| return time.Now().Add(-retention) | ||
|
Comment on lines
+5
to
+13
|
||
| } | ||
|
|
||
| // IsBeforeRetention returns true when the given timestamp is older than the | ||
| // tenant's retention window (now - retentionPeriod). | ||
| func IsBeforeRetention(t time.Time, retentionPeriod string) bool { | ||
| if t.IsZero() { | ||
| return false | ||
| } | ||
|
|
||
| return t.Before(retentionBoundary(retentionPeriod)) | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,7 +6,9 @@ import ( | |
| "github.com/jackc/pgx/v5" | ||
| "github.com/labstack/echo/v4" | ||
|
|
||
| v1handlers "github.com/hatchet-dev/hatchet/api/v1/server/handlers/v1" | ||
| "github.com/hatchet-dev/hatchet/api/v1/server/oas/gen" | ||
| "github.com/hatchet-dev/hatchet/pkg/analytics" | ||
| "github.com/hatchet-dev/hatchet/pkg/repository/sqlcv1" | ||
|
|
||
| transformers "github.com/hatchet-dev/hatchet/api/v1/server/oas/transformers/v1" | ||
|
|
@@ -31,6 +33,14 @@ func (t *TasksService) V1TaskGet(ctx echo.Context, request gen.V1TaskGetRequestO | |
| return nil, echo.NewHTTPError(500, "Task type assertion failed") | ||
| } | ||
|
|
||
| tenant := ctx.Get("tenant").(*sqlcv1.Tenant) | ||
|
|
||
| if ts := task.InsertedAt; ts.Valid && v1handlers.IsBeforeRetention(ts.Time, tenant.DataRetentionPeriod) { | ||
| t.config.Analytics.Count(ctx.Request().Context(), analytics.TaskRun, analytics.Get, analytics.Properties{ | ||
| "outside_retention": true, | ||
| }) | ||
| } | ||
|
Comment on lines
+38
to
+42
|
||
|
|
||
| attempt := request.Params.Attempt | ||
|
|
||
| var retryCount *int | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,7 +8,9 @@ import ( | |
| "github.com/jackc/pgx/v5" | ||
| "github.com/labstack/echo/v4" | ||
|
|
||
| v1handlers "github.com/hatchet-dev/hatchet/api/v1/server/handlers/v1" | ||
| "github.com/hatchet-dev/hatchet/api/v1/server/oas/gen" | ||
| "github.com/hatchet-dev/hatchet/pkg/analytics" | ||
| v1 "github.com/hatchet-dev/hatchet/pkg/repository" | ||
| "github.com/hatchet-dev/hatchet/pkg/repository/sqlcv1" | ||
|
|
||
|
|
@@ -20,6 +22,12 @@ func (t *V1WorkflowRunsService) V1WorkflowRunGet(ctx echo.Context, request gen.V | |
| tenantId := tenant.ID | ||
| rawWorkflowRun := ctx.Get("v1-workflow-run").(*v1.V1WorkflowRunPopulator) | ||
|
|
||
| if ts := rawWorkflowRun.WorkflowRun.CreatedAt; ts.Valid && v1handlers.IsBeforeRetention(ts.Time, tenant.DataRetentionPeriod) { | ||
| t.config.Analytics.Count(ctx.Request().Context(), analytics.WorkflowRun, analytics.Get, analytics.Properties{ | ||
| "outside_retention": true, | ||
| }) | ||
| } | ||
|
Comment on lines
+25
to
+29
|
||
|
|
||
| requestContext := ctx.Request().Context() | ||
|
|
||
| details, err := t.getWorkflowRunDetails( | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,12 +7,13 @@ import ( | |
| "github.com/google/uuid" | ||
| "github.com/labstack/echo/v4" | ||
|
|
||
| v1handlers "github.com/hatchet-dev/hatchet/api/v1/server/handlers/v1" | ||
| "github.com/hatchet-dev/hatchet/api/v1/server/oas/gen" | ||
| transformers "github.com/hatchet-dev/hatchet/api/v1/server/oas/transformers/v1" | ||
| "github.com/hatchet-dev/hatchet/pkg/analytics" | ||
| v1 "github.com/hatchet-dev/hatchet/pkg/repository" | ||
| "github.com/hatchet-dev/hatchet/pkg/repository/sqlcv1" | ||
| "github.com/hatchet-dev/hatchet/pkg/telemetry" | ||
|
|
||
| transformers "github.com/hatchet-dev/hatchet/api/v1/server/oas/transformers/v1" | ||
| ) | ||
|
|
||
| func allOlapStatuses(runningFilter *gen.V1RunningFilter) []sqlcv1.V1ReadableStatusOlap { | ||
|
|
@@ -97,13 +98,21 @@ func normalizeWorkflowRunStatuses(statuses []gen.V1TaskStatus, runningFilter *ge | |
| return normalized | ||
| } | ||
|
|
||
| func (t *V1WorkflowRunsService) WithDags(ctx context.Context, request gen.V1WorkflowRunListRequestObject, tenantId uuid.UUID) (gen.V1WorkflowRunListResponseObject, error) { | ||
| func (t *V1WorkflowRunsService) WithDags(ctx context.Context, request gen.V1WorkflowRunListRequestObject, tenant *sqlcv1.Tenant) (gen.V1WorkflowRunListResponseObject, error) { | ||
| ctx, span := telemetry.NewSpan(ctx, "v1-workflow-runs-list-with-dags-tasks") | ||
| defer span.End() | ||
|
|
||
| tenantId := tenant.ID | ||
| since := request.Params.Since | ||
|
|
||
| if v1handlers.IsBeforeRetention(since, tenant.DataRetentionPeriod) { | ||
| t.config.Analytics.Count(ctx, analytics.WorkflowRun, analytics.List, analytics.Properties{ | ||
| "outside_retention": true, | ||
| }) | ||
| } | ||
|
Comment on lines
+108
to
+112
|
||
|
|
||
| var ( | ||
| statuses = allOlapStatuses(request.Params.RunningFilter) | ||
| since = request.Params.Since | ||
| limit int64 = 50 | ||
| offset int64 | ||
| ) | ||
|
|
@@ -244,13 +253,21 @@ func (t *V1WorkflowRunsService) WithDags(ctx context.Context, request gen.V1Work | |
| ), nil | ||
| } | ||
|
|
||
| func (t *V1WorkflowRunsService) OnlyTasks(ctx context.Context, request gen.V1WorkflowRunListRequestObject, tenantId uuid.UUID) (gen.V1WorkflowRunListResponseObject, error) { | ||
| func (t *V1WorkflowRunsService) OnlyTasks(ctx context.Context, request gen.V1WorkflowRunListRequestObject, tenant *sqlcv1.Tenant) (gen.V1WorkflowRunListResponseObject, error) { | ||
| ctx, span := telemetry.NewSpan(ctx, "v1-workflow-runs-list-only-tasks") | ||
| defer span.End() | ||
|
|
||
| tenantId := tenant.ID | ||
| since := request.Params.Since | ||
|
|
||
| if v1handlers.IsBeforeRetention(since, tenant.DataRetentionPeriod) { | ||
| t.config.Analytics.Count(ctx, analytics.WorkflowRun, analytics.List, analytics.Properties{ | ||
| "outside_retention": true, | ||
| }) | ||
| } | ||
|
Comment on lines
+263
to
+267
|
||
|
|
||
| var ( | ||
| statuses = allOlapStatuses(request.Params.RunningFilter) | ||
| since = request.Params.Since | ||
| workflowIds = []uuid.UUID{} | ||
| limit int64 = 50 | ||
| offset int64 | ||
|
|
@@ -353,15 +370,14 @@ func (t *V1WorkflowRunsService) OnlyTasks(ctx context.Context, request gen.V1Wor | |
|
|
||
| func (t *V1WorkflowRunsService) V1WorkflowRunList(ctx echo.Context, request gen.V1WorkflowRunListRequestObject) (gen.V1WorkflowRunListResponseObject, error) { | ||
| tenant := ctx.Get("tenant").(*sqlcv1.Tenant) | ||
| tenantId := tenant.ID | ||
|
|
||
| spanContext, span := telemetry.NewSpan(ctx.Request().Context(), "v1-workflow-runs-list") | ||
| defer span.End() | ||
|
|
||
| if request.Params.OnlyTasks { | ||
| return t.OnlyTasks(spanContext, request, tenantId) | ||
| return t.OnlyTasks(spanContext, request, tenant) | ||
| } else { | ||
| return t.WithDags(spanContext, request, tenantId) | ||
| return t.WithDags(spanContext, request, tenant) | ||
| } | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For
Count()instrumentation, boolean properties should follow the repo convention ofhas_*(to keep dashboards consistent). Consider renamingoutside_retentionto something likehas_retention_violation(and apply consistently across all retention events).