From 4647d708f863236af5704b40ad9a98eb8fecebba Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Thu, 7 May 2026 08:03:21 +0000 Subject: [PATCH] fix(compactor): Clean up stale bucket index metrics on ownership change When a tenant's compactor ownership changes due to ring rebalancing, the old compactor's cortex_bucket_index_last_successful_update_timestamp_seconds metric was not being cleaned up. This caused the metric to have duplicate series (one stale from the old owner, one fresh from the new owner), triggering false alarms on bucket index update rate. The fix ensures scanUsers() properly detects tenants that were previously owned but are no longer in the current owned set, and cleans up their metrics. Also extracts metric deletion into a reusable deleteUserMetrics() helper to reduce code duplication. Signed-off-by: Ben Ye --- CHANGELOG.md | 1 + pkg/compactor/blocks_cleaner.go | 55 ++++++++++++++--------- pkg/compactor/blocks_cleaner_test.go | 66 ++++++++++++++++++++++++++++ 3 files changed, 101 insertions(+), 21 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7815bb58495..396bc61b7b5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ * [BUGFIX] gRPC: Fix panic when `grpc_compression` is set to `snappy` on ingester client or store-gateway client configurations. #7459 * [BUGFIX] Config: Mask Swift, etcd, Redis, and HTTP basic-auth credentials on the `/config` endpoint. #7473 * [BUGFIX] Memberlist: Drop incoming TCP transport packets when digest verification fails, preventing corrupted payloads from being forwarded. #7474 +* [BUGFIX] Compactor: Fix stale `cortex_bucket_index_last_successful_update_timestamp_seconds` metric not being cleaned up when tenant ownership changes due to ring rebalancing. This caused false alarms on bucket index update rate when a tenant moved between compactors. #7485 ## 1.21.0 2026-04-24 diff --git a/pkg/compactor/blocks_cleaner.go b/pkg/compactor/blocks_cleaner.go index 4a0e13c2018..e7633c9d67e 100644 --- a/pkg/compactor/blocks_cleaner.go +++ b/pkg/compactor/blocks_cleaner.go @@ -412,32 +412,50 @@ func (c *BlocksCleaner) scanUsers(ctx context.Context) ([]string, []string, erro markedForDeletion = append(markedForDeletion, deleted...) isMarkedForDeletion := util.StringsMap(markedForDeletion) allUsers := append(active, markedForDeletion...) + currentOwnedUsers := util.StringsMap(allUsers) + // Delete per-tenant metrics for all tenants not belonging anymore to this shard. // Such tenants have been moved to a different shard, so their updated metrics will // be exported by the new shard. for _, userID := range c.lastOwnedUsers { if !isActive[userID] && !isMarkedForDeletion[userID] { - c.tenantBlocks.DeleteLabelValues(userID) - c.tenantParquetBlocks.DeleteLabelValues(userID) - c.tenantParquetUnConvertedBlocks.DeleteLabelValues(userID) - c.tenantBlocksMarkedForDelete.DeleteLabelValues(userID) - c.tenantBlocksMarkedForNoCompaction.DeleteLabelValues(userID) - c.tenantPartialBlocks.DeleteLabelValues(userID) - c.tenantBucketIndexLastUpdate.DeleteLabelValues(userID) - if c.cfg.ShardingStrategy == util.ShardingStrategyShuffle { - c.remainingPlannedCompactions.DeleteLabelValues(userID) - if c.cfg.CompactionStrategy == util.CompactionStrategyPartitioning { - c.inProgressCompactions.DeleteLabelValues(userID) - c.oldestPartitionGroupOffset.DeleteLabelValues(userID) - } - } + c.deleteUserMetrics(userID) + } + } + + // Also reset metrics for any user that was previously tracked but is no longer + // in the current owned set. This handles the case where a user's ownership changed + // due to ring rebalancing but the user still exists in the bucket (so it appears + // in the base scanner results but is filtered out by the shard check). + for _, userID := range c.lastOwnedUsers { + if _, stillOwned := currentOwnedUsers[userID]; !stillOwned { + c.deleteUserMetrics(userID) } } + c.lastOwnedUsers = allUsers return active, markedForDeletion, nil } +// deleteUserMetrics removes all per-tenant metrics for the given user. +func (c *BlocksCleaner) deleteUserMetrics(userID string) { + c.tenantBlocks.DeleteLabelValues(userID) + c.tenantParquetBlocks.DeleteLabelValues(userID) + c.tenantParquetUnConvertedBlocks.DeleteLabelValues(userID) + c.tenantBlocksMarkedForDelete.DeleteLabelValues(userID) + c.tenantBlocksMarkedForNoCompaction.DeleteLabelValues(userID) + c.tenantPartialBlocks.DeleteLabelValues(userID) + c.tenantBucketIndexLastUpdate.DeleteLabelValues(userID) + if c.cfg.ShardingStrategy == util.ShardingStrategyShuffle { + c.remainingPlannedCompactions.DeleteLabelValues(userID) + if c.cfg.CompactionStrategy == util.CompactionStrategyPartitioning { + c.inProgressCompactions.DeleteLabelValues(userID) + c.oldestPartitionGroupOffset.DeleteLabelValues(userID) + } + } +} + func (c *BlocksCleaner) obtainVisitMarkerManager(ctx context.Context, userLogger log.Logger, userBucket objstore.InstrumentedBucket) (visitMarkerManager *VisitMarkerManager, isVisited bool, err error) { cleanerVisitMarker := NewCleanerVisitMarker(c.ringLifecyclerID) visitMarkerManager = NewVisitMarkerManager(userBucket, userLogger, c.ringLifecyclerID, cleanerVisitMarker) @@ -473,7 +491,7 @@ func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userLog if err := bucketindex.DeleteIndexSyncStatus(ctx, c.bucketClient, userID); err != nil { return err } - c.tenantBucketIndexLastUpdate.DeleteLabelValues(userID) + c.deleteUserMetrics(userID) var blocksToDelete []any err := userBucket.Iter(ctx, "", func(name string) error { @@ -524,12 +542,7 @@ func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userLog } // Given all blocks have been deleted, we can also remove the metrics. - c.tenantBlocks.DeleteLabelValues(userID) - c.tenantParquetBlocks.DeleteLabelValues(userID) - c.tenantParquetUnConvertedBlocks.DeleteLabelValues(userID) - c.tenantBlocksMarkedForDelete.DeleteLabelValues(userID) - c.tenantBlocksMarkedForNoCompaction.DeleteLabelValues(userID) - c.tenantPartialBlocks.DeleteLabelValues(userID) + c.deleteUserMetrics(userID) if deletedBlocks.Load() > 0 { level.Info(userLogger).Log("msg", "deleted blocks for tenant marked for deletion", "deletedBlocks", deletedBlocks.Load()) diff --git a/pkg/compactor/blocks_cleaner_test.go b/pkg/compactor/blocks_cleaner_test.go index ea247392578..df39f94883b 100644 --- a/pkg/compactor/blocks_cleaner_test.go +++ b/pkg/compactor/blocks_cleaner_test.go @@ -599,6 +599,72 @@ func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShar )) } +func TestBlocksCleaner_ShouldCleanupBucketIndexMetricOnOwnershipChange(t *testing.T) { + bucketClient, _ := cortex_testutil.PrepareFilesystemBucket(t) + bucketClient = bucketindex.BucketWithGlobalMarkers(bucketClient) + + // Create blocks for two users. + createTSDBBlock(t, bucketClient, "user-1", 10, 20, nil) + createTSDBBlock(t, bucketClient, "user-2", 30, 40, nil) + + cfg := BlocksCleanerConfig{ + DeletionDelay: time.Hour, + CleanupInterval: time.Minute, + CleanupConcurrency: 1, + BlockRanges: (&tsdb.DurationList{2 * time.Hour, 12 * time.Hour, 24 * time.Hour}).ToMilliseconds(), + } + + ctx := context.Background() + logger := log.NewNopLogger() + reg := prometheus.NewRegistry() + scanner, err := users.NewScanner(users.UsersScannerConfig{ + Strategy: users.UserScanStrategyList, + }, bucketClient, logger, reg) + require.NoError(t, err) + cfgProvider := newMockConfigProvider() + blocksMarkedForDeletion := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: blocksMarkedForDeletionName, + Help: blocksMarkedForDeletionHelp, + }, append(commonLabels, reasonLabelName)) + dummyGaugeVec := prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"test"}) + + cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, 60*time.Second, cfgProvider, logger, "test-cleaner", reg, time.Minute, 30*time.Second, blocksMarkedForDeletion, dummyGaugeVec) + + // First run: both users are owned by this compactor. + activeUsers, deleteUsers, err := cleaner.scanUsers(ctx) + require.NoError(t, err) + require.NoError(t, cleaner.cleanUpActiveUsers(ctx, activeUsers, true)) + require.NoError(t, cleaner.cleanDeletedUsers(ctx, deleteUsers)) + + // Verify bucket index last update metric is set for both users. + require.NotZero(t, prom_testutil.ToFloat64(cleaner.tenantBucketIndexLastUpdate.WithLabelValues("user-1"))) + require.NotZero(t, prom_testutil.ToFloat64(cleaner.tenantBucketIndexLastUpdate.WithLabelValues("user-2"))) + + // Simulate ring rebalancing: user-2 ownership moves to a different compactor. + // The ShardedScanner now only returns user-1. + cleaner.usersScanner, err = users.NewScanner(users.UsersScannerConfig{ + Strategy: users.UserScanStrategyList, + }, bucketClient, logger, reg) + require.NoError(t, err) + cleaner.usersScanner = users.NewShardedScanner(cleaner.usersScanner, func(userID string) (bool, error) { + return userID == "user-1", nil + }, logger) + + // Second run: user-2 is no longer owned. + activeUsers, deleteUsers, err = cleaner.scanUsers(ctx) + require.NoError(t, err) + require.NoError(t, cleaner.cleanUpActiveUsers(ctx, activeUsers, false)) + require.NoError(t, cleaner.cleanDeletedUsers(ctx, deleteUsers)) + + // Verify: user-1 metric still exists and is non-zero, user-2 metric has been cleaned up. + require.NotZero(t, prom_testutil.ToFloat64(cleaner.tenantBucketIndexLastUpdate.WithLabelValues("user-1"))) + // user-2 metric should have been deleted. Calling WithLabelValues creates a new zero-value gauge, + // so we verify by checking the total number of metrics in the GaugeVec. + assert.Equal(t, 1, prom_testutil.CollectAndCount(cleaner.tenantBucketIndexLastUpdate), + "expected only user-1 metric to remain after ownership change") +} + + func TestBlocksCleaner_ListBlocksOutsideRetentionPeriod(t *testing.T) { bucketClient, _ := cortex_testutil.PrepareFilesystemBucket(t) bucketClient = bucketindex.BucketWithGlobalMarkers(bucketClient)