Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
515 changes: 515 additions & 0 deletions mysql-test/main/win_streaming.result

Large diffs are not rendered by default.

114 changes: 114 additions & 0 deletions mysql-test/main/win_streaming.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
#
# Streaming Window Functions Tests
#

# I will remove these comments when I'm done testing; for now they just list
# the cases I'll consider for testing.

# EXPLAIN once per scenario to lock in the streaming path.
# For non-streamable cases EXPLAIN alone is enough; no need to check the output.

# For now, row_number(), rank(), dense_rank() are streamable.
# should I only test on rank? would it make sense to add EXPLAIN for all?

# Basic streaming (one rank with explain, others only correctness)

# Order by and partition
# Explain with one function to show streamable and index or filesort is
# used
# Show compatible functions also stream
# show results only to prove partition tracking correctness
# windows reusing the main query order (whichever is longer) under its mdev
# incompatible orders materialize
# aggregate functions inside partition lists materialize
# aggregate functions anywhere in the select list materialize
# cases to look harder later (subqueries, expressions)

# Test with LIMIT and ANALYZE to show we read only the rows needed

# Fixture: six rows, three distinct values of `a` (two rows each).
# `b` is out of pk order for a=1 (pk 1 -> b=3, pk 2 -> b=1), so ordering by
# (a, b) differs from ordering by pk.
CREATE TABLE t1 (pk INT PRIMARY KEY, a INT, b INT);
INSERT INTO t1 VALUES (1, 1, 3);
INSERT INTO t1 VALUES (2, 1, 1);
INSERT INTO t1 VALUES (3, 2, 2);
INSERT INTO t1 VALUES (4, 2, 4);
INSERT INTO t1 VALUES (5, 3, 1);
INSERT INTO t1 VALUES (6, 3, 2);

# Basic streaming: one rank with explain, others only correctness.
# Note that the EXPLAINed statement is not the same as the executed one: the
# EXPLAIN adds LIMIT 2, while the SELECT above it returns all rows.
# NOTE(review): explain-no-costs.inc presumably masks cost figures in the JSON
# plan so the recorded output is stable - confirm against the include file.
SELECT pk, RANK() OVER (ORDER BY pk) AS rnk from t1;
--source include/explain-no-costs.inc
EXPLAIN FORMAT=JSON SELECT pk, RANK() OVER (ORDER BY pk) AS rnk from t1 limit 2;

# TODO(open question): add the same basic test for the other streamable
# functions (row_number, dense_rank)?

# Order by and partition: explain with rank to show streamable.
# The three SELECTs run row_number/rank/dense_rank over the same window
# (PARTITION BY a ORDER BY b); only the EXPLAIN for rank is recorded.
# NOTE(review): these SELECTs return only the window-function column and have
# no outer ORDER BY, so the recorded row order is whatever the plan produces.
--source include/explain-no-costs.inc
EXPLAIN FORMAT=JSON SELECT rank() OVER (PARTITION BY a ORDER BY b) FROM t1;
SELECT row_number() OVER (PARTITION BY a ORDER BY b) as rn FROM t1;
SELECT rank() OVER (PARTITION BY a ORDER BY b) as rnk FROM t1;
SELECT dense_rank() OVER (PARTITION BY a ORDER BY b) as drnk FROM t1;

# Show compatible functions also stream: two different functions sharing the
# same window ordering (ORDER BY a) in one select list.
--source include/explain-no-costs.inc
EXPLAIN FORMAT=JSON SELECT rank() OVER (ORDER BY a), dense_rank() OVER (ORDER BY a) FROM t1;
SELECT rank() OVER (ORDER BY a) as rnk, dense_rank() OVER (ORDER BY a) as drnk FROM t1;

# Windows reusing the main query order (whichever is longer).
# Case 1: window order (a, b) is longer than the query ORDER BY (a).
--source include/explain-no-costs.inc
EXPLAIN FORMAT=JSON SELECT rank() OVER (ORDER BY a, b) FROM t1 ORDER BY a;
SELECT rank() OVER (ORDER BY a, b) as rnk FROM t1 ORDER BY a;

# Case 2: query ORDER BY (a, b) is longer than the window order (a).
--source include/explain-no-costs.inc
EXPLAIN FORMAT=JSON SELECT rank() OVER (ORDER BY a) FROM t1 ORDER BY a, b;
SELECT rank() OVER (ORDER BY a) as rnk FROM t1 ORDER BY a, b;

# Case 3: window order and query ORDER BY are identical (a).
--source include/explain-no-costs.inc
EXPLAIN FORMAT=JSON SELECT rank() OVER (ORDER BY a) FROM t1 ORDER BY a;
SELECT rank() OVER (ORDER BY a) as rnk FROM t1 ORDER BY a;

