MDEV-38970: Streaming window functions step 1#5267
Conversation
and criteria validation function
There was a problem hiding this comment.
Code Review
This pull request introduces support for streaming window functions, allowing certain functions like row number, rank, and dense rank to be computed on the fly without materializing into a temporary table. The code review feedback highlights several critical issues, including potential null pointer dereferences (crashes) due to missing checks on partition_list and order_list, a logic bug in compare_order_lists when handling trailing constants, and an ignored return value in cursor setup. Additionally, the reviewer suggests caching the THD pointer to avoid expensive thread-local lookups in the performance-critical per-row execution path, and resolving an inconsistency in how default versus explicit frames are handled.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| static int compare_order_lists(ORDER *list1, int spec_number1, ORDER *list2, | ||
| int spec_number2) | ||
| { | ||
| if (!list1 && !list2) | ||
| return CMP_EQ; | ||
| if (!list1) | ||
| return CMP_LT_C; | ||
| if (!list2) | ||
| return CMP_GT_C; | ||
| ORDER *elem1= list1; | ||
| ORDER *elem2= list2; | ||
| for (; elem1 && elem2; elem1= elem1->next, elem2= elem2->next) | ||
| { | ||
| int cmp; | ||
| // remove all constants as we don't need them for comparision | ||
| while (elem1 && ((*elem1->item)->real_item())->const_item()) | ||
| { | ||
| elem1= elem1->next; | ||
| continue; | ||
| } | ||
|
|
||
| while (elem2 && ((*elem2->item)->real_item())->const_item()) | ||
| { | ||
| elem2= elem2->next; | ||
| continue; | ||
| } | ||
|
|
||
| if (!elem1 || !elem2) | ||
| break; | ||
|
|
||
| if ((cmp= | ||
| compare_order_elements(elem1, spec_number1, elem2, spec_number2))) | ||
| return cmp; | ||
| } | ||
| if (elem1) | ||
| return CMP_GT_C; | ||
| if (elem2) | ||
| return CMP_LT_C; | ||
| return CMP_EQ; | ||
| } |
There was a problem hiding this comment.
The current implementation of compare_order_lists has a bug where trailing constants in either list are not skipped. Because the loop condition is elem1 && elem2, the loop terminates as soon as one list runs out of elements. Any remaining trailing constants in the other list are not skipped, causing the function to incorrectly return CMP_GT_C or CMP_LT_C instead of CMP_EQ. We should restructure the loop to skip constants first before performing any length or element comparisons.
static int compare_order_lists(ORDER *list1, int spec_number1, ORDER *list2,
int spec_number2)
{
ORDER *elem1= list1;
ORDER *elem2= list2;
while (true)
{
while (elem1 && ((*elem1->item)->real_item())->const_item())
elem1= elem1->next;
while (elem2 && ((*elem2->item)->real_item())->const_item())
elem2= elem2->next;
if (!elem1 && !elem2)
return CMP_EQ;
if (!elem1)
return CMP_LT_C;
if (!elem2)
return CMP_GT_C;
int cmp;
if ((cmp= compare_order_elements(elem1, spec_number1, elem2, spec_number2)))
return cmp;
elem1= elem1->next;
elem2= elem2->next;
}
}| static inline bool check_argument_list_aggregation(Window_spec *win_spec) | ||
| { | ||
| for (ORDER *o= win_spec->partition_list->first; o; o= o->next) | ||
| if ((*o->item)->with_sum_func()) | ||
| return true; | ||
|
|
||
| for (ORDER *o= win_spec->order_list->first; o; o= o->next) | ||
| if ((*o->item)->with_sum_func()) | ||
| return true; | ||
|
|
||
| return false; | ||
| } |
There was a problem hiding this comment.
partition_list and order_list can be NULL if the window specification does not contain PARTITION BY or ORDER BY clauses. Accessing win_spec->partition_list->first or win_spec->order_list->first without a null check will cause a server crash (segmentation fault). We must add null checks before accessing these lists.
static inline bool check_argument_list_aggregation(Window_spec *win_spec)
{
if (win_spec->partition_list)
{
for (ORDER *o= win_spec->partition_list->first; o; o= o->next)
if ((*o->item)->with_sum_func())
return true;
}
if (win_spec->order_list)
{
for (ORDER *o= win_spec->order_list->first; o; o= o->next)
if ((*o->item)->with_sum_func())
return true;
}
return false;
}| int win_func_order_elements= | ||
| spec->partition_list->elements + spec->order_list->elements; |
There was a problem hiding this comment.
Similar to the crash vector in check_argument_list_aggregation, spec->partition_list and spec->order_list can be NULL. Accessing spec->partition_list->elements or spec->order_list->elements directly will cause a crash. We should safely handle NULL pointers here.
int win_func_order_elements=
(spec->partition_list ? spec->partition_list->elements : 0) +
(spec->order_list ? spec->order_list->elements : 0);| longest_wf_order= concat_order_lists( | ||
| thd->mem_root, | ||
| win_func_with_longest_order->window_spec->partition_list->first, | ||
| win_func_with_longest_order->window_spec->order_list->first); |
There was a problem hiding this comment.
Similar to the previous crash vectors, partition_list and order_list can be NULL. We must check if they are non-NULL before accessing their first member to avoid a null pointer dereference.
longest_wf_order= concat_order_lists(
thd->mem_root,
win_func_with_longest_order->window_spec->partition_list ?
win_func_with_longest_order->window_spec->partition_list->first : NULL,
win_func_with_longest_order->window_spec->order_list ?
win_func_with_longest_order->window_spec->order_list->first : NULL);|
|
||
| /* Check if we found any error in the window function while adding values | ||
| through cursors. */ | ||
| if (unlikely(current_thd->is_error() || current_thd->is_killed())) |
There was a problem hiding this comment.
Use the cached thd pointer instead of calling current_thd in this performance-critical loop.
if (unlikely(thd->is_error() || thd->is_killed()))References
- Avoid calling thread-local lookups like
current_thdorthd_to_trxin performance-critical paths (such as per-record operations or frequent lookups). Instead, pass down or cache the transaction pointer (trx_t) to reduce overhead.
There was a problem hiding this comment.
Ok, this makes sense as it's called per row. I copied the execution loop (with the comments), so I left it like it is. Will change it now.
There was a problem hiding this comment.
Might be worth changing in the materialization path too
There was a problem hiding this comment.
Let's not touch the existing code at this point, at least this should be done with a separate commit.
just check ordering compatibility across window functions instead
| // we don't even need to pass the row to the window function, because the | ||
| // add() functions read from the TABLE::record[0] directly, as we did | ||
| // NOT call split_sum_func(), so we still point to base table | ||
| (join_tab - 1)->window_funcs_streaming_step->process_row(); |
There was a problem hiding this comment.
process_rows() return value is ignored: potential error or KILLed query status is swallowed, add a check please
| while ((win_func= it++)) | ||
| { | ||
| Group_bound_tracker *tracker= | ||
| new Group_bound_tracker(thd, win_func->window_spec->partition_list); |
There was a problem hiding this comment.
This memory leaks, needs to be deallocated in the destructor or some cleanup method. As an option, you can allocate it on the query arena like new (thd->mem_root) Group_bound_tracker then it be will freed automatically. But it's better to follow the same pattern other window functions classes do.
| // i would skip this now | ||
| List<Cursor_manager> cursor_managers; | ||
| if (get_window_functions_required_cursors(thd, window_functions, | ||
| &cursor_managers)) |
There was a problem hiding this comment.
cursor_managers are not deallocated, other window functions do it via delete_elements()
|
in tests to avoid query text duplication. |
|
|
||
| /* Check if we found any error in the window function while adding values | ||
| through cursors. */ | ||
| if (unlikely(current_thd->is_error() || current_thd->is_killed())) |
There was a problem hiding this comment.
By the way, current_thd is still here despite commit 227812d claims to cache THD
|
|
||
| static | ||
| void order_window_funcs_by_window_specs(List<Item_window_func> *win_func_list) | ||
| static void |
There was a problem hiding this comment.
Change of formatting unrelated to our work, please roll it back.
| { | ||
| if (win_func_list->elements == 0) | ||
| return; | ||
|
|
| { | ||
| curr->marker= (MARKER_SORTORDER_CHANGE | | ||
| MARKER_PARTITION_CHANGE | | ||
| curr->marker= (MARKER_SORTORDER_CHANGE | MARKER_PARTITION_CHANGE | |
| while ((win_func= it++)) | ||
| { | ||
| Group_bound_tracker *tracker= | ||
| new Group_bound_tracker(thd, win_func->window_spec->partition_list); |
There was a problem hiding this comment.
tracker needs to be checked for successful allocation
| find_longest_compatible_order(List_iterator_fast<Item_window_func> &it) | ||
| { | ||
| int longest_order_elements= -1; | ||
| Item_window_func *longest, *win_func; |
There was a problem hiding this comment.
I suggest initializing longest to nullptr and then check if it has been set in the loop below.
I thought about this actually (running queries side by side) but thought I would need an optimizer switch just for that, I will try |
This PR is the initial implementation of the streaming window functions path as part of my GSOC project.
It's still under development. I'll update this accordingly during development.
To first highlight what's not there yet and what's not working
Note: Many comments are there just for myself, and some namings are to be changed.
What's been added for now:
Window_funcs_sort_streamingobject, the naming comes from the fact that our criteria can be defined as having only oneWindow_funcs_sortobject only, hence thesortin the name. Contains:compute_window_func)process_rowmethod, intended to iterate the window functions and run for the current row.have_streaming_window_funcs(), a preparation time function to check the criteria for streaming, it checks:(rank() over(max(a)))have_streaming_window_funcs()in preparation time to know if streaming CAN be satisfied (anything else needing a temp table falls back to materialization)window_funcs_streaming_stepis added to theJOIN_TAB, analog towindow_funcs_step, responsible for setting up window functions and holding the state for the trackers and cursors across rows. (it points to aWindow_funcs_sort_streamingobject)end_compute_window_func()is attached to the last real table (last table is always real in this case), which callsprocess_rowon the current row in the join loop, and sends it to the client.Next step:
row_number(), rank() and dense_rank()work as expected, they stream when ordering is compatible, reuse the main query ordering if applicable and work for indexed and non indexed orderings.As I said, this is initial, it has most of what we want to implement for streaming, but needs some cleaning up and some reviewing, as I reused much of the logic.