Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
* [BUGFIX] gRPC: Fix panic when `grpc_compression` is set to `snappy` on ingester client or store-gateway client configurations. #7459
* [BUGFIX] Config: Mask Swift, etcd, Redis, and HTTP basic-auth credentials on the `/config` endpoint. #7473
* [BUGFIX] Memberlist: Drop incoming TCP transport packets when digest verification fails, preventing corrupted payloads from being forwarded. #7474
* [BUGFIX] Compactor: Fix stale `cortex_bucket_index_last_successful_update_timestamp_seconds` metric not being cleaned up when tenant ownership changes due to ring rebalancing. This caused false alarms on bucket index update rate when a tenant moved between compactors. #7485

## 1.21.0 2026-04-24

Expand Down
55 changes: 34 additions & 21 deletions pkg/compactor/blocks_cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,32 +412,50 @@ func (c *BlocksCleaner) scanUsers(ctx context.Context) ([]string, []string, erro
markedForDeletion = append(markedForDeletion, deleted...)
isMarkedForDeletion := util.StringsMap(markedForDeletion)
allUsers := append(active, markedForDeletion...)
currentOwnedUsers := util.StringsMap(allUsers)

// Delete per-tenant metrics for all tenants not belonging anymore to this shard.
// Such tenants have been moved to a different shard, so their updated metrics will
// be exported by the new shard.
for _, userID := range c.lastOwnedUsers {
if !isActive[userID] && !isMarkedForDeletion[userID] {
c.tenantBlocks.DeleteLabelValues(userID)
c.tenantParquetBlocks.DeleteLabelValues(userID)
c.tenantParquetUnConvertedBlocks.DeleteLabelValues(userID)
c.tenantBlocksMarkedForDelete.DeleteLabelValues(userID)
c.tenantBlocksMarkedForNoCompaction.DeleteLabelValues(userID)
c.tenantPartialBlocks.DeleteLabelValues(userID)
c.tenantBucketIndexLastUpdate.DeleteLabelValues(userID)
if c.cfg.ShardingStrategy == util.ShardingStrategyShuffle {
c.remainingPlannedCompactions.DeleteLabelValues(userID)
if c.cfg.CompactionStrategy == util.CompactionStrategyPartitioning {
c.inProgressCompactions.DeleteLabelValues(userID)
c.oldestPartitionGroupOffset.DeleteLabelValues(userID)
}
}
c.deleteUserMetrics(userID)
}
}

// Also reset metrics for any user that was previously tracked but is no longer
// in the current owned set. This handles the case where a user's ownership changed
// due to ring rebalancing but the user still exists in the bucket (so it appears
// in the base scanner results but is filtered out by the shard check).
for _, userID := range c.lastOwnedUsers {
if _, stillOwned := currentOwnedUsers[userID]; !stillOwned {
c.deleteUserMetrics(userID)
}
}

c.lastOwnedUsers = allUsers

return active, markedForDeletion, nil
}

// deleteUserMetrics removes all per-tenant metrics for the given user.
func (c *BlocksCleaner) deleteUserMetrics(userID string) {
c.tenantBlocks.DeleteLabelValues(userID)
c.tenantParquetBlocks.DeleteLabelValues(userID)
c.tenantParquetUnConvertedBlocks.DeleteLabelValues(userID)
c.tenantBlocksMarkedForDelete.DeleteLabelValues(userID)
c.tenantBlocksMarkedForNoCompaction.DeleteLabelValues(userID)
c.tenantPartialBlocks.DeleteLabelValues(userID)
c.tenantBucketIndexLastUpdate.DeleteLabelValues(userID)
if c.cfg.ShardingStrategy == util.ShardingStrategyShuffle {
c.remainingPlannedCompactions.DeleteLabelValues(userID)
if c.cfg.CompactionStrategy == util.CompactionStrategyPartitioning {
c.inProgressCompactions.DeleteLabelValues(userID)
c.oldestPartitionGroupOffset.DeleteLabelValues(userID)
}
}
}