# From here to the GROUP BY case the expected plan is materialization, and
# only the EXPLAIN is recorded; query results are not checked.

# Incompatible orders materialize: two windows ordered by different columns
# (a vs. b).
--source include/explain-no-costs.inc
EXPLAIN FORMAT=JSON SELECT rank() OVER (ORDER BY a), rank() OVER (ORDER BY b) FROM t1;

# Aggregate functions inside partition lists materialize
--source include/explain-no-costs.inc
EXPLAIN FORMAT=JSON SELECT rank() OVER (PARTITION BY max(a) ORDER BY b) FROM t1;

# Aggregate functions anywhere in the select list materialize
--source include/explain-no-costs.inc
EXPLAIN FORMAT=JSON SELECT max(a), rank() OVER (ORDER BY b) FROM t1;

# Incompatible RANGE frame materializes.
# First frame looks ahead of the current row (CURRENT ROW AND 5 FOLLOWING).
--source include/explain-no-costs.inc
EXPLAIN FORMAT=JSON SELECT count(*) OVER (ORDER BY a RANGE BETWEEN CURRENT ROW AND 5 FOLLOWING) FROM t1;

# Second frame ends at the current row (UNBOUNDED PRECEDING AND CURRENT ROW).
--source include/explain-no-costs.inc
EXPLAIN FORMAT=JSON SELECT count(*) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) FROM t1;

# GROUP BY materializes (for now; may become streamable later - open question)
--source include/explain-no-costs.inc
EXPLAIN FORMAT=JSON SELECT rank() OVER (ORDER BY a) FROM t1 GROUP BY a;

# Multi-table join.
# t2 has the same pk values (1..6) as t1, so every t1 row finds a match.
CREATE TABLE t2 (pk INT PRIMARY KEY, c INT);
INSERT INTO t2 VALUES (1, 100);
INSERT INTO t2 VALUES (2, 200);
INSERT INTO t2 VALUES (3, 300);
INSERT INTO t2 VALUES (4, 400);
INSERT INTO t2 VALUES (5, 500);
INSERT INTO t2 VALUES (6, 600);

# Only the first join query has its plan recorded; the other two check results
# only (partitioned window over a join, and a LEFT JOIN).
# NOTE(review): with this data the LEFT JOIN yields no NULL-extended rows, so
# it returns the same rows as the inner join - consider a t2 with missing pks.
--source include/explain-no-costs.inc
EXPLAIN FORMAT=JSON SELECT rank() OVER (ORDER BY t1.b) FROM t1 JOIN t2 ON t1.pk = t2.pk;
SELECT rank() OVER (ORDER BY t1.b) as rnk FROM t1 JOIN t2 ON t1.pk = t2.pk;
SELECT rank() OVER (PARTITION BY t1.a ORDER BY t1.b) as rnk FROM t1 JOIN t2 ON t1.pk = t2.pk;
SELECT rank() OVER (ORDER BY t1.b) as rnk FROM t1 LEFT JOIN t2 ON t1.pk = t2.pk;

DROP TABLE t2;

# Subquery in FROM: window function over a filtered derived table.
# EXPLAIN only; the query result is not recorded.
--source include/explain-no-costs.inc
EXPLAIN FORMAT=JSON SELECT rank() OVER (ORDER BY a) as rnk FROM (SELECT a FROM t1 WHERE a > 1) derived;

# TODO: cases to look harder at later (subqueries, expressions)

# Cleanup
DROP TABLE t1;
2 changes: 2 additions & 0 deletions sql/item_sum.h
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,7 @@ class Item_sum :public Item_func_or_sum
Item_sum(THD *thd, Item_sum *item);
enum Type type() const override { return SUM_FUNC_ITEM; }
virtual enum Sumfunctype sum_func () const=0;
virtual inline bool is_streamable() const { return false; }
bool is_aggr_sum_func()
{
switch (sum_func()) {
Expand Down Expand Up @@ -895,6 +896,7 @@ class Item_sum_count :public Item_sum_int
bool add() override;
void cleanup() override;
void remove() override;
inline bool is_streamable() const override { return true; }

public:
Item_sum_count(THD *thd, Item *item_par):
Expand Down
6 changes: 6 additions & 0 deletions sql/item_windowfunc.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ class Item_sum_row_number: public Item_sum_int
return name;
}

inline bool is_streamable() const override { return true; }

protected:
Item *shallow_copy(THD *thd) const override
{ return get_item_copy<Item_sum_row_number>(thd, this); }
Expand Down Expand Up @@ -215,6 +217,8 @@ class Item_sum_rank: public Item_sum_int
return name;
}

