Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 45 additions & 10 deletions server/src/main/java/org/apache/druid/server/QueryLifecycle.java
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,21 @@ public <T> QueryResponse<T> runSimple(
final AuthorizationResult authorizationResult
)
{
initialize(query);
return runSimple(query, authenticationResult, authorizationResult, null);
}

/**
* As {@link #runSimple(Query, AuthenticationResult, AuthorizationResult)}, but takes the user-provided context keys.
* See {@link #initialize(Query, Set)}.
*/
public <T> QueryResponse<T> runSimple(
final Query<T> query,
final AuthenticationResult authenticationResult,
final AuthorizationResult authorizationResult,
@Nullable final Set<String> userProvidedContextKeys
)
{
initialize(query, userProvidedContextKeys);

final Sequence<T> results;

Expand Down Expand Up @@ -212,43 +226,64 @@ public void after(final boolean isDone, final Throwable thrown)
* @throws DruidException if the current state is not NEW, which indicates a bug
*/
public void initialize(final Query<?> baseQuery)
{
// No separate set of user-provided keys is available here; passing null makes the two-argument overload
// treat every key in the query context as caller-set.
initialize(baseQuery, null);
}

/**
* As {@link #initialize(Query)}, but takes the keys the caller set in the context. Pass {@code null} to treat the
* whole context as caller-set (native queries). The SQL layer merges static defaults into the context, so it must
* pass the real user-provided keys.
*
* @throws DruidException if the current state is not NEW, which indicates a bug
*/
public void initialize(final Query<?> baseQuery, @Nullable final Set<String> userProvidedContextKeys)
{
transition(State.NEW, State.INITIALIZED);

userContextKeys = new HashSet<>(baseQuery.getContext().keySet());
final Set<String> effectiveUserKeys =

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: the difference between userContextKeys and userProvidedContextKeys isn't clear from the variable names alone. Can you add a comment here explaining how the two differ?

userProvidedContextKeys != null ? userProvidedContextKeys : baseQuery.getContext().keySet();

String queryId = baseQuery.getId();
if (Strings.isNullOrEmpty(queryId)) {
queryId = UUID.randomUUID().toString();
}

// Start with system defaults, apply per-datasource override, then user context wins
// Precedence, high to low: user context > per-datasource per-segment timeout > static defaults > code default.
Map<String, Object> contextWithDefaults = new HashMap<>(queryConfigProvider.getContext());
applyPerDatasourcePerSegmentTimeout(baseQuery, contextWithDefaults, queryId);
Map<String, Object> finalContext = QueryContexts.override(contextWithDefaults, baseQuery.getContext());
applyPerDatasourcePerSegmentTimeout(baseQuery, finalContext, effectiveUserKeys, queryId);
finalContext.put(BaseQuery.QUERY_ID, queryId);

this.baseQuery = baseQuery.withOverriddenContext(finalContext);
this.toolChest = conglomerate.getToolChest(this.baseQuery);
}

/**
* If a per-datasource per-segment timeout is configured, injects it into the context defaults.
* User context (applied later via {@link QueryContexts#override}) will override this if set explicitly.
* In monitorOnly mode, logs the configured timeout but does not inject it.
* If a per-datasource per-segment timeout is configured, injects it into {@code finalContext}, overriding a value
* merged in from static defaults. A perSegmentTimeout the caller set (per {@code userProvidedContextKeys}) is left
* untouched. In monitorOnly mode, logs the configured timeout but does not inject it.
*
* For queries involving multiple datasources (e.g., joins or unions), the timeout from the first matching datasource is applied
* since getTableNames() returns a Set, the match order is non-deterministic.
* For multi-datasource queries (joins/unions), the first matching datasource wins; getTableNames() returns a Set, so
* the match order is non-deterministic.
*/
private void applyPerDatasourcePerSegmentTimeout(
final Query<?> query,
final Map<String, Object> contextWithDefaults,
final Map<String, Object> finalContext,
final Set<String> userProvidedContextKeys,
final String queryId
)
{
if (perSegmentTimeoutConfig.isEmpty()) {
return;
}

// Caller set perSegmentTimeout: respect it.
if (userProvidedContextKeys.contains(QueryContexts.PER_SEGMENT_TIMEOUT_KEY)) {
return;
}

for (String tableName : query.getDataSource().getTableNames()) {
PerSegmentTimeoutConfig dsConfig = perSegmentTimeoutConfig.get(tableName);
if (dsConfig != null) {
Expand All @@ -260,7 +295,7 @@ private void applyPerDatasourcePerSegmentTimeout(
queryId
);
} else {
contextWithDefaults.put(QueryContexts.PER_SEGMENT_TIMEOUT_KEY, dsConfig.getPerSegmentTimeoutMs());
finalContext.put(QueryContexts.PER_SEGMENT_TIMEOUT_KEY, dsConfig.getPerSegmentTimeoutMs());
}
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class PerSegmentTimeoutInjectionTest
{
Expand Down Expand Up @@ -195,6 +196,47 @@ DATASOURCE, new PerSegmentTimeoutConfig(5000, false)
Assert.assertEquals(2000L, lifecycle.getQuery().context().getPerSegmentTimeout());
}

@Test
public void testPerDatasourceTimeout_staticDefaultInContextNotUserProvided_dynamicWins()
{
  // Mirrors the SQL path: perSegmentTimeout is present in the query context only because a static default was
  // merged in. The empty user-provided key set marks it as not caller-set, so the per-datasource value must
  // replace it.
  final long expectedTimeoutMs = 5000L;
  final Set<String> noUserProvidedKeys = Collections.emptySet();
  final Map<String, PerSegmentTimeoutConfig> perDatasourceConfig =
      Map.of(DATASOURCE, new PerSegmentTimeoutConfig(5000, false));

  expectDefaults();

  final Map<String, Object> mergedDefaultContext = Map.of(QueryContexts.PER_SEGMENT_TIMEOUT_KEY, "0");
  final TimeseriesQuery mergedDefaultQuery = baseQuery.withOverriddenContext(mergedDefaultContext);

  final QueryLifecycle queryLifecycle = createLifecycle(perDatasourceConfig);
  queryLifecycle.initialize(mergedDefaultQuery, noUserProvidedKeys);

  Assert.assertEquals(expectedTimeoutMs, queryLifecycle.getQuery().context().getPerSegmentTimeout());
}

@Test
public void testPerDatasourceTimeout_userProvidedViaProvenance_userWins()
{
  // perSegmentTimeout is listed among the user-provided keys, so the caller's value must survive even though a
  // per-datasource timeout is configured for the same datasource.
  final long expectedTimeoutMs = 2000L;
  final Set<String> callerSetKeys = Set.of(QueryContexts.PER_SEGMENT_TIMEOUT_KEY);
  final Map<String, PerSegmentTimeoutConfig> perDatasourceConfig =
      Map.of(DATASOURCE, new PerSegmentTimeoutConfig(5000, false));

  expectDefaults();

  final Map<String, Object> callerContext = Map.of(QueryContexts.PER_SEGMENT_TIMEOUT_KEY, 2000L);
  final TimeseriesQuery callerTimeoutQuery = baseQuery.withOverriddenContext(callerContext);

  final QueryLifecycle queryLifecycle = createLifecycle(perDatasourceConfig);
  queryLifecycle.initialize(callerTimeoutQuery, callerSetKeys);

  Assert.assertEquals(expectedTimeoutMs, queryLifecycle.getQuery().context().getPerSegmentTimeout());
}

private void expectDefaults()
{
EasyMock.expect(queryConfig.getContext()).andReturn(Map.of()).anyTimes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,10 @@ private <T> QueryResponse<Object[]> execute(
final QueryResponse<T> results = queryLifecycle.runSimple(
(Query<T>) query,
authenticationResult,
authorizationResult
authorizationResult,
// SQL merges static defaults into the context; pass the user-set keys so dynamic config can override a
// merged-in default but not a value the caller set.
plannerContext.authContextKeys()
);

return mapResultSequence(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ export const QUERY_CONTEXT_COMPLETIONS: JsonCompletionRule[] = [
isObject: true,
completions: [
{ value: 'timeout', documentation: 'Query timeout in milliseconds' },
{
value: 'perSegmentTimeout',
documentation: 'Per-segment processing timeout in milliseconds',
},
{
value: 'priority',
documentation: 'Query priority (higher = more important)',
Expand Down
Loading