func (c *BlocksCleaner) obtainVisitMarkerManager(ctx context.Context, userLogger log.Logger, userBucket objstore.InstrumentedBucket) (visitMarkerManager *VisitMarkerManager, isVisited bool, err error) {
cleanerVisitMarker := NewCleanerVisitMarker(c.ringLifecyclerID)
visitMarkerManager = NewVisitMarkerManager(userBucket, userLogger, c.ringLifecyclerID, cleanerVisitMarker)
Expand Down Expand Up @@ -473,7 +491,7 @@ func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userLog
if err := bucketindex.DeleteIndexSyncStatus(ctx, c.bucketClient, userID); err != nil {
return err
}
c.tenantBucketIndexLastUpdate.DeleteLabelValues(userID)
c.deleteUserMetrics(userID)

var blocksToDelete []any
err := userBucket.Iter(ctx, "", func(name string) error {
Expand Down Expand Up @@ -524,12 +542,7 @@ func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userLog
}

// Given all blocks have been deleted, we can also remove the metrics.
c.tenantBlocks.DeleteLabelValues(userID)
c.tenantParquetBlocks.DeleteLabelValues(userID)
c.tenantParquetUnConvertedBlocks.DeleteLabelValues(userID)
c.tenantBlocksMarkedForDelete.DeleteLabelValues(userID)
c.tenantBlocksMarkedForNoCompaction.DeleteLabelValues(userID)
c.tenantPartialBlocks.DeleteLabelValues(userID)
c.deleteUserMetrics(userID)

if deletedBlocks.Load() > 0 {
level.Info(userLogger).Log("msg", "deleted blocks for tenant marked for deletion", "deletedBlocks", deletedBlocks.Load())
Expand Down
66 changes: 66 additions & 0 deletions pkg/compactor/blocks_cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,72 @@ func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShar
))
}

func TestBlocksCleaner_ShouldCleanupBucketIndexMetricOnOwnershipChange(t *testing.T) {
bucketClient, _ := cortex_testutil.PrepareFilesystemBucket(t)
bucketClient = bucketindex.BucketWithGlobalMarkers(bucketClient)

// Create blocks for two users.
createTSDBBlock(t, bucketClient, "user-1", 10, 20, nil)
createTSDBBlock(t, bucketClient, "user-2", 30, 40, nil)

cfg := BlocksCleanerConfig{
DeletionDelay: time.Hour,
CleanupInterval: time.Minute,
CleanupConcurrency: 1,
BlockRanges: (&tsdb.DurationList{2 * time.Hour, 12 * time.Hour, 24 * time.Hour}).ToMilliseconds(),
}

ctx := context.Background()
logger := log.NewNopLogger()
reg := prometheus.NewRegistry()
scanner, err := users.NewScanner(users.UsersScannerConfig{
Strategy: users.UserScanStrategyList,
}, bucketClient, logger, reg)
require.NoError(t, err)
cfgProvider := newMockConfigProvider()
blocksMarkedForDeletion := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: blocksMarkedForDeletionName,
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, reasonLabelName))
dummyGaugeVec := prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"test"})

cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, 60*time.Second, cfgProvider, logger, "test-cleaner", reg, time.Minute, 30*time.Second, blocksMarkedForDeletion, dummyGaugeVec)

// First run: both users are owned by this compactor.
activeUsers, deleteUsers, err := cleaner.scanUsers(ctx)
require.NoError(t, err)
require.NoError(t, cleaner.cleanUpActiveUsers(ctx, activeUsers, true))
require.NoError(t, cleaner.cleanDeletedUsers(ctx, deleteUsers))

// Verify bucket index last update metric is set for both users.
require.NotZero(t, prom_testutil.ToFloat64(cleaner.tenantBucketIndexLastUpdate.WithLabelValues("user-1")))
require.NotZero(t, prom_testutil.ToFloat64(cleaner.tenantBucketIndexLastUpdate.WithLabelValues("user-2")))

// Simulate ring rebalancing: user-2 ownership moves to a different compactor.
// The ShardedScanner now only returns user-1.
cleaner.usersScanner, err = users.NewScanner(users.UsersScannerConfig{
Strategy: users.UserScanStrategyList,
}, bucketClient, logger, reg)
require.NoError(t, err)
cleaner.usersScanner = users.NewShardedScanner(cleaner.usersScanner, func(userID string) (bool, error) {
return userID == "user-1", nil
}, logger)

// Second run: user-2 is no longer owned.
activeUsers, deleteUsers, err = cleaner.scanUsers(ctx)
require.NoError(t, err)
require.NoError(t, cleaner.cleanUpActiveUsers(ctx, activeUsers, false))
require.NoError(t, cleaner.cleanDeletedUsers(ctx, deleteUsers))

// Verify: user-1 metric still exists and is non-zero, user-2 metric has been cleaned up.
require.NotZero(t, prom_testutil.ToFloat64(cleaner.tenantBucketIndexLastUpdate.WithLabelValues("user-1")))
// user-2 metric should have been deleted. Calling WithLabelValues creates a new zero-value gauge,
// so we verify by checking the total number of metrics in the GaugeVec.
assert.Equal(t, 1, prom_testutil.CollectAndCount(cleaner.tenantBucketIndexLastUpdate),
"expected only user-1 metric to remain after ownership change")
}


func TestBlocksCleaner_ListBlocksOutsideRetentionPeriod(t *testing.T) {
bucketClient, _ := cortex_testutil.PrepareFilesystemBucket(t)
bucketClient = bucketindex.BucketWithGlobalMarkers(bucketClient)
Expand Down
Loading