Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/api-reference/automatic-compaction-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -911,6 +911,7 @@ This experimental policy prioritizes compaction of intervals with the largest nu
|`maxAverageUncompactedBytesPerSegment`|Maximum average size of uncompacted segments in an interval eligible for compaction. Human-readable byte format (e.g., "2GiB").|2 GiB|
|`minUncompactedBytesPercentForFullCompaction`|Threshold percentage (0-100) of uncompacted bytes to total bytes below which minor compaction is eligible instead of full compaction.|0|
|`minUncompactedRowsPercentForFullCompaction`|Threshold percentage (0-100) of uncompacted rows to total rows below which minor compaction is eligible instead of full compaction.|0|
|`forcePendingDeletionCompaction`|When `true`, an interval whose [cascading reindexing](../data-management/cascading-reindexing.md) deletion rules have not yet been applied to all of its segments is compacted even if it does not meet the minimum interval-size thresholds above. This applies only to cascading reindexing supervisors and their deletion rules; it has no effect on intervals without pending deletion rules or on any other compaction. Intended for operators who apply deletion rules for data compliance.|false|

#### Compaction policy `fixedIntervalOrder`

Expand Down
6 changes: 6 additions & 0 deletions docs/data-management/cascading-reindexing.md
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,12 @@ Example:
}
```

**Applying deletions regardless of compaction thresholds:**

When the cluster compaction policy is [`mostFragmentedFirst`](../api-reference/automatic-compaction-api.md#compaction-policy-mostfragmentedfirst), an interval must meet various minimum thresholds (such as `minUncompactedCount`) before it is compacted. This intentionally avoids compacting intervals with low fragmentation.

Operators who apply deletion rules for data compliance (or are generally interested in eagerly applying their deletion rules) can set `forcePendingDeletionCompaction: true` on the policy. Any interval that has deletion rules not yet applied to all of its segments is then compacted regardless of those thresholds, while still using the policy's full-vs-minor decision. This applies only to cascading reindexing deletion rules; it does not change eligibility for any other compaction. The setting defaults to `false`, so by default deletions wait until the interval qualifies for compaction on its own.

#### Index spec rules

Index spec rules control compression and encoding settings for compacted segments, independently of partitioning.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,59 @@ public void test_cascadingCompactionTemplate_multiplePeriodsApplyDifferentCompac
verifyEventCountOlderThan(Period.days(7), "item", "hat", 0);
}

@Test
public void test_cascadingReindexing_appliesPendingDeletionRule_evenWhenIntervalBelowFragmentationThreshold()
{
// minUncompactedCount is far above the segment count, so the interval only becomes a candidate
// because forcePendingDeletionCompaction is enabled and a deletion rule is pending.
configureCompaction(
CompactionEngine.MSQ,
new MostFragmentedIntervalFirstPolicy(10_000, null, null, null, null, null, true)
);

DateTime now = DateTimes.nowUtc();
// A few segments older than the rule period; half the rows are item='hat'.
String oldEvents = generateEventsInInterval(
new Interval(now.minusDays(31), now.minusDays(14)),
6,
Duration.ofHours(25).toMillis()
);
runIngestionAtGranularity("DAY", oldEvents);
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);

Assertions.assertEquals(6, getNumSegmentsWith(Granularities.DAY));
verifyEventCountOlderThan(Period.days(7), "item", "hat", 3);

ReindexingDeletionRule deletionRule = new ReindexingDeletionRule(
"deletionRule",
"Drop rows where item is 'hat'",
Period.days(7),
new EqualityFilter("item", ColumnType.STRING, "hat", null),
null
);

CascadingReindexingTemplate template = new CascadingReindexingTemplate(
dataSource,
null,
null,
InlineReindexingRuleProvider.builder().deletionRules(List.of(deletionRule)).build(),
null,
null,
null,
Granularities.DAY,
new DynamicPartitionsSpec(null, null),
null,
null
);

runCompactionWithSpec(template);
waitForAllCompactionTasksToFinish();
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);

// The pending deletion rule forced compaction despite the interval being below the size threshold.
verifyEventCountOlderThan(Period.days(7), "item", "hat", 0);
}

@Test
public void test_cascadingReindexing_withVirtualColumnOnNestedData_filtersCorrectly()
{
Expand Down Expand Up @@ -1235,12 +1288,12 @@ public static List<Object[]> getPolicyAndPartition()
return List.of(
new Object[]{
// decides minor compaction based on bytes percent
new MostFragmentedIntervalFirstPolicy(2, new HumanReadableBytes("1KiB"), null, 80, null, null),
new MostFragmentedIntervalFirstPolicy(2, new HumanReadableBytes("1KiB"), null, 80, null, null, null),
new DimensionRangePartitionsSpec(null, 10_000, List.of("page"), false)
},
new Object[]{
// decides minor compaction based on rows percent
new MostFragmentedIntervalFirstPolicy(2, new HumanReadableBytes("1KiB"), null, null, 51, null),
new MostFragmentedIntervalFirstPolicy(2, new HumanReadableBytes("1KiB"), null, null, 51, null, null),
new DynamicPartitionsSpec(null, null)
}
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public static Stream<Arguments> getCompactionSupervisorTestParams()
),
Arguments.of(
CompactionEngine.MSQ,
new MostFragmentedIntervalFirstPolicy(1, HumanReadableBytes.valueOf(1), null, null, 80, null)
new MostFragmentedIntervalFirstPolicy(1, HumanReadableBytes.valueOf(1), null, null, 80, null, null)
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@
import org.apache.druid.indexing.input.DruidInputSource;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.compaction.CompactionCandidate;
import org.apache.druid.server.compaction.CompactionCandidateSearchPolicy;
import org.apache.druid.server.compaction.CompactionSlotManager;
import org.apache.druid.server.compaction.CompactionTaskStatus;
import org.apache.druid.server.compaction.DataSourceCompactibleSegmentIterator;
import org.apache.druid.server.compaction.Eligibility;
import org.apache.druid.server.compaction.NewestSegmentFirstPolicy;
Expand All @@ -50,6 +53,8 @@
*/
public class CompactionConfigBasedJobTemplate implements CompactionJobTemplate
{
private static final Logger log = new Logger(CompactionConfigBasedJobTemplate.class);

private final DataSourceCompactionConfig config;
private final ReindexingConfigOptimizer configOptimizer;

Expand Down Expand Up @@ -94,10 +99,27 @@ public List<CompactionJob> createCompactionJobs(
// Create a job for each CompactionCandidate
while (segmentIterator.hasNext()) {
final CompactionCandidate candidate = segmentIterator.next();
final Eligibility eligibility =
params.getClusterCompactionConfig()
.getCompactionPolicy()
.checkEligibilityForCompaction(candidate, params.getLatestTaskStatus(candidate));
final CompactionCandidateSearchPolicy policy = params.getClusterCompactionConfig().getCompactionPolicy();
final CompactionTaskStatus latestTaskStatus = params.getLatestTaskStatus(candidate);

Eligibility eligibility = policy.checkEligibilityForCompaction(candidate, latestTaskStatus);
if (policy.isForceMandatoryCompactionEnabled()
&& !eligibility.isEligible()
&& configOptimizer.hasUnappliedDeletionRules(config, candidate, params)) {
// When the operator has opted in, a cascading reindexing interval with unapplied deletion rules
// must be reindexed for compliance even when it is below the policy's size thresholds; bypass
// those gates but keep its full-vs-minor decision.
final String policyRejectionReason = eligibility.getReason();
eligibility = policy.checkEligibilityForMandatoryCompaction(candidate, latestTaskStatus);
log.info(
"Forcing compaction of interval[%s] for dataSource[%s] in mode[%s] because it has unapplied"
+ " deletion rules, even though it is not normally eligible[%s].",
candidate.getUmbrellaInterval(),
config.getDataSource(),
eligibility.getMode(),
policyRejectionReason
);
}
if (!eligibility.isEligible()) {
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,20 @@ DataSourceCompactionConfig optimizeConfig(
CompactionJobParams params
);

/**
* Whether {@code candidate} has deletion rules not yet applied to all of its segments. The job
* template uses this to force compaction eligibility for intervals that must be reindexed to apply
* deletion rules, regardless of the policy's minimum interval-size thresholds. Defaults to false.
*/
default boolean hasUnappliedDeletionRules(
DataSourceCompactionConfig config,
CompactionCandidate candidate,
CompactionJobParams params
)
{
return false;
}

/**
* Identity finalizer that returns the config unchanged.
* Use this for templates that don't need per-candidate customization.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -95,6 +94,31 @@ public DataSourceCompactionConfig optimizeConfig(
.build();
}

@Override
public boolean hasUnappliedDeletionRules(
DataSourceCompactionConfig config,
CompactionCandidate candidate,
CompactionJobParams params
)
{
if (config.getTransformSpec() == null) {
return false;
}
final DimFilter filter = config.getTransformSpec().getFilter();
if (!(filter instanceof NotDimFilter)) {
return false;
}

// Compare the decomposed NOT(OR(...)) clauses (as optimizeConfig does) rather than the whole
// transformSpec, so a spec differing only by folded-in partitioning virtual columns isn't mistaken
// for a pending deletion. Unlike optimizeConfig, never-compacted candidates are not short-circuited.
return computeRequiredSetOfFilterRulesForCandidate(
Comment thread
capistrant marked this conversation as resolved.
candidate,
(NotDimFilter) filter,
params.getFingerprintMapper()
) != null;
}

/**
* Computes the required set of deletion rules to be applied for the given {@link CompactionCandidate}.
* <p>
Expand All @@ -121,16 +145,20 @@ private NotDimFilter computeRequiredSetOfFilterRulesForCandidate(
expectedFilters = ((OrDimFilter) expectedFilter.getField()).getFields();
}

// If any segment has no fingerprint, we cannot prove the deletion rules were applied to it, so we
// cannot prune anything: report every rule as unapplied. This also subsumes the all-null case (no
// segment can prove application). Only when every segment has a fingerprint do we compare applied
// filters against expected.
final boolean anySegmentMissingFingerprint = candidateSegments.getSegments().stream()
.anyMatch(s -> s.getIndexingStateFingerprint() == null);
if (anySegmentMissingFingerprint) {
return expectedFilter;
}

Set<String> uniqueFingerprints = candidateSegments.getSegments().stream()
.map(DataSegment::getIndexingStateFingerprint)
.filter(Objects::nonNull)
.collect(Collectors.toSet());

if (uniqueFingerprints.isEmpty()) {
// no fingerprints means that no candidate segments have transforms to compare against. Return all filters eagerly.
return expectedFilter;
}

Set<DimFilter> unappliedRules = new HashSet<>();

for (String fingerprint : uniqueFingerprints) {
Expand Down
Loading
Loading