Warn when a generated segment significantly exceeds maxRowsPerSegment due to partition-key skew #19573

@Vishesh-Paliwal

Description

Today, when ingestion uses hashed (or range) partitioning with a targetRowsPerSegment / maxRowsPerSegment, that value is treated as a target assuming evenly distributed partition keys. If the data is skewed — i.e. a single partition-key value (e.g. one orgId) has far more rows than the target — Druid silently produces an oversized segment. There is no warning, metric, or task-report signal indicating that the configured target was significantly exceeded. The ingestion task simply reports success.

This is because the number of shards is derived from the cardinality of the partition dimension, not from the row distribution within each key:

// ParallelIndexSupervisorTask.computeIntervalToNumShards()
estimatedNumShards = Math.round(estimatedCardinality / maxRowsPerSegment);

and every row with the same key deterministically hashes into the same bucket:

// HashPartitionFunction.MURMUR3_32_ABS
return Math.abs(Hashing.murmur3_32().hashBytes(serializedRow).asInt() % numBuckets);

So a single hot key cannot be split, and the resulting segment can be many multiples of maxRowsPerSegment — with nothing surfaced to the operator.
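
To make the two snippets above concrete, here is a minimal, self-contained sketch of the effect. It is not Druid code: `HotKeySkewSketch` and its methods are hypothetical names, `String.hashCode()` stands in for the murmur3 hash over the serialized row, and the shard-count formula is taken at face value from the excerpt above. It only illustrates that a bucket count derived from key cardinality, combined with a deterministic hash, leaves all of a hot key's rows in one bucket.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: a toy model of cardinality-based shard counts plus
// deterministic hash bucketing. None of these names exist in Druid.
final class HotKeySkewSketch {

    // Stand-in for the hash partition function. Assumption: String.hashCode()
    // replaces murmur3 over the serialized row; only determinism matters here.
    static int bucketFor(String partitionKey, int numBuckets) {
        return Math.abs(partitionKey.hashCode() % numBuckets);
    }

    // Same shape as the quoted formula; the floor of 1 shard is an assumption
    // added so the sketch never divides work across zero buckets.
    static int estimateNumShards(double estimatedCardinality, int maxRowsPerSegment) {
        return (int) Math.max(1L, Math.round(estimatedCardinality / maxRowsPerSegment));
    }

    // Sums row counts per bucket for a map of partition key -> number of rows.
    static Map<Integer, Long> rowsPerBucket(Map<String, Long> rowsPerKey, int numBuckets) {
        Map<Integer, Long> result = new HashMap<>();
        for (Map.Entry<String, Long> e : rowsPerKey.entrySet()) {
            result.merge(bucketFor(e.getKey(), numBuckets), e.getValue(), Long::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> rowsPerKey = new HashMap<>();
        for (int i = 0; i < 40; i++) {
            rowsPerKey.put("org-" + i, 10L);   // many small tenants
        }
        rowsPerKey.put("org-hot", 1_000L);      // one hot tenant

        // The bucket count depends only on how many keys there are,
        // not on how many rows the hot key contributes.
        int numBuckets = estimateNumShards(rowsPerKey.size(), 10);
        System.out.println(numBuckets + " buckets -> " + rowsPerBucket(rowsPerKey, numBuckets));
    }
}
```

Whichever bucket `org-hot` lands in carries at least its 1,000 rows, regardless of the configured per-segment target.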

Proposed change: after segment generation, compare each segment's actual row count against the configured targetRowsPerSegment / maxRowsPerSegment. When a segment exceeds the target by more than a configurable ratio (default e.g. 2.0), emit a WARN log and surface it in the task report so it is visible from the Overlord UI rather than buried in worker logs.

Concretely:

  • Per-segment row counts are already available at push time via Appenderator.getRowCount(SegmentIdWithShardSpec).
  • The natural hook is in the segment-generate tasks' createGeneratedPartitionsReport() (PartialHashSegmentGenerateTask / PartialRangeSegmentGenerateTask).
  • Add a field to IngestionStatsAndErrors (e.g. segmentsExceedingTarget: list of {segmentId, rowCount, target}) so the skew is reported as structured task output, not just a log line.
  • Make the warning ratio configurable with a sensible default; warning at exactly 1.0× would be too noisy given the "best-effort target" semantics.
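
The check described in the bullets above could look roughly like the following. This is a sketch under stated assumptions, not a patch against Druid: `SegmentTargetCheck`, `Exceeding`, `findSegmentsExceedingTarget`, and `warnRatio` are hypothetical names, segment ids are plain strings, and the row counts arrive as a simple map rather than from `Appenderator.getRowCount(SegmentIdWithShardSpec)`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: flags segments whose row count exceeds target * warnRatio.
final class SegmentTargetCheck {

    // Hypothetical report entry, mirroring the proposed {segmentId, rowCount, target} shape.
    static final class Exceeding {
        final String segmentId;
        final long rowCount;
        final long target;

        Exceeding(String segmentId, long rowCount, long target) {
            this.segmentId = segmentId;
            this.rowCount = rowCount;
            this.target = target;
        }

        @Override
        public String toString() {
            return segmentId + " has " + rowCount + " rows (target " + target + ")";
        }
    }

    // Returns every segment whose row count is strictly greater than target * warnRatio.
    static List<Exceeding> findSegmentsExceedingTarget(
            Map<String, Long> rowCountBySegment, long target, double warnRatio) {
        if (target <= 0 || warnRatio < 1.0) {
            throw new IllegalArgumentException("target must be > 0 and warnRatio >= 1.0");
        }
        List<Exceeding> exceeding = new ArrayList<>();
        for (Map.Entry<String, Long> e : rowCountBySegment.entrySet()) {
            if (e.getValue() > target * warnRatio) {
                exceeding.add(new Exceeding(e.getKey(), e.getValue(), target));
            }
        }
        return exceeding;
    }

    public static void main(String[] args) {
        Map<String, Long> rowCounts = new LinkedHashMap<>();
        rowCounts.put("segment-0", 4_800_000L);
        rowCounts.put("segment-1", 712_000_000L);  // skewed by one hot key

        // In a real task this list would go to the WARN log and the task report.
        for (Exceeding e : findSegmentsExceedingTarget(rowCounts, 5_000_000L, 2.0)) {
            System.err.println("WARN: " + e);
        }
    }
}
```

The comparison is strict, so with the default ratio of 2.0 a segment at exactly twice the target is not reported; whether the boundary should be inclusive is a detail for the real implementation.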

Scope this to the native batch path first; the MSQ segment-generation path (FrameChannelHashPartitioner etc.) can be a follow-up.

This is intentionally a small, observability-only change — it does not alter partitioning behavior. It only makes existing skew visible so operators can react (re-partition, add a partition dimension, etc.) instead of discovering oversized segments via slow queries.

Motivation

Use case. We run Druid with daily segment granularity and hashed partitioning by orgId, with maxRowsPerSegment = 5,000,000. This works well for the vast majority of tenants. However, we now have one very large tenant generating roughly 5 billion logs per week. Because all rows for a single orgId hash to the same shard, that one org's data lands in a small number of massively oversized segments, far beyond the 5M target, even though the configured limit suggests segments should hold ~5M rows. The ingestion tasks report success with no indication that the target was exceeded by orders of magnitude.

The skew is also extremely spiky: within the same day this org can produce ~20 million rows in one hour and ~300 million in another. So the oversized-segment problem is not even uniform across time chunks — some segments are catastrophically large and others are normal, which makes it hard to notice without per-segment inspection.

We strongly suspect these oversized segments are a significant contributor to slower queries for that tenant (and to uneven load across historicals), but today there is nothing in Druid's output that points an operator at the root cause. The first symptom is "queries are slow," and it takes manual segment-size investigation to discover the skew.

Rationale / benefits:

  • Operability: turns a silent, hard-to-diagnose condition into an explicit, actionable signal at ingestion time.
  • Low risk: purely additive observability; no change to partitioning logic, segment layout, or query behavior.
  • Faster root-causing: operators learn about skew when the data is ingested, not weeks later via query latency regressions.
  • Foundation for a real fix: the warning also gives us the diagnostic to motivate and measure a follow-up that actually mitigates the skew.

An open question for the maintainers: given a distribution this skewed and spiky, the standard advice is to add a secondary partition dimension to break up the hot key. But that only helps if the secondary dimension actually varies within the hot org, and it doesn't fully address the per-hour spikiness. Is there a recommended approach today for splitting a single oversized partition-key value across multiple segments while preserving cross-key pruning, beyond adding a secondary dimension? I'd also like to follow up with a separate proposal for automatic overflow sub-partitioning of skewed range keys (I think the "// Future improvement: Handle skewed partitions better" TODO in PartitionBoundaries already points at this gap). This issue is intentionally scoped to just the warning; I would love to propose a deeper fix separately if you feel it's worth it.

I'm happy to contribute the implementation for this warning.
