Skip to content

Commit 80b2174

Browse files
authored
Make all list partition calls specific to v1 tenants only (#3758)
1 parent 94d166a commit 80b2174

10 files changed

Lines changed: 33 additions & 57 deletions

File tree

internal/operation/pool.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ func NewTenantOperationPool(p *partition.Partition, ql *zerolog.Logger, operatio
7878
// list all tenants
7979
innerCtx, innerCancel := context.WithTimeout(outerCtx, 5*time.Second)
8080

81-
tenants, err := p.ListTenantsForController(innerCtx, sqlcv1.TenantMajorEngineVersionV1)
81+
tenants, err := p.ListTenantsForController(innerCtx)
8282

8383
if err != nil {
8484
innerCancel()

internal/services/controllers/olap/process_alerts.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ func (o *OLAPControllerImpl) runTenantProcessAlerts(ctx context.Context) func()
1717
o.l.Debug().Ctx(ctx).Msgf("partition: processing tenant alerts")
1818

1919
// list all tenants
20-
tenants, err := o.p.ListTenantsForController(ctx, sqlcv1.TenantMajorEngineVersionV1)
20+
tenants, err := o.p.ListTenantsForController(ctx)
2121

2222
if err != nil {
2323
o.l.Error().Ctx(ctx).Err(err).Msg("could not list tenants")

internal/services/controllers/olap/process_dag_status_updates.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ func (o *OLAPControllerImpl) runDAGStatusUpdates(ctx context.Context) func() {
2323
o.l.Debug().Ctx(ctx).Msgf("partition: running status updates for dags")
2424

2525
// list all tenants
26-
tenants, err := o.p.ListTenantsForController(ctx, sqlcv1.TenantMajorEngineVersionV1)
26+
tenants, err := o.p.ListTenantsForController(ctx)
2727

2828
if err != nil {
2929
o.l.Error().Ctx(ctx).Err(err).Msg("could not list tenants")

internal/services/controllers/olap/process_task_status_updates.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ func (o *OLAPControllerImpl) runTaskStatusUpdates(ctx context.Context) func() {
2323
o.l.Debug().Ctx(ctx).Msgf("partition: running status updates for tasks")
2424

2525
// list all tenants
26-
tenants, err := o.p.ListTenantsForController(ctx, sqlcv1.TenantMajorEngineVersionV1)
26+
tenants, err := o.p.ListTenantsForController(ctx)
2727

2828
if err != nil {
2929
o.l.Error().Ctx(ctx).Err(err).Msg("could not list tenants")

internal/services/controllers/retention/shared.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ func GetDataRetentionExpiredTime(duration string) (time.Time, error) {
2323
}
2424

2525
func (rc *RetentionControllerImpl) ForTenants(ctx context.Context, perTenantTimeout time.Duration, f func(ctx context.Context, tenant sqlcv1.Tenant) error) error {
26-
tenants, err := rc.p.ListTenantsForController(ctx, sqlcv1.TenantMajorEngineVersionV0)
26+
tenants, err := rc.p.ListTenantsForController(ctx)
2727

2828
if err != nil {
2929
return fmt.Errorf("could not list tenants: %w", err)

internal/services/partition/partition.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -176,16 +176,16 @@ func (p *Partition) GetInternalTenantForController(ctx context.Context) (*sqlcv1
176176
return p.repo.GetInternalTenantForController(ctx, p.GetControllerPartitionId())
177177
}
178178

179-
func (p *Partition) ListTenantsForController(ctx context.Context, majorVersion sqlcv1.TenantMajorEngineVersion) ([]*sqlcv1.Tenant, error) {
180-
return p.repo.ListTenantsByControllerPartition(ctx, p.GetControllerPartitionId(), majorVersion)
179+
func (p *Partition) ListTenantsForController(ctx context.Context) ([]*sqlcv1.Tenant, error) {
180+
return p.repo.ListTenantsByControllerPartition(ctx, p.GetControllerPartitionId())
181181
}
182182

183-
func (p *Partition) ListTenantsForScheduler(ctx context.Context, majorVersion sqlcv1.TenantMajorEngineVersion) ([]*sqlcv1.Tenant, error) {
184-
return p.repo.ListTenantsBySchedulerPartition(ctx, p.GetSchedulerPartitionId(), majorVersion)
183+
func (p *Partition) ListTenantsForScheduler(ctx context.Context) ([]*sqlcv1.Tenant, error) {
184+
return p.repo.ListTenantsBySchedulerPartition(ctx, p.GetSchedulerPartitionId())
185185
}
186186

187-
func (p *Partition) ListTenantsForWorkerPartition(ctx context.Context, majorVersion sqlcv1.TenantMajorEngineVersion) ([]*sqlcv1.Tenant, error) {
188-
return p.repo.ListTenantsByWorkerPartition(ctx, p.GetWorkerPartitionId(), majorVersion)
187+
func (p *Partition) ListTenantsForWorkerPartition(ctx context.Context) ([]*sqlcv1.Tenant, error) {
188+
return p.repo.ListTenantsByWorkerPartition(ctx, p.GetWorkerPartitionId())
189189
}
190190

191191
func (p *Partition) runControllerPartitionHeartbeat(ctx context.Context) func() {

internal/services/scheduler/v1/scheduler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,7 @@ func (s *Scheduler) runSetTenants(ctx context.Context) func() {
393393
s.l.Debug().Ctx(ctx).Msgf("partition: checking step run requeue")
394394

395395
// list all tenants
396-
tenants, err := s.repov1.Tenant().ListTenantsBySchedulerPartition(ctx, s.p.GetSchedulerPartitionId(), sqlcv1.TenantMajorEngineVersionV1)
396+
tenants, err := s.repov1.Tenant().ListTenantsBySchedulerPartition(ctx, s.p.GetSchedulerPartitionId())
397397

398398
if err != nil {
399399
s.l.Err(err).Ctx(ctx).Msg("could not list tenants")

pkg/repository/sqlcv1/tenants.sql

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ FROM
8787
"Tenant" as tenants
8888
WHERE
8989
"controllerPartitionId" = sqlc.arg('controllerPartitionId')::text
90-
AND "version" = @majorVersion::"TenantMajorEngineVersion"
90+
AND "version" = 'V1'::"TenantMajorEngineVersion"
9191
AND "deletedAt" IS NULL;
9292

9393
-- name: ListTenantsByTenantWorkerPartitionId :many
@@ -97,7 +97,7 @@ FROM
9797
"Tenant" as tenants
9898
WHERE
9999
"workerPartitionId" = sqlc.arg('workerPartitionId')::text
100-
AND "version" = @majorVersion::"TenantMajorEngineVersion"
100+
AND "version" = 'V1'::"TenantMajorEngineVersion"
101101
AND "deletedAt" IS NULL;
102102

103103
-- name: GetTenantByID :one
@@ -493,7 +493,7 @@ FROM
493493
"Tenant" as tenants
494494
WHERE
495495
"schedulerPartitionId" = sqlc.arg('schedulerPartitionId')::text
496-
AND "version" = @majorVersion::"TenantMajorEngineVersion"
496+
AND "version" = 'V1'::"TenantMajorEngineVersion"
497497
AND "deletedAt" IS NULL;
498498

499499
-- name: UpsertTenantAlertingSettings :one

pkg/repository/sqlcv1/tenants.sql.go

Lines changed: 9 additions & 24 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/repository/tenant.go

Lines changed: 9 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -137,11 +137,11 @@ type TenantRepository interface {
137137
GetInternalTenantForController(ctx context.Context, controllerPartitionId string) (*sqlcv1.Tenant, error)
138138

139139
// ListTenantsByPartition lists all tenants in the given partition
140-
ListTenantsByControllerPartition(ctx context.Context, controllerPartitionId string, majorVersion sqlcv1.TenantMajorEngineVersion) ([]*sqlcv1.Tenant, error)
140+
ListTenantsByControllerPartition(ctx context.Context, controllerPartitionId string) ([]*sqlcv1.Tenant, error)
141141

142-
ListTenantsByWorkerPartition(ctx context.Context, workerPartitionId string, majorVersion sqlcv1.TenantMajorEngineVersion) ([]*sqlcv1.Tenant, error)
142+
ListTenantsByWorkerPartition(ctx context.Context, workerPartitionId string) ([]*sqlcv1.Tenant, error)
143143

144-
ListTenantsBySchedulerPartition(ctx context.Context, schedulerPartitionId string, majorVersion sqlcv1.TenantMajorEngineVersion) ([]*sqlcv1.Tenant, error)
144+
ListTenantsBySchedulerPartition(ctx context.Context, schedulerPartitionId string) ([]*sqlcv1.Tenant, error)
145145

146146
// CreateEnginePartition creates a new partition for tenants within the engine
147147
CreateControllerPartition(ctx context.Context) (string, error)
@@ -743,26 +743,20 @@ func (r *tenantRepository) GetInternalTenantForController(ctx context.Context, c
743743
return tenant, nil
744744
}
745745

746-
func (r *tenantRepository) ListTenantsByControllerPartition(ctx context.Context, controllerPartitionId string, majorVersion sqlcv1.TenantMajorEngineVersion) ([]*sqlcv1.Tenant, error) {
746+
func (r *tenantRepository) ListTenantsByControllerPartition(ctx context.Context, controllerPartitionId string) ([]*sqlcv1.Tenant, error) {
747747
if controllerPartitionId == "" {
748748
return nil, fmt.Errorf("partitionId is required")
749749
}
750750

751-
return r.queries.ListTenantsByControllerPartitionId(ctx, r.pool, sqlcv1.ListTenantsByControllerPartitionIdParams{
752-
ControllerPartitionId: controllerPartitionId,
753-
Majorversion: majorVersion,
754-
})
751+
return r.queries.ListTenantsByControllerPartitionId(ctx, r.pool, controllerPartitionId)
755752
}
756753

757-
func (r *tenantRepository) ListTenantsByWorkerPartition(ctx context.Context, workerPartitionId string, majorVersion sqlcv1.TenantMajorEngineVersion) ([]*sqlcv1.Tenant, error) {
754+
func (r *tenantRepository) ListTenantsByWorkerPartition(ctx context.Context, workerPartitionId string) ([]*sqlcv1.Tenant, error) {
758755
if workerPartitionId == "" {
759756
return nil, fmt.Errorf("partitionId is required")
760757
}
761758

762-
return r.queries.ListTenantsByTenantWorkerPartitionId(ctx, r.pool, sqlcv1.ListTenantsByTenantWorkerPartitionIdParams{
763-
WorkerPartitionId: workerPartitionId,
764-
Majorversion: majorVersion,
765-
})
759+
return r.queries.ListTenantsByTenantWorkerPartitionId(ctx, r.pool, workerPartitionId)
766760
}
767761

768762
func (r *tenantRepository) CreateControllerPartition(ctx context.Context) (string, error) {
@@ -850,15 +844,12 @@ func (r *tenantRepository) UpdateSchedulerPartitionHeartbeat(ctx context.Context
850844
return partition.ID, nil
851845
}
852846

853-
func (r *tenantRepository) ListTenantsBySchedulerPartition(ctx context.Context, schedulerPartitionId string, majorVersion sqlcv1.TenantMajorEngineVersion) ([]*sqlcv1.Tenant, error) {
847+
func (r *tenantRepository) ListTenantsBySchedulerPartition(ctx context.Context, schedulerPartitionId string) ([]*sqlcv1.Tenant, error) {
854848
if schedulerPartitionId == "" {
855849
return nil, fmt.Errorf("partitionId is required")
856850
}
857851

858-
return r.queries.ListTenantsBySchedulerPartitionId(ctx, r.pool, sqlcv1.ListTenantsBySchedulerPartitionIdParams{
859-
SchedulerPartitionId: schedulerPartitionId,
860-
Majorversion: majorVersion,
861-
})
852+
return r.queries.ListTenantsBySchedulerPartitionId(ctx, r.pool, schedulerPartitionId)
862853
}
863854

864855
func (r *tenantRepository) CreateSchedulerPartition(ctx context.Context) (string, error) {

0 commit comments

Comments
 (0)