Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ public class CascadingReindexingTemplate implements CompactionJobTemplate, DataS
private final int taskPriority;
private final long inputSegmentSizeBytes;
private final Period skipOffsetFromLatest;
private final Period skipOffsetFromEarliest;
private final Period skipOffsetFromNow;
private final Granularity defaultSegmentGranularity;
private final PartitionsSpec defaultPartitionsSpec;
Expand All @@ -118,6 +119,7 @@ public CascadingReindexingTemplate(
@JsonProperty("ruleProvider") ReindexingRuleProvider ruleProvider,
@JsonProperty("taskContext") @Nullable Map<String, Object> taskContext,
@JsonProperty("skipOffsetFromLatest") @Nullable Period skipOffsetFromLatest,
@JsonProperty("skipOffsetFromEarliest") @Nullable Period skipOffsetFromEarliest,
@JsonProperty("skipOffsetFromNow") @Nullable Period skipOffsetFromNow,
@JsonProperty("defaultSegmentGranularity") Granularity defaultSegmentGranularity,
@JsonProperty("defaultPartitionsSpec") PartitionsSpec defaultPartitionsSpec,
Expand Down Expand Up @@ -152,11 +154,15 @@ public CascadingReindexingTemplate(
}
this.tuningConfig = tuningConfig;

if (skipOffsetFromNow != null && skipOffsetFromLatest != null) {
throw InvalidInput.exception("Cannot set both skipOffsetFromNow and skipOffsetFromLatest");
int skipOffsetCount = (skipOffsetFromNow != null ? 1 : 0)
+ (skipOffsetFromLatest != null ? 1 : 0)
+ (skipOffsetFromEarliest != null ? 1 : 0);
if (skipOffsetCount > 1) {
throw InvalidInput.exception("Cannot set more than one of: skipOffsetFromNow, skipOffsetFromLatest, skipOffsetFromEarliest");
}
this.skipOffsetFromNow = skipOffsetFromNow;
this.skipOffsetFromLatest = skipOffsetFromLatest;
this.skipOffsetFromEarliest = skipOffsetFromEarliest;

this.defaultPartitioningRule = ReindexingPartitioningRule.syntheticRule(
defaultSegmentGranularity,
Expand Down Expand Up @@ -220,6 +226,14 @@ public Period getSkipOffsetFromLatest()
return skipOffsetFromLatest;
}

@Override
@JsonProperty
@Nullable
public Period getSkipOffsetFromEarliest()
{
return skipOffsetFromEarliest;
}

@JsonProperty
@Nullable
private Period getSkipOffsetFromNow()
Expand Down Expand Up @@ -366,7 +380,7 @@ public List<CompactionJob> createCompactionJobs(
}

// Skip offsets, if configured, can result in needing to truncate a search interval. If the truncation makes the interval invalid, skip it.
if ((skipOffsetFromNow != null || skipOffsetFromLatest != null) &&
if ((skipOffsetFromNow != null || skipOffsetFromLatest != null || skipOffsetFromEarliest != null) &&
intervalEndsAfter(reindexingInterval, adjustedTimelineInterval.getEnd())) {

DateTime alignedEnd = intervalInfo.getGranularity().bucketStart(adjustedTimelineInterval.getEnd());
Expand Down Expand Up @@ -422,30 +436,36 @@ protected CompactionJobTemplate createJobTemplateForInterval(
}

/**
* Applies the configured skip offset to an interval by adjusting its end time. Uses either
* skipOffsetFromNow (relative to reference time) or skipOffsetFromLatest (relative to interval end).
* Returns null if the adjusted end would be before the interval start.
* Applies the configured skip offset to an interval by adjusting its start or end time. Uses either
* skipOffsetFromNow (relative to reference time), skipOffsetFromLatest (relative to interval end),
* or skipOffsetFromEarliest (relative to interval start).
* Returns null if the adjusted interval would be invalid.
*
* @param interval the interval to adjust
* @param skipFromNowReferenceTime the reference time for skipOffsetFromNow calculation
* @return the interval with adjusted end time, or null if the result would be invalid
* @return the interval with adjusted boundaries, or null if the result would be invalid
*/
@Nullable
private Interval applySkipOffset(
Interval interval,
DateTime skipFromNowReferenceTime
)
{
DateTime maybeAdjustedStart = interval.getStart();
DateTime maybeAdjustedEnd = interval.getEnd();

if (skipOffsetFromNow != null) {
maybeAdjustedEnd = skipFromNowReferenceTime.minus(skipOffsetFromNow);
} else if (skipOffsetFromLatest != null) {
maybeAdjustedEnd = maybeAdjustedEnd.minus(skipOffsetFromLatest);
} else if (skipOffsetFromEarliest != null) {
maybeAdjustedStart = maybeAdjustedStart.plus(skipOffsetFromEarliest);
}
if (maybeAdjustedEnd.isBefore(interval.getStart())) {

if (maybeAdjustedEnd.isBefore(maybeAdjustedStart) || maybeAdjustedEnd.equals(maybeAdjustedStart)) {
return null;
} else {
return new Interval(interval.getStart(), maybeAdjustedEnd);
return new Interval(maybeAdjustedStart, maybeAdjustedEnd);
}
}

Expand All @@ -458,7 +478,8 @@ private InlineSchemaDataSourceCompactionConfig.Builder createBaseBuilder()
.withInputSegmentSizeBytes(inputSegmentSizeBytes)
.withEngine(CompactionEngine.MSQ)
.withTaskContext(taskContext)
.withSkipOffsetFromLatest(Period.ZERO); // We handle skip offsets at the timeline level, we know we want to cover the entirety of the interval
.withSkipOffsetFromLatest(Period.ZERO) // We handle skip offsets at the timeline level
.withSkipOffsetFromEarliest(Period.ZERO); // We handle skip offsets at the timeline level
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public void test_serde() throws Exception
ImmutableMap.of("context_key", "context_value"),
null,
null,
null,
Granularities.DAY,
new DynamicPartitionsSpec(5000000, null),
null,
Expand Down Expand Up @@ -148,6 +149,7 @@ public void test_serde_asDataSourceCompactionConfig() throws Exception
ImmutableMap.of("key", "value"),
null,
null,
null,
Granularities.HOUR,
new DynamicPartitionsSpec(5000000, null),
null,
Expand Down Expand Up @@ -185,6 +187,7 @@ public void test_createCompactionJobs_ruleProviderNotReady()
null,
null,
null,
null,
Granularities.DAY,
new DynamicPartitionsSpec(5000000, null),
null,
Expand Down Expand Up @@ -213,6 +216,7 @@ public void test_constructor_setBothSkipOffsetStrategiesThrowsException()
mockProvider,
null,
Period.days(7), // skipOffsetFromLatest
null, // skipOffsetFromEarliest
Period.days(3), // skipOffsetFromNow
Granularities.DAY,
new DynamicPartitionsSpec(5000000, null),
Expand All @@ -221,7 +225,7 @@ public void test_constructor_setBothSkipOffsetStrategiesThrowsException()
)
);

Assertions.assertEquals("Cannot set both skipOffsetFromNow and skipOffsetFromLatest", exception.getMessage());
Assertions.assertTrue(exception.getMessage().contains("Cannot set more than one of"));
EasyMock.verify(mockProvider);
}

Expand All @@ -241,6 +245,7 @@ public void test_constructor_nullDataSourceThrowsException()
null,
null,
null,
null,
Granularities.DAY,
new DynamicPartitionsSpec(5000000, null),
null,
Expand All @@ -265,6 +270,7 @@ public void test_constructor_nullRuleProviderThrowsException()
null,
null,
null,
null,
Granularities.DAY,
new DynamicPartitionsSpec(5000000, null),
null,
Expand All @@ -291,6 +297,7 @@ public void test_constructor_nullDefaultSegmentGranularityThrowsException()
null,
null,
null,
null,
null, // null defaultSegmentGranularity
new DynamicPartitionsSpec(5000000, null),
null,
Expand Down Expand Up @@ -322,6 +329,7 @@ public void test_constructor_tuningConfigWithPartitionsSpecThrowsException()
null,
null,
null,
null,
Granularities.DAY,
new DynamicPartitionsSpec(5000000, null),
null,
Expand Down Expand Up @@ -557,6 +565,7 @@ public void test_generateAlignedSearchIntervals_withGranularityAlignment()
null,
null,
null,
null,
Granularities.DAY,
new DynamicPartitionsSpec(5000000, null),
null,
Expand Down Expand Up @@ -647,6 +656,7 @@ public void test_generateAlignedSearchIntervals_withNonPartitioningRuleSplits()
null,
null,
null,
null,
Granularities.DAY,
new DynamicPartitionsSpec(5000000, null),
null,
Expand Down Expand Up @@ -742,6 +752,7 @@ public void test_generateAlignedSearchIntervals_withNoPartitioningRules()
null,
null,
null,
null,
Granularities.DAY,
new DynamicPartitionsSpec(5000000, null),
null,
Expand Down Expand Up @@ -838,6 +849,7 @@ public void test_generateAlignedSearchIntervals_prependIntervalForShortNonPartit
null,
null,
null,
null,
Granularities.HOUR,
new DynamicPartitionsSpec(5000000, null),
null,
Expand Down Expand Up @@ -946,6 +958,7 @@ public void test_generateAlignedSearchIntervals()
null,
null,
null,
null,
Granularities.HOUR,
new DynamicPartitionsSpec(5000000, null),
null,
Expand Down Expand Up @@ -1017,6 +1030,7 @@ public void test_generateAlignedSearchIntervals_noRulesThrowsException()
null,
null,
null,
null,
Granularities.DAY,
new DynamicPartitionsSpec(5000000, null),
null,
Expand Down Expand Up @@ -1084,6 +1098,7 @@ public void test_generateAlignedSearchIntervals_splitPointSnapsToExistingBoundar
null,
null,
null,
null,
Granularities.DAY,
new DynamicPartitionsSpec(5000000, null),
null,
Expand Down Expand Up @@ -1152,6 +1167,7 @@ public void test_generateAlignedSearchIntervals_prependAlignmentDoesNotExtendTim
null,
null,
null,
null,
Granularities.DAY,
new DynamicPartitionsSpec(5000000, null),
null,
Expand Down Expand Up @@ -1224,6 +1240,7 @@ public void test_generateAlignedSearchIntervals_duplicateSplitPointsFiltered()
null,
null,
null,
null,
Granularities.DAY,
new DynamicPartitionsSpec(5000000, null),
null,
Expand Down Expand Up @@ -1288,6 +1305,7 @@ public void test_generateAlignedSearchIntervals_singleRuleOnly()
null,
null,
null,
null,
Granularities.DAY,
new DynamicPartitionsSpec(5000000, null),
null,
Expand Down Expand Up @@ -1355,6 +1373,7 @@ public void test_generateAlignedSearchIntervals_zeroPeriodRuleAppliesImmediately
null,
null,
null,
null,
Granularities.DAY,
new DynamicPartitionsSpec(5000000, null),
null,
Expand Down Expand Up @@ -1447,6 +1466,7 @@ public void test_generateAlignedSearchIntervals_zeroPeriodRuleWithOtherRules()
null,
null,
null,
null,
Granularities.DAY,
new DynamicPartitionsSpec(5000000, null),
null,
Expand Down Expand Up @@ -1519,6 +1539,7 @@ public void test_generateAlignedSearchIntervals_failsWhenDefaultGranularityIsCoa
null,
null,
null,
null,
Granularities.MONTH, // MONTH is coarser than HOUR!
new DynamicPartitionsSpec(5000000, null),
null,
Expand Down Expand Up @@ -1582,6 +1603,7 @@ public void test_generateAlignedSearchIntervals_failsWhenOlderRuleHasFinerGranul
null,
null,
null,
null,
Granularities.DAY,
new DynamicPartitionsSpec(5000000, null),
null,
Expand Down Expand Up @@ -1629,6 +1651,7 @@ public void test_generateAlignedSearchIntervals_defaultPartitioningVirtualColumn
null,
null,
null,
null,
Granularities.DAY,
new DynamicPartitionsSpec(5000000, null),
defaultVCs,
Expand Down Expand Up @@ -1689,6 +1712,7 @@ public void test_generateAlignedSearchIntervals_defaultPartitioningVirtualColumn
null,
null,
null,
null,
Granularities.DAY,
new DynamicPartitionsSpec(5000000, null),
defaultVCs,
Expand Down Expand Up @@ -1746,6 +1770,7 @@ public void test_serde_withDefaultPartitioningVirtualColumns() throws Exception
ImmutableMap.of("context_key", "context_value"),
null,
null,
null,
Granularities.DAY,
new DynamicPartitionsSpec(5000000, null),
defaultVCs,
Expand Down Expand Up @@ -1788,6 +1813,7 @@ public void test_validate_returnsValid_withDynamicPartitionsSpec()
null,
null,
null,
null,
Granularities.DAY,
new DynamicPartitionsSpec(null, null),
null,
Expand All @@ -1809,6 +1835,7 @@ public void test_validate_returnsInvalid_withHashedPartitionsSpec()
null,
null,
null,
null,
Granularities.DAY,
new HashedPartitionsSpec(null, 3, null),
null,
Expand All @@ -1834,6 +1861,7 @@ public void test_validate_returnsInvalid_withMaxTotalRows()
null,
null,
null,
null,
Granularities.DAY,
new DynamicPartitionsSpec(null, 1000L),
null,
Expand All @@ -1859,6 +1887,7 @@ public void test_validate_returnsInvalid_withOneMaxNumTasks()
Collections.singletonMap(ClientMSQContext.CTX_MAX_NUM_TASKS, 1),
null,
null,
null,
Granularities.DAY,
new DynamicPartitionsSpec(null, null),
null,
Expand Down
Loading
Loading