inline bool is_streamable() const override { return true; }

void setup_window_func(THD *thd, Window_spec *window_spec) override;

void cleanup() override
Expand Down Expand Up @@ -290,6 +294,8 @@ class Item_sum_dense_rank: public Item_sum_int
return name;
}

inline bool is_streamable() const override { return true; }

void setup_window_func(THD *thd, Window_spec *window_spec) override;

void cleanup() override
Expand Down
2 changes: 1 addition & 1 deletion sql/sql_lex.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3793,7 +3793,7 @@ uint st_select_lex::get_cardinality_of_ref_ptrs_slice(uint order_group_num_arg)
select_n_where_fields * winfunc_factor +
order_group_num * 2 * winfunc_factor +
hidden_bit_fields +
fields_in_window_functions + 1;
fields_in_window_functions + 1; // consider this case for streaming
return n;
}

Expand Down
60 changes: 58 additions & 2 deletions sql/sql_select.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
*/

#include "mariadb.h"
#include "sql_list.h"
#include "sql_priv.h"
#include "unireg.h"
#include "sql_select.h"
Expand Down Expand Up @@ -71,6 +72,9 @@
#include "opt_hints.h"
#include "opt_group_by_cardinality.h"

#include "sql_window.h"
#include "item_windowfunc.h"

/*
A key part number that means we're using a fulltext scan.

Expand Down Expand Up @@ -228,6 +232,8 @@ static enum_nested_loop_state
end_update(JOIN *join, JOIN_TAB *join_tab, bool end_of_records);
static enum_nested_loop_state
end_unique_update(JOIN *join, JOIN_TAB *join_tab, bool end_of_records);
static enum_nested_loop_state
end_compute_win_func(JOIN *join, JOIN_TAB *join_tab, bool end_of_records);

static int join_read_const_table(THD *thd, JOIN_TAB *tab, POSITION *pos);
static int join_read_system(JOIN_TAB *tab);
Expand Down Expand Up @@ -1600,6 +1606,7 @@ JOIN::prepare(TABLE_LIST *tables_init, COND *conds_init, uint og_num,
DBUG_RETURN(-1);
thd->lex->current_select->context_analysis_place= save_place;

// this sets window functions up
if (setup_without_group(thd, ref_ptrs, tables_list,
select_lex->leaf_tables, fields_list,
all_fields, &conds, order, group_list,
Expand All @@ -1608,6 +1615,15 @@ JOIN::prepare(TABLE_LIST *tables_init, COND *conds_init, uint og_num,
&hidden_group_fields))
DBUG_RETURN(-1);

// this needs to decide compatibility with main query sorting if exists
// but the actual setting of which is set before test_if_need_tmp_table()
if (select_lex->n_sum_items == select_lex->window_funcs.elements &&
select_lex->group_list.elements == 0 &&
have_streaming_window_funcs(thd, select_lex->window_funcs,
win_func_longest_order, order,
streaming_wf_order_is_longer))
streamable_window_funcs= true;

/*
Permanently remove redundant parts from the query if
1) This is a subquery
Expand Down Expand Up @@ -3338,8 +3354,14 @@ int JOIN::optimize_stage2()
ORDER BY is computed after the window function computation is done, so
the sort will be done on the temp table.
*/
if (select_lex->have_window_funcs())
if (select_lex->have_window_funcs() && !streamable_window_funcs)
simple_order= FALSE;
// this means the order by should be done in a temp table (its real purpose
// is checking if order by references only the first non-const table in JOIN)

// i'm not very sure of this, simple_order might change later??
if (!need_tmp && simple_order && streaming_wf_order_is_longer)
order= win_func_longest_order;

/*
If the hint FORCE INDEX FOR ORDER BY/GROUP BY is used for the table
Expand Down Expand Up @@ -3576,6 +3598,25 @@ int JOIN::optimize_stage2()
if (make_aggr_tables_info())
DBUG_RETURN(1);

if (streamable_window_funcs && !need_tmp)
{
JOIN_TAB *last_real_tab= &join_tab[exec_join_tab_cnt() - 1];
// here i would attach the new streamable class (same interface ?)
if (!(last_real_tab->window_funcs_streaming_step=
new Window_funcs_sort_streaming(thd)))
DBUG_RETURN(true);
// this sets up the list, and the partition and group tracking
if (last_real_tab->window_funcs_streaming_step->setup(
select_lex->window_funcs))
DBUG_RETURN(true);
// i need to make SURE THAT END_SEND is not assigned to last table after
// this, this is very important
last_real_tab->next_select=
end_compute_win_func; // calls process_row and end_send
/* Count that we're using window functions. */
status_var_increment(thd->status_var.feature_window_functions);
}

