IGNITE-14777 window functions support#12096
Conversation
9b522ec to
f04ed9f
Compare
f04ed9f to
dbf1e84
Compare
|
@oleg-zinovev, I've partially reviewed your PR. Review not completed yet, but I have some comments to publish. Also there are a lot of codestyle violations. Please read the https://cwiki.apache.org/confluence/display/IGNITE/Coding+Guidelines article about Ignite codestyle. Most of the problems can be detected automatically (for example using command:
|
| * - project removing constants | ||
| */ | ||
| @Value.Enclosing | ||
| public class ProjectWindowConstantsRule extends RelRule<ProjectWindowConstantsRule.Config> implements TransformationRule { |
There was a problem hiding this comment.
It's a goal of AccumulatorsFactory in/out adapters to provide rows suitable for accumulator. Let's use this logic instead of creating addional rel ops.
There was a problem hiding this comment.
The implementation of aggregates in Ignite relies on the indices of values in the row instead of evaluating expressions in AggregateCall#rexList. In this case, constants must be present in the row before it is processed by the aggregate.
If we want to eliminate a separate rule that implements this using projections, I can suggest one of the following options:
-
Redesign AccumulatorsFactory so that it uses AggregateCall#rexList instead of AggregateCall#argList when computing the aggregate, and replace RexInputRef for window constants with RexLiteral in the WindowConverterRule.
-
Add projection of constants when computing window functions so that the row passed to accumulators contains window constants in the required positions. However, in this case, the implementation of WindowNode will include logic that already exists in ProjectionNode.
-
Implement a separate factory for aggregates (or add new methods in the current one) that works directly with RexWinAggCall. In this case, AccumulatorsFactory (or the new factory for window functions) will need to handle a set of RexNode operands, but this will not affect the current logic for regular AggregateCall. Also, this would allow abandoning the transformation of RexWinAggCall into AggregateCall and, thus, avoid copying groups in WindowConverterRule. However, in this scenario, I might need to change the visibility of classes in the package org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.
Let me know if you'd like to propose a more robust solution.
There was a problem hiding this comment.
First approach looks intresting, I think it's most performant solution. But If it brings a lot of complexity to the current patch I think we can keep current approach, and fix it later by different ticket.
There was a problem hiding this comment.
@alex-plekhanov
Hi,
Could you please take a look at commit e9f9a37?
I removed projection execution as a separate step during query planning (ProjectWindowConstantsRule). Projections are now applied directly during window function evaluation.
However, I have a couple of concerns:
-
The number of projections will now depend on how many window functions are invoked
-
Re-scanning a window partition will require recomputing the projections
I think I can address the first issue by refactoring WindowFunctionFactory so that a single projection is shared across all functions within a group.
But I’m not sure what the best approach is for the second one, other than introducing a cache of already projected rows—which would increase memory usage during execution.
There was a problem hiding this comment.
As far as I understand now each calculation of each aggregate call will apply project, which will create the new row and copy input row to this row. In this case it's even more resource consuming than ProjectWindowConstantsRule. Maybe it's better to revert to ProjectWindowConstantsRule.
In my opinion right way to implement this is to provide some list of precompiled functions (columns projections List<Function<Row, T>>) to AbstractWindowFunction (and AbstractAccumulator) and implement get method in these classes like:
(T)projects.get(arguments().get(idx)).apply(row)
instead of:
(T)hnd.get(arguments().get(idx), row)
In this case no additional rows will be created and only required column modifications will be applied.
But let's do it later, by another ticket, this ticket is already too complex.
There was a problem hiding this comment.
Hi,
The exact same idea came to my mind about an hour after the commit... :)
I agree, this approach looks like the better one, although it will require some rework in the accumulators.
| partition = partitionFactory.get(); | ||
| } | ||
| else if (prevRow != null && partCmp != null && partCmp.compare(prevRow, row) != 0) { | ||
| partition.drainTo(rowFactory, outBuf); |
There was a problem hiding this comment.
- This operation can block the thread for a long time.
- Large amount of rows can be stored in outBuf, in worth case there will be 2x input rows count (in partition and in outBuf)
Consider pushing directly to downstream. Drain (and push) only requested amount and postpone next pushes until next request.
There was a problem hiding this comment.
Let's fix it with another ticket, it's not a blocker but complicate the implementation
| doPush(); | ||
| } | ||
| else | ||
| nodeMemoryTracker.onRowAdded(row); |
There was a problem hiding this comment.
Can be false positive memory limit exceed error. Tracker need to be reset not only on rewindInternal, but also when partition is fully drained.
Test required for new node (see MemoryQuotasIntegrationTest)
| Supplier<WindowPartition<Row>> partitionFactory, | ||
| RowHandler.RowFactory<Row> rowFactory | ||
| ) { | ||
| super(ctx, rowType, DFLT_ROW_OVERHEAD); |
There was a problem hiding this comment.
Memory tracker row overhead depends at least on count of aggregates (maybe also kinds of aggregates)
| register(SqlStdOperatorTable.BIT_XOR); | ||
|
|
||
| // Window specific operations | ||
| register(SqlStdOperatorTable.ROW_NUMBER); |
There was a problem hiding this comment.
Smoke test for each operand required in StdSqlOperatorsTest
This comment was marked as outdated.
This comment was marked as outdated.
dbf1e84 to
9f135fa
Compare
This comment was marked as outdated.
This comment was marked as outdated.
…g into window partition factory call, window exclusion validation
9f135fa to
13dd848
Compare
| else { | ||
| Row offsetRow = frame.get(idx); | ||
| Object val = get(0, offsetRow); | ||
| if (val == null) { |
There was a problem hiding this comment.
Based on the description of LAG/LEAD function (and other database behavior), we have to return a value even if it is NULL. A default value returns only in the case when a row does not exist.
statement ok
CREATE TABLE t_lag_lead(id INTEGER, val INTEGER);
statement ok
INSERT INTO t_lag_lead VALUES (1, 10), (2, NULL), (3, 30);
query IIII
SELECT id, val,
LAG(val, 1, 999) OVER (ORDER BY id),
LEAD(val, 1, 999) OVER (ORDER BY id)
FROM t_lag_lead
ORDER BY id;
----
1 10 999 NULL
2 NULL 10 30
3 30 NULL 999
There was a problem hiding this comment.
Hi.
Fixed except the following:
If the third argument of lag/lead is non-nullable, Calcite changes the return type of the function to non-nullable (org.apache.calcite.sql.fun.SqlLeadLagAggFunction#transformType).
Because of this, your example returns 0 instead of null.
Not entirely sure what the best way to handle this.
There was a problem hiding this comment.
Probably we have to rewrite (add CAST: LAG(val, 1, 999) <=> LAG(val, 1, CAST(999 AS INTEGER)) in the rule or something else; I didn't test it) to support expected behavior. But anyway it is not a crucial point; we can do it in the future.
If the fix is not trivial, just leave a comment (TODO that pointed out of the specific JIRA ticket).
The Ignite community can fix it in the future.
| public static RelCollation mergeCollations(RelCollation collation0, RelCollation collation1) { | ||
| ImmutableBitSet keys = ImmutableBitSet.of(collation0.getKeys()); | ||
| List<RelFieldCollation> fields = U.arrayList(collation0.getFieldCollations()); | ||
| for (RelFieldCollation it : collation1.getFieldCollations()) |
There was a problem hiding this comment.
It's crucial to maintain ORDER BY direction and nulls ordering, also it's crucial to maintain ORDER BY columns order. Here, if PARTITION BY collation and ORDER BY collation is intersected, PARTITION BY direction (default) and columns order are applied instead of ORDER BY direction and columns order.
For example:
SELECT row_number() OVER (PARTITION BY id ORDER BY id DESC) FROM tbl
Inserts sort node with ASC-nulls-first direction
SELECT row_number() OVER (PARTITION BY id, value ORDER BY value, id DESC) FROM tbl
Inserts sort node with 0, 1 columns order.
Let's add these cases to tests.
There was a problem hiding this comment.
Codestyle: Braces should be used for multi-line for statement
There was a problem hiding this comment.
Hi,
According to the docs (https://sql-academy.org/en/guide/sorting-in-windows-functions), for OVER expressions ORDER BY clause only affects the row order within each partition defined by PARTITION BY.
Therefore, sorting by a field that appears in both ORDER BY and PARTITION BY is redundant. Within a single partition, all rows share the same value of that field, so the sort operation receives identical values as input.
I’ll add the required tests to verify the behavior and compare it across different database management systems.
| RelDataTypeFactory.Builder builder = typeFactory.builder(); | ||
|
|
||
| for (int i = 0; i < inputFieldCnt; i++) { | ||
| // add fields from original input, passed through window rel |
There was a problem hiding this comment.
Codestyle: Comments start with an uppercase letter and end with a point.
| builder.add(windowFields.get(i)); | ||
| } | ||
| for (int i = inputFieldCnt; i < newInputFields.size(); i++) { | ||
| // add constants from new input |
There was a problem hiding this comment.
Codestyle: Comments start with an uppercase letter and end with a point.
| builder.add(newInputFields.get(i)); | ||
| } | ||
| for (int i = inputFieldCnt; i < windowFields.size(); i++) { | ||
| // add fields, provided by window |
There was a problem hiding this comment.
Codestyle: Comments start with an uppercase letter and end with a point.
| ); | ||
|
|
||
| /** */ | ||
| Supplier<WindowPartition<Row>> windowPartitionFactory( |
There was a problem hiding this comment.
ExpressionFactory is mostly for RexNode to something convertion. windowPartitionFactory is not directly related to RexNode, so, maybe it's better to use partition factory constructor instead of new ExpressionFactory method.
|
@oleg-zinovev, sorry for delay with review. |
This comment was marked as outdated.
This comment was marked as outdated.
|
|
||
| /** Partition of rows in window function calculation. */ | ||
| public interface WindowPartition<Row> { | ||
|
|
There was a problem hiding this comment.
Codestyle: Redundant line
| F.asList( | ||
| F.asList(1), | ||
| F.asList(2), | ||
| F.asList(1), | ||
| F.asList(2), | ||
| F.asList(3), | ||
| F.asList(1) | ||
| ), |
There was a problem hiding this comment.
I don't like the idea of specifying expected results as parameters. Maybe it's better to describe cases inside executeWindow method? For example, declare method:
private void checkWindow(Window.Group grp, boolean streaming, Object[][] expRes) {
Move content of executeWindow to this method, and call it from executeWIndow like:
checkWindow(rowNumber(), true, new Object[][] {{1}, {2}, {1}, {2}, {3}, {1}});
checkWindow(rowNumber(), false, new Object[][] {{1}, {2}, {1}, {2}, {3}, {1}});
checkWindow(countRows(RexWindowBounds.UNBOUNDED_PRECEDING, RexWindowBounds.CURRENT_ROW), false,
new Object[][] {{1}, {2}, {1}, {2}, {3}, {1}});
...
| * - project removing constants | ||
| */ | ||
| @Value.Enclosing | ||
| public class ProjectWindowConstantsRule extends RelRule<ProjectWindowConstantsRule.Config> implements TransformationRule { |
There was a problem hiding this comment.
First approach looks intresting, I think it's most performant solution. But If it brings a lot of complexity to the current patch I think we can keep current approach, and fix it later by different ticket.
# Conflicts: # modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java # modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
1a3964c to
7285f91
Compare
|
@alex-plekhanov |
| assert !grp.aggCalls.isEmpty(); | ||
| } | ||
|
|
||
| /** */ |
| @SuppressWarnings({"rawtypes", "unchecked"}) | ||
| class RelJson { | ||
| /** */ | ||
| /** */ |
There was a problem hiding this comment.
All these changes to comments with two spaces (/* */) should be reverted to comments with one space.
| /** Window node. */ | ||
| public class WindowNode<Row> extends MemoryTrackingNode<Row> implements SingleNode<Row>, Downstream<Row> { | ||
| /** */ | ||
| /** */ |
There was a problem hiding this comment.
All these changes to comments with two spaces (/* */) should be reverted to comments with one space.
| private final Function<Row, Row> lowerBound; | ||
|
|
||
| /** */ | ||
| /** */ |
There was a problem hiding this comment.
All these changes to comments with two spaces (/* */) should be reverted to comments with one space.
|
|
||
| /** */ | ||
| final class WindowFunctionFactory<Row> extends AccumulatorsFactoryBase<Row> { | ||
| /** */ |
There was a problem hiding this comment.
All these changes to comments with two spaces (/* */) should be reverted to comments with one space.
| if (waiting == 0) { | ||
| waiting = IN_BUFFER_SIZE; | ||
| source().request(IN_BUFFER_SIZE); | ||
| if (!inLoop) { |
There was a problem hiding this comment.
Codestyle: Redundant braces for one line statement.
| for (Window.RexWinAggCall call : grp.aggCalls) { | ||
| for (int i = 0; i < call.operands.size(); i++) { | ||
| RexNode operand = call.operands.get(i); | ||
| if (rexToOrd.containsKey(operand)) { |
There was a problem hiding this comment.
Codestyle: Redundant braces for one line statement.
| public final class WindowFunctions { | ||
|
|
||
| /** Check window group can be processed with streaming partition. */ | ||
| public static boolean streamable(Window.Group group) { |
There was a problem hiding this comment.
Abbreviation should be used for group
| frame.reset(); | ||
| } | ||
|
|
||
| @Override public boolean isStreaming() { |
| accumulators = null; | ||
| } | ||
|
|
||
| @Override public boolean isStreaming() { |
| if (waiting == 0) | ||
| source().request(waiting = IN_BUFFER_SIZE); |
There was a problem hiding this comment.
Maybe it's better to move this under !inLoop condition, to prevent from some recursive calls like:
flush() pushes to downstream
downstream requests new rows
window requests new rows from input
input pushes rows to window and call flush()
flush() pushes to downstream
downstream requests new rows
...
All this in oute stack trace
# Conflicts: # modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
|
@vldpyatkov I found your PR (#12815). I’d like to point out that our solutions can coexist, and I can try to integrate them if either of the PRs gets accepted. I think this could be a nice addition to my implementation :) |
Thank you for submitting the pull request to the Apache Ignite.
In order to streamline the review of the contribution
we ask you to ensure the following steps have been taken:
The Contribution Checklist
The description explains WHAT and WHY was made instead of HOW.
The following pattern must be used:
IGNITE-XXXX Change summarywhereXXXX- number of JIRA issue.(see the Maintainers list)
the
green visaattached to the JIRA ticket (see TC.Bot: Check PR)Notes
If you need any help, please email dev@ignite.apache.org or ask anу advice on http://asf.slack.com #ignite channel.
Description of Changes:
Added support for planning and executing window functions.
Added special window functions: row_number, rank, dense_rank, percent_rank, cume_dist, ntile, nth_value, first_value, last_value, lag, lead.
Provides two modes of window function execution:
Supports specifying integer offsets in either variant (ROWS / RANGE) and time interval offsets for RANGE.
Window Planning:
During query planning, windows are split into separate rels for each group of aggregation functions. Each logical window rel includes a collation that is required for correctly partitioning rows and defining frames when computing the window.
Splitting is done using Calcite’s standard rule, which groups function calls based on the window specification (according to the OVER clause).
After that, constants used in the window are projected to support referencing them in the current implementation of Ignite aggregates. (If constants are used in FOLLOWING/PRECEDING, they are directly substituted into the offset, which helps reduce the number of frame boundary searches.)
An additional planning phase was introduced specifically for window planning. (I couldn't find a suitable existing place for the new rules, so I followed the approach used in Apache Drill.)
Separate Change:
During development, when attempting to upgrade to Calcite 1.39, it was discovered that IgniteTypeFactory#leastRestrictive does not take into account the nullability of the resulting type when merging FLOAT and DOUBLE.
P.S. I'll be appreciate to any feedback