From 7ad01e9433bd94fba24c6b365db777ac3e802a87 Mon Sep 17 00:00:00 2001 From: Divyansh Maheshwari Date: Mon, 15 Jun 2026 15:48:54 +0530 Subject: [PATCH 1/3] FEAT: Server-side pagination and sorting for Jobs page --- internal/pkg/heimdall/job_dal.go | 164 ++++++++++++++++++++++++++++--- internal/pkg/heimdall/result.go | 6 +- web/src/app/api/jobs/jobs.ts | 24 +++-- web/src/modules/Jobs/Helper.tsx | 19 +++- web/src/modules/Jobs/Jobs.tsx | 94 ++++++++++++------ 5 files changed, 249 insertions(+), 58 deletions(-) diff --git a/internal/pkg/heimdall/job_dal.go b/internal/pkg/heimdall/job_dal.go index 0fdc1e30..7c4eaebd 100644 --- a/internal/pkg/heimdall/job_dal.go +++ b/internal/pkg/heimdall/job_dal.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" _ "embed" + "encoding/base64" "encoding/json" "fmt" "strconv" @@ -56,6 +57,7 @@ var queryJobStatusesSelect string var ( ErrUnknownJobID = fmt.Errorf(`unknown job_id`) + ErrInvalidCursor = fmt.Errorf(`invalid pagination cursor`) insertJobMethod = telemetry.NewMethod("db_connection", "insert_job") getJobMethod = telemetry.NewMethod("db_connection", "get_job") getJobsMethod = telemetry.NewMethod("db_connection", "get_jobs") @@ -90,12 +92,20 @@ var ( Join: `, `, Value: `js.job_status_name in ({{ .Slice }})`, }, - `cursor`: { - Value: `j.system_job_id < $%d`, - }, }, } + // Whitelist of sortable columns, mapping the UI's column key to its SQL + // expression and value type. Anything not listed falls back to + // system_job_id, so the `order_by` value can never be injected into the + // query. Sorts run unindexed today; if a column's sort gets slow, add a + // composite index on (column, system_job_id) to restore the keyset seek. + jobsSortColumns = map[string]sortColumn{ + `id`: {expr: `j.job_id`, isInt: false}, + `created_at`: {expr: `j.created_at`, isInt: true}, + `updated_at`: {expr: `j.updated_at`, isInt: true}, + } + // tag filters use correlated EXISTS — one clause per value, all must match (AND semantics) jobsTagsFilterConfig = map[string]string{ `tags`: `exists (select 1 from job_tags jt where jt.system_job_id = j.system_job_id and jt.job_tag = $%d)`, @@ -230,6 +240,55 @@ func (h *Heimdall) getJob(ctx context.Context, j *jobRequest) (any, error) { const defaultPageSize = 101 +// sortColumn describes a whitelisted sortable column: its SQL expression and +// whether its value is an integer (which affects how the keyset cursor value +// is decoded, since JSON numbers come back as float64). +type sortColumn struct { + expr string + isInt bool +} + +// jobsCursor is the opaque keyset position handed back to clients. It carries +// the last row's sort-column value plus system_job_id as a stable tiebreaker, +// base64-JSON encoded so the wire format can change without breaking callers. +type jobsCursor struct { + Value any `json:"v"` + ID int64 `json:"i"` +} + +func encodeJobsCursor(value any, id int64) string { + // json.Marshal of these primitive types never fails + raw, _ := json.Marshal(jobsCursor{Value: value, ID: id}) + return base64.StdEncoding.EncodeToString(raw) +} + +func decodeJobsCursor(s string) (*jobsCursor, error) { + raw, err := base64.StdEncoding.DecodeString(s) + if err != nil { + return nil, ErrInvalidCursor + } + var c jobsCursor + if err := json.Unmarshal(raw, &c); err != nil { + return nil, ErrInvalidCursor + } + return &c, nil +} + +// appendWhereClause splices an additional condition into the query's WHERE, +// just before the template's trailing ORDER BY, adding the WHERE keyword if the +// query doesn't already have one. +func appendWhereClause(query, clause string) string { + idx := strings.Index(query, "\norder by") + if idx < 0 { + idx = len(query) + } + before, after := query[:idx], query[idx:] + if strings.Contains(before, "where") { + return before + " and\n " + clause + after + } + return before + "\nwhere\n " + clause + after +} + func injectTagsFilter(f *database.Filter, key, existsTemplate string, query string, args []any) (string, []any) { v, ok := (*f)[key] if !ok { @@ -248,15 +307,7 @@ func injectTagsFilter(f *database.Filter, key, existsTemplate string, query stri return query, args } - idx := strings.Index(query, "\norder by") - if idx < 0 { - return query, args - } - before, after, clause := query[:idx], query[idx:], strings.Join(clauses, " and\n ") - if strings.Contains(before, "where") { - return before + " and\n " + clause + after, args - } - return before + "\nwhere\n " + clause + after, args + return appendWhereClause(query, strings.Join(clauses, " and\n ")), args } func (h *Heimdall) getJobs(ctx context.Context, f *database.Filter) (any, error) { @@ -265,7 +316,8 @@ func (h *Heimdall) getJobs(ctx context.Context, f *database.Filter) (any, error) defer getJobsMethod.RecordLatency(time.Now()) getJobsMethod.CountRequest() - // extract limit before rendering WHERE clause (it is not a filter condition) + // pagination + sort params drive ORDER BY and the keyset WHERE, not the + // filter conditions, so pull them out before rendering the filter. pageSize := defaultPageSize if v, ok := (*f)[`limit`]; ok { if n, err := strconv.Atoi(v); err == nil && n > 0 { @@ -277,6 +329,36 @@ func (h *Heimdall) getJobs(ctx context.Context, f *database.Filter) (any, error) delete(*f, `limit`) } + // resolve the sort column against the whitelist; unknown values fall back to + // the always-indexed system_job_id, so order_by can never be injected. + orderKey := (*f)[`order_by`] + sortCol, sorted := jobsSortColumns[orderKey] + orderExpr := `j.system_job_id` + if sorted { + orderExpr = sortCol.expr + } + delete(*f, `order_by`) + + direction := `desc` + cmp := `<` + if strings.EqualFold((*f)[`direction`], `asc`) { + direction, cmp = `asc`, `>` + } + delete(*f, `direction`) + + // opaque keyset cursor marking the last row of the previous page; absent + // means first page. + var cursor *jobsCursor + if cursorStr := (*f)[`cursor`]; cursorStr != `` { + c, err := decodeJobsCursor(cursorStr) + if err != nil { + getJobsMethod.LogAndCountError(err, "cursor_decode") + return nil, err + } + cursor = c + } + delete(*f, `cursor`) + // open connection sess, err := h.Database.NewSession(false) if err != nil { @@ -295,8 +377,40 @@ func (h *Heimdall) getJobs(ctx context.Context, f *database.Filter) (any, error) query, args = injectTagsFilter(f, key, tmpl, query, args) } - // append LIMIT; fetch one extra row to detect whether a next page exists - query = fmt.Sprintf("%s\nlimit $%d", strings.TrimRight(query, "\n; \t"), len(args)+1) + // keyset seek: only rows after the cursor in the chosen order. We compare on + // the sort column with system_job_id as the deterministic tiebreaker, so + // rows sharing a sort value are never dropped or repeated across pages. + if cursor != nil { + if sorted { + value := cursor.Value + if sortCol.isInt { + // JSON numbers decode to float64; restore the integer for compare + fv, ok := value.(float64) + if !ok { + return nil, ErrInvalidCursor + } + value = int64(fv) + } + valIdx := len(args) + 1 + args = append(args, value, cursor.ID) + query = appendWhereClause(query, fmt.Sprintf( + `(%s %s $%d or (%s = $%d and j.system_job_id %s $%d))`, + orderExpr, cmp, valIdx, orderExpr, valIdx, cmp, valIdx+1)) + } else { + query = appendWhereClause(query, fmt.Sprintf(`j.system_job_id %s $%d`, cmp, len(args)+1)) + args = append(args, cursor.ID) + } + } + + // Replace the template's ORDER BY with the chosen sort, keeping system_job_id + // as the tiebreaker. Fetch one extra row to detect whether a next page exists. + orderIdx := strings.Index(query, "\norder by") + if orderIdx < 0 { + orderIdx = len(query) + } + base := strings.TrimRight(query[:orderIdx], "\n; \t") + query = fmt.Sprintf("%s\norder by\n %s %s, j.system_job_id %s\nlimit $%d", + base, orderExpr, direction, direction, len(args)+1) args = append(args, pageSize+1) rows, err := sess.Query(query, args...) @@ -328,10 +442,13 @@ func (h *Heimdall) getJobs(ctx context.Context, f *database.Filter) (any, error) } + // the extra row signals a next page; trim it and emit a cursor from the last + // row we actually return. rs := &resultset{Data: result} if len(result) > pageSize { + last := result[pageSize-1] rs.HasMore = true - rs.NextCursor = result[pageSize-1].SystemID + rs.NextCursor = encodeJobsCursor(jobSortValue(orderKey, last), last.SystemID) rs.Data = result[:pageSize] } @@ -340,6 +457,21 @@ func (h *Heimdall) getJobs(ctx context.Context, f *database.Filter) (any, error) } +// jobSortValue returns the value of the sort column for a job, used to build the +// next-page cursor. Mirrors the jobsSortColumns whitelist. +func jobSortValue(orderKey string, j *job.Job) any { + switch orderKey { + case `id`: + return j.ID + case `created_at`: + return j.CreatedAt + case `updated_at`: + return j.UpdatedAt + default: + return j.SystemID + } +} + func (h *Heimdall) getJobStatus(ctx context.Context, j *jobRequest) (any, error) { // Track DB connection for job status operation diff --git a/internal/pkg/heimdall/result.go b/internal/pkg/heimdall/result.go index bf39e19c..caa349c4 100644 --- a/internal/pkg/heimdall/result.go +++ b/internal/pkg/heimdall/result.go @@ -1,7 +1,7 @@ package heimdall type resultset struct { - Data any `json:"data,omitempty"` - HasMore bool `json:"has_more,omitempty"` - NextCursor int64 `json:"next_cursor,omitempty"` + Data any `json:"data,omitempty"` + HasMore bool `json:"has_more,omitempty"` + NextCursor string `json:"next_cursor,omitempty"` } diff --git a/web/src/app/api/jobs/jobs.ts b/web/src/app/api/jobs/jobs.ts index 4e2ba8a4..7a9295f1 100644 --- a/web/src/app/api/jobs/jobs.ts +++ b/web/src/app/api/jobs/jobs.ts @@ -1,17 +1,25 @@ import { API_URL, buildQueryString } from '@/common/Services' -import { ApiParams } from '@/modules/Jobs/Helper' +import { ApiParams, JOBS_PAGE_SIZE, JobsResponse } from '@/modules/Jobs/Helper' -export const fetchJobs = async (params: ApiParams, pageParam?: number) => { - if (pageParam) { - params.page = pageParam.toString() +export const fetchJobs = async ( + params: ApiParams, + cursor: string | null, + sort: { prop: string; flip: boolean }, +): Promise => { + const queryParams: ApiParams = { + ...params, + limit: JOBS_PAGE_SIZE.toString(), + order_by: sort.prop, + direction: sort.flip ? 'desc' : 'asc', } - const queryString = buildQueryString(params) + // keyset pagination: send the opaque cursor for the previous page's last row. + // Omitted on the first page. + if (cursor) queryParams.cursor = cursor + const queryString = buildQueryString(queryParams) const response = await fetch(`${API_URL}/jobs?${queryString}`) - const data = await response.json() - - return data?.data + return await response.json() } export const getJobDetails = async (id: string) => { diff --git a/web/src/modules/Jobs/Helper.tsx b/web/src/modules/Jobs/Helper.tsx index 328a2bda..d81a82cb 100644 --- a/web/src/modules/Jobs/Helper.tsx +++ b/web/src/modules/Jobs/Helper.tsx @@ -18,7 +18,18 @@ export type ApiParams = { cluster?: string command?: string status?: string[] - page?: string + limit?: string + cursor?: string + order_by?: string + direction?: string +} + +export const JOBS_PAGE_SIZE = 20 + +export type JobsResponse = { + data: JobType[] + has_more?: boolean + next_cursor?: string } export type JobType = { @@ -97,6 +108,7 @@ export const useJobConfig = ({ { name: 'name', label: 'Name', + noSort: true, cell: { children: (row: JobType) => { return ( @@ -110,6 +122,7 @@ export const useJobConfig = ({ { name: 'version', label: 'Version', + noSort: true, cell: { children: (row: JobType) => (
@@ -121,6 +134,7 @@ export const useJobConfig = ({ { name: 'user', label: 'User', + noSort: true, cell: { children: (row: JobType) => (
@@ -132,6 +146,7 @@ export const useJobConfig = ({ { name: 'cluster_id', label: 'Cluster ID', + noSort: true, cell: { children: (row: JobType) => (
@@ -143,6 +158,7 @@ export const useJobConfig = ({ { name: 'command_id', label: 'Command ID', + noSort: true, cell: { children: (row: JobType) => (
@@ -180,6 +196,7 @@ export const useJobConfig = ({ { name: 'status', label: 'Status', + noSort: true, cell: { children: (row: JobType) => (
diff --git a/web/src/modules/Jobs/Jobs.tsx b/web/src/modules/Jobs/Jobs.tsx index 33bf285c..263ed6ce 100644 --- a/web/src/modules/Jobs/Jobs.tsx +++ b/web/src/modules/Jobs/Jobs.tsx @@ -8,21 +8,18 @@ import React, { useState, } from 'react' import { + Button, SortByProps, SortColumnProps, StandardTable, } from '@patterninc/react-ui' import { BreadcrumbContext } from '@/common/BreadCrumbsProvider/context' import { fetchJobs, getJobStatus } from '@/app/api/jobs/jobs' -import { useInfiniteQuery, useQuery } from '@tanstack/react-query' -import { ApiParams, JobType, useJobConfig } from './Helper' +import { useQuery } from '@tanstack/react-query' +import { ApiParams, JOBS_PAGE_SIZE, useJobConfig } from './Helper' import { useQueryState } from 'nuqs' import { FilterStatesType } from '@patterninc/react-ui' -import { - noDataAvailable, - noDataAvailableDescription, - sortData, -} from '@/common/Services' +import { noDataAvailable, noDataAvailableDescription } from '@/common/Services' import { AutoRefreshContext } from '@/common/AutoRefreshProvider/context' type FilterType = { @@ -84,22 +81,10 @@ const Jobs = (): React.JSX.Element => { return params }, [jobId, name, user, version, clusterId, commandId, status]) - // Fetch Jobs with filters applied - const { data, isLoading, fetchNextPage, hasNextPage, isSuccess } = - useInfiniteQuery({ - queryKey: ['jobs', filterParams], - queryFn: ({ pageParam }) => fetchJobs(filterParams, pageParam), - getNextPageParam: (lastPage) => lastPage?.nextPage, - enabled: !!filterParams, - initialPageParam: 1, - refetchInterval: refreshInterval.value, - }) - - const { data: jobStatus } = useQuery({ - queryKey: ['jobs'], - queryFn: getJobStatus, - }) + const [pageIndex, setPageIndex] = useState(0) + const [cursors, setCursors] = useState<(string | null)[]>([null]) + // Sort state drives the server-side ORDER BY. `flip` is the descending flag. const [sortBy, setSort] = useState({ prop: 'created_at', flip: true, @@ -113,16 +98,48 @@ const Jobs = (): React.JSX.Element => { prop: obj.activeColumn, flip: obj.direction, }) + setPageIndex(0) + setCursors([null]) } - const jobs = useMemo(() => data?.pages?.flatMap((page) => page) || [], [data]) + // `placeholderData` keeps the previous page visible while the next one loads. + const { data, isLoading, isFetching, isSuccess } = useQuery({ + queryKey: ['jobs', filterParams, sortBy, pageIndex, cursors[pageIndex]], + queryFn: () => fetchJobs(filterParams, cursors[pageIndex] ?? null, sortBy), + enabled: !!filterParams, + refetchInterval: refreshInterval.value, + placeholderData: (prev) => prev, + }) + + const { data: jobStatus } = useQuery({ + queryKey: ['jobs'], + queryFn: getJobStatus, + }) + + const jobs = useMemo(() => data?.data ?? [], [data]) + + const rowsSoFar = pageIndex * JOBS_PAGE_SIZE + jobs.length + const totalResults = data?.has_more ? `${rowsSoFar}+` : `${rowsSoFar}` + + const goToNextPage = useCallback(() => { + const next = data?.next_cursor + if (!next) return + setCursors((prev) => { + const copy = [...prev] + copy[pageIndex + 1] = next + return copy + }) + setPageIndex((i) => i + 1) + }, [data?.next_cursor, pageIndex]) - const sortedData: JobType[] = useMemo(() => { - return sortData(jobs, sortBy) - }, [jobs, sortBy]) + const goToPrevPage = useCallback(() => { + setPageIndex((i) => Math.max(0, i - 1)) + }, []) const updateFilter = useCallback(() => { const queryParams = new URLSearchParams() + setPageIndex(0) + setCursors([null]) setJobId(filter.id) setName(filter.name) setUser(filter.user) @@ -257,6 +274,8 @@ const Jobs = (): React.JSX.Element => { ) const resetFilters = useCallback(() => { + setPageIndex(0) + setCursors([null]) setJobId(null) setName(null) setUser(null) @@ -311,14 +330,12 @@ const Jobs = (): React.JSX.Element => { return (
0} successStatus={isSuccess} - getData={fetchNextPage} - hasMore={!!hasNextPage} loading={isLoading} tableId='tableId' sort={setSortBy} @@ -330,7 +347,7 @@ const Jobs = (): React.JSX.Element => { tableHeaderProps={{ header: { name: 'Results', - value: jobs?.length > 100 ? '100+' : jobs?.length, + value: totalResults, }, pageFilterProps: { filterStates: filters, @@ -342,6 +359,23 @@ const Jobs = (): React.JSX.Element => { }, }} /> +
+ + Page {pageIndex + 1} + +
) } From 7d4f09a189d7270c003f773b9422ad6d54b1eb64 Mon Sep 17 00:00:00 2001 From: Divyansh Maheshwari Date: Mon, 15 Jun 2026 17:07:04 +0530 Subject: [PATCH 2/3] Resolved comments --- internal/pkg/heimdall/job_dal.go | 40 +++++++++----------------------- web/src/app/api/jobs/jobs.ts | 3 +-- 2 files changed, 12 insertions(+), 31 deletions(-) diff --git a/internal/pkg/heimdall/job_dal.go b/internal/pkg/heimdall/job_dal.go index 7c4eaebd..ac20b128 100644 --- a/internal/pkg/heimdall/job_dal.go +++ b/internal/pkg/heimdall/job_dal.go @@ -95,11 +95,7 @@ var ( }, } - // Whitelist of sortable columns, mapping the UI's column key to its SQL - // expression and value type. Anything not listed falls back to - // system_job_id, so the `order_by` value can never be injected into the - // query. Sorts run unindexed today; if a column's sort gets slow, add a - // composite index on (column, system_job_id) to restore the keyset seek. + // Whitelisted sortable columns -> SQL expr + value type; unknown order_by falls back to system_job_id (no injection). jobsSortColumns = map[string]sortColumn{ `id`: {expr: `j.job_id`, isInt: false}, `created_at`: {expr: `j.created_at`, isInt: true}, @@ -240,17 +236,13 @@ func (h *Heimdall) getJob(ctx context.Context, j *jobRequest) (any, error) { const defaultPageSize = 101 -// sortColumn describes a whitelisted sortable column: its SQL expression and -// whether its value is an integer (which affects how the keyset cursor value -// is decoded, since JSON numbers come back as float64). +// sortColumn is a whitelisted sortable column: SQL expr and whether its value is an integer (for cursor decoding). type sortColumn struct { expr string isInt bool } -// jobsCursor is the opaque keyset position handed back to clients. It carries -// the last row's sort-column value plus system_job_id as a stable tiebreaker, -// base64-JSON encoded so the wire format can change without breaking callers. +// jobsCursor is the opaque keyset position: last row's sort value + system_job_id tiebreaker, base64-JSON encoded. type jobsCursor struct { Value any `json:"v"` ID int64 `json:"i"` @@ -274,9 +266,7 @@ func decodeJobsCursor(s string) (*jobsCursor, error) { return &c, nil } -// appendWhereClause splices an additional condition into the query's WHERE, -// just before the template's trailing ORDER BY, adding the WHERE keyword if the -// query doesn't already have one. +// appendWhereClause inserts a condition into the WHERE, before the trailing ORDER BY (adds WHERE if absent). func appendWhereClause(query, clause string) string { idx := strings.Index(query, "\norder by") if idx < 0 { @@ -316,8 +306,7 @@ func (h *Heimdall) getJobs(ctx context.Context, f *database.Filter) (any, error) defer getJobsMethod.RecordLatency(time.Now()) getJobsMethod.CountRequest() - // pagination + sort params drive ORDER BY and the keyset WHERE, not the - // filter conditions, so pull them out before rendering the filter. + // pull pagination/sort params out before rendering the filter WHERE pageSize := defaultPageSize if v, ok := (*f)[`limit`]; ok { if n, err := strconv.Atoi(v); err == nil && n > 0 { @@ -329,8 +318,7 @@ func (h *Heimdall) getJobs(ctx context.Context, f *database.Filter) (any, error) delete(*f, `limit`) } - // resolve the sort column against the whitelist; unknown values fall back to - // the always-indexed system_job_id, so order_by can never be injected. + // resolve sort column from whitelist; unknown falls back to system_job_id orderKey := (*f)[`order_by`] sortCol, sorted := jobsSortColumns[orderKey] orderExpr := `j.system_job_id` @@ -346,8 +334,7 @@ func (h *Heimdall) getJobs(ctx context.Context, f *database.Filter) (any, error) } delete(*f, `direction`) - // opaque keyset cursor marking the last row of the previous page; absent - // means first page. + // decode opaque keyset cursor (absent = first page) var cursor *jobsCursor if cursorStr := (*f)[`cursor`]; cursorStr != `` { c, err := decodeJobsCursor(cursorStr) @@ -377,9 +364,7 @@ func (h *Heimdall) getJobs(ctx context.Context, f *database.Filter) (any, error) query, args = injectTagsFilter(f, key, tmpl, query, args) } - // keyset seek: only rows after the cursor in the chosen order. We compare on - // the sort column with system_job_id as the deterministic tiebreaker, so - // rows sharing a sort value are never dropped or repeated across pages. + // keyset seek: rows after the cursor, sort column + system_job_id tiebreaker (no dropped/repeated rows) if cursor != nil { if sorted { value := cursor.Value @@ -402,8 +387,7 @@ func (h *Heimdall) getJobs(ctx context.Context, f *database.Filter) (any, error) } } - // Replace the template's ORDER BY with the chosen sort, keeping system_job_id - // as the tiebreaker. Fetch one extra row to detect whether a next page exists. + // swap in chosen ORDER BY (+ system_job_id tiebreaker); fetch one extra row to detect a next page orderIdx := strings.Index(query, "\norder by") if orderIdx < 0 { orderIdx = len(query) @@ -442,8 +426,7 @@ func (h *Heimdall) getJobs(ctx context.Context, f *database.Filter) (any, error) } - // the extra row signals a next page; trim it and emit a cursor from the last - // row we actually return. + // extra row => next page exists; trim it and emit a cursor from the last returned row rs := &resultset{Data: result} if len(result) > pageSize { last := result[pageSize-1] @@ -457,8 +440,7 @@ func (h *Heimdall) getJobs(ctx context.Context, f *database.Filter) (any, error) } -// jobSortValue returns the value of the sort column for a job, used to build the -// next-page cursor. Mirrors the jobsSortColumns whitelist. +// jobSortValue returns a job's value for the sort column, for the next-page cursor. func jobSortValue(orderKey string, j *job.Job) any { switch orderKey { case `id`: diff --git a/web/src/app/api/jobs/jobs.ts b/web/src/app/api/jobs/jobs.ts index 7a9295f1..37f3feb2 100644 --- a/web/src/app/api/jobs/jobs.ts +++ b/web/src/app/api/jobs/jobs.ts @@ -12,8 +12,7 @@ export const fetchJobs = async ( order_by: sort.prop, direction: sort.flip ? 'desc' : 'asc', } - // keyset pagination: send the opaque cursor for the previous page's last row. - // Omitted on the first page. + // keyset: send the opaque cursor for the previous page (omitted on first page) if (cursor) queryParams.cursor = cursor const queryString = buildQueryString(queryParams) From a3622609e00c7c8311fecc4ee6d7b4b30a1104cc Mon Sep 17 00:00:00 2001 From: Divyansh Maheshwari Date: Wed, 24 Jun 2026 12:29:04 +0530 Subject: [PATCH 3/3] refactor: extract resolveSortColumns and applyKeysetSeek --- internal/pkg/heimdall/job_dal.go | 105 ++++++++++++++++++++----------- 1 file changed, 68 insertions(+), 37 deletions(-) diff --git a/internal/pkg/heimdall/job_dal.go b/internal/pkg/heimdall/job_dal.go index ac20b128..142be6ae 100644 --- a/internal/pkg/heimdall/job_dal.go +++ b/internal/pkg/heimdall/job_dal.go @@ -242,6 +242,40 @@ type sortColumn struct { isInt bool } +type resolvedSort struct { + orderKey string + column sortColumn + sorted bool + expr string + direction string + cmp string +} + +func resolveSortColumns(f *database.Filter) resolvedSort { + orderKey := (*f)[`order_by`] + sortCol, sorted := jobsSortColumns[orderKey] + expr := `j.system_job_id` + if sorted { + expr = sortCol.expr + } + delete(*f, `order_by`) + + direction, cmp := `desc`, `<` + if strings.EqualFold((*f)[`direction`], `asc`) { + direction, cmp = `asc`, `>` + } + delete(*f, `direction`) + + return resolvedSort{ + orderKey: orderKey, + column: sortCol, + sorted: sorted, + expr: expr, + direction: direction, + cmp: cmp, + } +} + // jobsCursor is the opaque keyset position: last row's sort value + system_job_id tiebreaker, base64-JSON encoded. type jobsCursor struct { Value any `json:"v"` @@ -300,6 +334,33 @@ func injectTagsFilter(f *database.Filter, key, existsTemplate string, query stri return appendWhereClause(query, strings.Join(clauses, " and\n ")), args } +func applyKeysetSeek(query string, args []any, sort resolvedSort, cursor *jobsCursor) (string, []any, error) { + if cursor == nil { + return query, args, nil + } + + if !sort.sorted { + query = appendWhereClause(query, fmt.Sprintf(`j.system_job_id %s $%d`, sort.cmp, len(args)+1)) + return query, append(args, cursor.ID), nil + } + + value := cursor.Value + if sort.column.isInt { + fv, ok := value.(float64) + if !ok { + return query, args, ErrInvalidCursor + } + value = int64(fv) + } + + valIdx := len(args) + 1 + args = append(args, value, cursor.ID) + query = appendWhereClause(query, fmt.Sprintf( + `(%s %s $%d or (%s = $%d and j.system_job_id %s $%d))`, + sort.expr, sort.cmp, valIdx, sort.expr, valIdx, sort.cmp, valIdx+1)) + return query, args, nil +} + func (h *Heimdall) getJobs(ctx context.Context, f *database.Filter) (any, error) { // Track DB connection for jobs list operation @@ -318,21 +379,7 @@ func (h *Heimdall) getJobs(ctx context.Context, f *database.Filter) (any, error) delete(*f, `limit`) } - // resolve sort column from whitelist; unknown falls back to system_job_id - orderKey := (*f)[`order_by`] - sortCol, sorted := jobsSortColumns[orderKey] - orderExpr := `j.system_job_id` - if sorted { - orderExpr = sortCol.expr - } - delete(*f, `order_by`) - - direction := `desc` - cmp := `<` - if strings.EqualFold((*f)[`direction`], `asc`) { - direction, cmp = `asc`, `>` - } - delete(*f, `direction`) + sort := resolveSortColumns(f) // decode opaque keyset cursor (absent = first page) var cursor *jobsCursor @@ -365,26 +412,10 @@ func (h *Heimdall) getJobs(ctx context.Context, f *database.Filter) (any, error) } // keyset seek: rows after the cursor, sort column + system_job_id tiebreaker (no dropped/repeated rows) - if cursor != nil { - if sorted { - value := cursor.Value - if sortCol.isInt { - // JSON numbers decode to float64; restore the integer for compare - fv, ok := value.(float64) - if !ok { - return nil, ErrInvalidCursor - } - value = int64(fv) - } - valIdx := len(args) + 1 - args = append(args, value, cursor.ID) - query = appendWhereClause(query, fmt.Sprintf( - `(%s %s $%d or (%s = $%d and j.system_job_id %s $%d))`, - orderExpr, cmp, valIdx, orderExpr, valIdx, cmp, valIdx+1)) - } else { - query = appendWhereClause(query, fmt.Sprintf(`j.system_job_id %s $%d`, cmp, len(args)+1)) - args = append(args, cursor.ID) - } + query, args, err = applyKeysetSeek(query, args, sort, cursor) + if err != nil { + getJobsMethod.LogAndCountError(err, "keyset_seek") + return nil, err } // swap in chosen ORDER BY (+ system_job_id tiebreaker); fetch one extra row to detect a next page @@ -394,7 +425,7 @@ func (h *Heimdall) getJobs(ctx context.Context, f *database.Filter) (any, error) } base := strings.TrimRight(query[:orderIdx], "\n; \t") query = fmt.Sprintf("%s\norder by\n %s %s, j.system_job_id %s\nlimit $%d", - base, orderExpr, direction, direction, len(args)+1) + base, sort.expr, sort.direction, sort.direction, len(args)+1) args = append(args, pageSize+1) rows, err := sess.Query(query, args...) @@ -431,7 +462,7 @@ func (h *Heimdall) getJobs(ctx context.Context, f *database.Filter) (any, error) if len(result) > pageSize { last := result[pageSize-1] rs.HasMore = true - rs.NextCursor = encodeJobsCursor(jobSortValue(orderKey, last), last.SystemID) + rs.NextCursor = encodeJobsCursor(jobSortValue(sort.orderKey, last), last.SystemID) rs.Data = result[:pageSize] }