init_join_cache_and_keyread();

if (init_range_rowid_filters())
Expand Down Expand Up @@ -4327,7 +4368,7 @@ bool JOIN::make_aggr_tables_info()
- duplicate value removal
Both of these operations are done after window function computation step.
*/
if (select_lex->window_funcs.elements)
if (select_lex->window_funcs.elements && need_tmp)
{
curr_tab= join_tab + total_join_tab_cnt();
if (!(curr_tab->window_funcs_step= new Window_funcs_computation))
Expand Down Expand Up @@ -24949,6 +24990,7 @@ evaluate_join_record(JOIN *join, JOIN_TAB *join_tab,
{
enum enum_nested_loop_state rc;
/* A match from join_tab is found for the current partial join. */
// this is the loop
rc= (*join_tab->next_select)(join, join_tab+1, 0);
join->thd->get_stmt_da()->inc_current_row_for_warning();
if (rc != NESTED_LOOP_OK && rc != NESTED_LOOP_NO_MORE_ROWS)
Expand Down Expand Up @@ -26162,6 +26204,20 @@ end_send(JOIN *join, JOIN_TAB *join_tab, bool end_of_records)
DBUG_RETURN(NESTED_LOOP_OK);
}

enum_nested_loop_state end_compute_win_func(JOIN *join, JOIN_TAB *join_tab,
bool end_of_records)
{
// this should call process_row with the current row, and the list of window
// functions, process row runs cursors for wfs on the current row (will
// partition trackers work?)
// Then end_send would call the window_func()->val_*() so we need phase
// computation to read the live value
// we don't even need to pass the row to the window function, because the
// add() functions read from the TABLE::record[0] directly, as we did
// NOT call split_sum_func(), so we still point to base table
(join_tab - 1)->window_funcs_streaming_step->process_row();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The return value of process_row() is ignored, so a potential error or a KILLed query status is swallowed. Please add a check.

return end_send(join, join_tab, end_of_records);
}

/*
@brief
Expand Down
30 changes: 25 additions & 5 deletions sql/sql_select.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,8 @@ enum join_type { JT_UNKNOWN,JT_SYSTEM,JT_CONST,JT_EQ_REF,JT_REF,JT_MAYBE_REF,

class JOIN;

class Window_funcs_sort_streaming;

enum enum_nested_loop_state
{
NESTED_LOOP_KILLED= -2, NESTED_LOOP_ERROR= -1,
Expand Down Expand Up @@ -533,6 +535,12 @@ typedef struct st_join_table {
*/
Window_funcs_computation* window_funcs_step;

/*
Non-NULL value means this join_tab (last real table) must do stream window
function computation before sending
*/
Window_funcs_sort_streaming *window_funcs_streaming_step;

/**
List of topmost expressions in the select list. The *next* JOIN_TAB
in the plan should use it to obtain correct values. Same applicable to
Expand Down Expand Up @@ -1753,9 +1761,20 @@ class JOIN :public Sql_alloc
*/
Sql_cmd_dml *sql_cmd_dml;

/*
True if the query has window functions passing the streaming criteria,
defined by have_streaming_window_funcs()
Note: this does not guarantee they will be streamed, if the query requires
a temp table for any other reason, the window functions follow the
materialization path.
*/
bool streamable_window_funcs= false;
ORDER *win_func_longest_order= NULL;
bool streaming_wf_order_is_longer= false;

JOIN(THD *thd_arg, List<Item> &fields_arg, ulonglong select_options_arg,
select_result *result_arg)
:fields_list(fields_arg)
: fields_list(fields_arg)
{
init(thd_arg, fields_arg, select_options_arg, result_arg);
}
Expand Down Expand Up @@ -1905,11 +1924,12 @@ class JOIN :public Sql_alloc
bool test_if_need_tmp_table()
{
return ((const_tables != table_count &&
((select_distinct || !simple_order || !simple_group) ||
(group_list && order) ||
MY_TEST(select_options & OPTION_BUFFER_RESULT))) ||
((select_distinct || !simple_order || !simple_group) ||
(group_list && order) ||
MY_TEST(select_options & OPTION_BUFFER_RESULT))) ||
(rollup.state != ROLLUP::STATE_NONE && select_distinct) ||
select_lex->have_window_funcs());
(select_lex->have_window_funcs() &&
(!streamable_window_funcs || only_const_tables())));
}
bool choose_subquery_plan(table_map join_tables);
void get_partial_cost_and_fanout(int end_tab_idx,
Expand Down
Loading
Loading