diff --git a/internal/pkg/heimdall/job_dal.go b/internal/pkg/heimdall/job_dal.go index 0fdc1e30..142be6ae 100644 --- a/internal/pkg/heimdall/job_dal.go +++ b/internal/pkg/heimdall/job_dal.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" _ "embed" + "encoding/base64" "encoding/json" "fmt" "strconv" @@ -56,6 +57,7 @@ var queryJobStatusesSelect string var ( ErrUnknownJobID = fmt.Errorf(`unknown job_id`) + ErrInvalidCursor = fmt.Errorf(`invalid pagination cursor`) insertJobMethod = telemetry.NewMethod("db_connection", "insert_job") getJobMethod = telemetry.NewMethod("db_connection", "get_job") getJobsMethod = telemetry.NewMethod("db_connection", "get_jobs") @@ -90,12 +92,16 @@ var ( Join: `, `, Value: `js.job_status_name in ({{ .Slice }})`, }, - `cursor`: { - Value: `j.system_job_id < $%d`, - }, }, } + // Whitelisted sortable columns -> SQL expr + value type; unknown order_by falls back to system_job_id (no injection). + jobsSortColumns = map[string]sortColumn{ + `id`: {expr: `j.job_id`, isInt: false}, + `created_at`: {expr: `j.created_at`, isInt: true}, + `updated_at`: {expr: `j.updated_at`, isInt: true}, + } + // tag filters use correlated EXISTS — one clause per value, all must match (AND semantics) jobsTagsFilterConfig = map[string]string{ `tags`: `exists (select 1 from job_tags jt where jt.system_job_id = j.system_job_id and jt.job_tag = $%d)`, @@ -230,6 +236,83 @@ func (h *Heimdall) getJob(ctx context.Context, j *jobRequest) (any, error) { const defaultPageSize = 101 +// sortColumn is a whitelisted sortable column: SQL expr and whether its value is an integer (for cursor decoding). +type sortColumn struct { + expr string + isInt bool +} + +type resolvedSort struct { + orderKey string + column sortColumn + sorted bool + expr string + direction string + cmp string +} + +func resolveSortColumns(f *database.Filter) resolvedSort { + orderKey := (*f)[`order_by`] + sortCol, sorted := jobsSortColumns[orderKey] + expr := `j.system_job_id` + if sorted { + expr = sortCol.expr + } + delete(*f, `order_by`) + + direction, cmp := `desc`, `<` + if strings.EqualFold((*f)[`direction`], `asc`) { + direction, cmp = `asc`, `>` + } + delete(*f, `direction`) + + return resolvedSort{ + orderKey: orderKey, + column: sortCol, + sorted: sorted, + expr: expr, + direction: direction, + cmp: cmp, + } +} + +// jobsCursor is the opaque keyset position: last row's sort value + system_job_id tiebreaker, base64-JSON encoded. +type jobsCursor struct { + Value any `json:"v"` + ID int64 `json:"i"` +} + +func encodeJobsCursor(value any, id int64) string { + // json.Marshal of these primitive types never fails + raw, _ := json.Marshal(jobsCursor{Value: value, ID: id}) + return base64.StdEncoding.EncodeToString(raw) +} + +func decodeJobsCursor(s string) (*jobsCursor, error) { + raw, err := base64.StdEncoding.DecodeString(s) + if err != nil { + return nil, ErrInvalidCursor + } + var c jobsCursor + if err := json.Unmarshal(raw, &c); err != nil { + return nil, ErrInvalidCursor + } + return &c, nil +} + +// appendWhereClause inserts a condition into the WHERE, before the trailing ORDER BY (adds WHERE if absent). +func appendWhereClause(query, clause string) string { + idx := strings.Index(query, "\norder by") + if idx < 0 { + idx = len(query) + } + before, after := query[:idx], query[idx:] + if strings.Contains(before, "where") { + return before + " and\n " + clause + after + } + return before + "\nwhere\n " + clause + after +} + func injectTagsFilter(f *database.Filter, key, existsTemplate string, query string, args []any) (string, []any) { v, ok := (*f)[key] if !ok { @@ -248,15 +331,34 @@ func injectTagsFilter(f *database.Filter, key, existsTemplate string, query stri return query, args } - idx := strings.Index(query, "\norder by") - if idx < 0 { - return query, args + return appendWhereClause(query, strings.Join(clauses, " and\n ")), args +} + +func applyKeysetSeek(query string, args []any, sort resolvedSort, cursor *jobsCursor) (string, []any, error) { + if cursor == nil { + return query, args, nil } - before, after, clause := query[:idx], query[idx:], strings.Join(clauses, " and\n ") - if strings.Contains(before, "where") { - return before + " and\n " + clause + after, args + + if !sort.sorted { + query = appendWhereClause(query, fmt.Sprintf(`j.system_job_id %s $%d`, sort.cmp, len(args)+1)) + return query, append(args, cursor.ID), nil } - return before + "\nwhere\n " + clause + after, args + + value := cursor.Value + if sort.column.isInt { + fv, ok := value.(float64) + if !ok { + return query, args, ErrInvalidCursor + } + value = int64(fv) + } + + valIdx := len(args) + 1 + args = append(args, value, cursor.ID) + query = appendWhereClause(query, fmt.Sprintf( + `(%s %s $%d or (%s = $%d and j.system_job_id %s $%d))`, + sort.expr, sort.cmp, valIdx, sort.expr, valIdx, sort.cmp, valIdx+1)) + return query, args, nil } func (h *Heimdall) getJobs(ctx context.Context, f *database.Filter) (any, error) { @@ -265,7 +367,7 @@ func (h *Heimdall) getJobs(ctx context.Context, f *database.Filter) (any, error) defer getJobsMethod.RecordLatency(time.Now()) getJobsMethod.CountRequest() - // extract limit before rendering WHERE clause (it is not a filter condition) + // pull pagination/sort params out before rendering the filter WHERE pageSize := defaultPageSize if v, ok := (*f)[`limit`]; ok { if n, err := strconv.Atoi(v); err == nil && n > 0 { @@ -277,6 +379,20 @@ func (h *Heimdall) getJobs(ctx context.Context, f *database.Filter) (any, error) delete(*f, `limit`) } + sort := resolveSortColumns(f) + + // decode opaque keyset cursor (absent = first page) + var cursor *jobsCursor + if cursorStr := (*f)[`cursor`]; cursorStr != `` { + c, err := decodeJobsCursor(cursorStr) + if err != nil { + getJobsMethod.LogAndCountError(err, "cursor_decode") + return nil, err + } + cursor = c + } + delete(*f, `cursor`) + // open connection sess, err := h.Database.NewSession(false) if err != nil { @@ -295,8 +411,21 @@ func (h *Heimdall) getJobs(ctx context.Context, f *database.Filter) (any, error) query, args = injectTagsFilter(f, key, tmpl, query, args) } - // append LIMIT; fetch one extra row to detect whether a next page exists - query = fmt.Sprintf("%s\nlimit $%d", strings.TrimRight(query, "\n; \t"), len(args)+1) + // keyset seek: rows after the cursor, sort column + system_job_id tiebreaker (no dropped/repeated rows) + query, args, err = applyKeysetSeek(query, args, sort, cursor) + if err != nil { + getJobsMethod.LogAndCountError(err, "keyset_seek") + return nil, err + } + + // swap in chosen ORDER BY (+ system_job_id tiebreaker); fetch one extra row to detect a next page + orderIdx := strings.Index(query, "\norder by") + if orderIdx < 0 { + orderIdx = len(query) + } + base := strings.TrimRight(query[:orderIdx], "\n; \t") + query = fmt.Sprintf("%s\norder by\n %s %s, j.system_job_id %s\nlimit $%d", + base, sort.expr, sort.direction, sort.direction, len(args)+1) args = append(args, pageSize+1) rows, err := sess.Query(query, args...) @@ -328,10 +457,12 @@ func (h *Heimdall) getJobs(ctx context.Context, f *database.Filter) (any, error) } + // extra row => next page exists; trim it and emit a cursor from the last returned row rs := &resultset{Data: result} if len(result) > pageSize { + last := result[pageSize-1] rs.HasMore = true - rs.NextCursor = result[pageSize-1].SystemID + rs.NextCursor = encodeJobsCursor(jobSortValue(sort.orderKey, last), last.SystemID) rs.Data = result[:pageSize] } @@ -340,6 +471,20 @@ func (h *Heimdall) getJobs(ctx context.Context, f *database.Filter) (any, error) } +// jobSortValue returns a job's value for the sort column, for the next-page cursor. +func jobSortValue(orderKey string, j *job.Job) any { + switch orderKey { + case `id`: + return j.ID + case `created_at`: + return j.CreatedAt + case `updated_at`: + return j.UpdatedAt + default: + return j.SystemID + } +} + func (h *Heimdall) getJobStatus(ctx context.Context, j *jobRequest) (any, error) { // Track DB connection for job status operation diff --git a/internal/pkg/heimdall/result.go b/internal/pkg/heimdall/result.go index bf39e19c..caa349c4 100644 --- a/internal/pkg/heimdall/result.go +++ b/internal/pkg/heimdall/result.go @@ -1,7 +1,7 @@ package heimdall type resultset struct { - Data any `json:"data,omitempty"` - HasMore bool `json:"has_more,omitempty"` - NextCursor int64 `json:"next_cursor,omitempty"` + Data any `json:"data,omitempty"` + HasMore bool `json:"has_more,omitempty"` + NextCursor string `json:"next_cursor,omitempty"` } diff --git a/web/src/app/api/jobs/jobs.ts b/web/src/app/api/jobs/jobs.ts index 4e2ba8a4..37f3feb2 100644 --- a/web/src/app/api/jobs/jobs.ts +++ b/web/src/app/api/jobs/jobs.ts @@ -1,17 +1,24 @@ import { API_URL, buildQueryString } from '@/common/Services' -import { ApiParams } from '@/modules/Jobs/Helper' +import { ApiParams, JOBS_PAGE_SIZE, JobsResponse } from '@/modules/Jobs/Helper' -export const fetchJobs = async (params: ApiParams, pageParam?: number) => { - if (pageParam) { - params.page = pageParam.toString() +export const fetchJobs = async ( + params: ApiParams, + cursor: string | null, + sort: { prop: string; flip: boolean }, +): Promise => { + const queryParams: ApiParams = { + ...params, + limit: JOBS_PAGE_SIZE.toString(), + order_by: sort.prop, + direction: sort.flip ? 'desc' : 'asc', } - const queryString = buildQueryString(params) + // keyset: send the opaque cursor for the previous page (omitted on first page) + if (cursor) queryParams.cursor = cursor + const queryString = buildQueryString(queryParams) const response = await fetch(`${API_URL}/jobs?${queryString}`) - const data = await response.json() - - return data?.data + return await response.json() } export const getJobDetails = async (id: string) => { diff --git a/web/src/modules/Jobs/Helper.tsx b/web/src/modules/Jobs/Helper.tsx index 3785a1cb..39a308d3 100644 --- a/web/src/modules/Jobs/Helper.tsx +++ b/web/src/modules/Jobs/Helper.tsx @@ -20,7 +20,18 @@ export type ApiParams = { command?: string status?: string[] tags?: string - page?: string + limit?: string + cursor?: string + order_by?: string + direction?: string +} + +export const JOBS_PAGE_SIZE = 20 + +export type JobsResponse = { + data: JobType[] + has_more?: boolean + next_cursor?: string } export type TagPair = { @@ -125,6 +136,7 @@ export const useJobConfig = ({ { name: 'name', label: 'Name', + noSort: true, cell: { children: (row: JobType) => { return ( @@ -144,6 +156,7 @@ export const useJobConfig = ({ { name: 'version', label: 'Version', + noSort: true, cell: { children: (row: JobType) => (
@@ -155,6 +168,7 @@ export const useJobConfig = ({ { name: 'user', label: 'User', + noSort: true, cell: { children: (row: JobType) => (
@@ -166,6 +180,7 @@ export const useJobConfig = ({ { name: 'cluster_id', label: 'Cluster ID', + noSort: true, cell: { children: (row: JobType) => (
@@ -177,6 +192,7 @@ export const useJobConfig = ({ { name: 'command_id', label: 'Command ID', + noSort: true, cell: { children: (row: JobType) => (
@@ -214,6 +230,7 @@ export const useJobConfig = ({ { name: 'status', label: 'Status', + noSort: true, cell: { children: (row: JobType) => (
diff --git a/web/src/modules/Jobs/Jobs.tsx b/web/src/modules/Jobs/Jobs.tsx index c4bf4cb6..2b8041ea 100644 --- a/web/src/modules/Jobs/Jobs.tsx +++ b/web/src/modules/Jobs/Jobs.tsx @@ -8,16 +8,17 @@ import React, { useState, } from 'react' import { + Button, SortByProps, SortColumnProps, StandardTable, } from '@patterninc/react-ui' import { BreadcrumbContext } from '@/common/BreadCrumbsProvider/context' import { fetchJobs, getJobStatus } from '@/app/api/jobs/jobs' -import { useInfiniteQuery, useQuery } from '@tanstack/react-query' +import { useQuery } from '@tanstack/react-query' import { ApiParams, - JobType, + JOBS_PAGE_SIZE, TagPair, parseTags, serializeTags, @@ -25,11 +26,7 @@ import { } from './Helper' import { useQueryState } from 'nuqs' import { FilterStatesType } from '@patterninc/react-ui' -import { - noDataAvailable, - noDataAvailableDescription, - sortData, -} from '@/common/Services' +import { noDataAvailable, noDataAvailableDescription } from '@/common/Services' import { AutoRefreshContext } from '@/common/AutoRefreshProvider/context' import TagFilter from './TagFilter' @@ -101,22 +98,10 @@ const Jobs = (): React.JSX.Element => { return params }, [jobId, name, user, version, clusterId, commandId, status, tags]) - // Fetch Jobs with filters applied - const { data, isLoading, fetchNextPage, hasNextPage, isSuccess } = - useInfiniteQuery({ - queryKey: ['jobs', filterParams], - queryFn: ({ pageParam }) => fetchJobs(filterParams, pageParam), - getNextPageParam: (lastPage) => lastPage?.nextPage, - enabled: !!filterParams, - initialPageParam: 1, - refetchInterval: refreshInterval.value, - }) - - const { data: jobStatus } = useQuery({ - queryKey: ['jobs'], - queryFn: getJobStatus, - }) + const [pageIndex, setPageIndex] = useState(0) + const [cursors, setCursors] = useState<(string | null)[]>([null]) + // Sort state drives the server-side ORDER BY. `flip` is the descending flag. const [sortBy, setSort] = useState({ prop: 'created_at', flip: true, @@ -130,16 +115,48 @@ const Jobs = (): React.JSX.Element => { prop: obj.activeColumn, flip: obj.direction, }) + setPageIndex(0) + setCursors([null]) } - const jobs = useMemo(() => data?.pages?.flatMap((page) => page) || [], [data]) + // `placeholderData` keeps the previous page visible while the next one loads. + const { data, isLoading, isFetching, isSuccess } = useQuery({ + queryKey: ['jobs', filterParams, sortBy, pageIndex, cursors[pageIndex]], + queryFn: () => fetchJobs(filterParams, cursors[pageIndex] ?? null, sortBy), + enabled: !!filterParams, + refetchInterval: refreshInterval.value, + placeholderData: (prev) => prev, + }) + + const { data: jobStatus } = useQuery({ + queryKey: ['jobs'], + queryFn: getJobStatus, + }) + + const jobs = useMemo(() => data?.data ?? [], [data]) - const sortedData: JobType[] = useMemo(() => { - return sortData(jobs, sortBy) - }, [jobs, sortBy]) + const rowsSoFar = pageIndex * JOBS_PAGE_SIZE + jobs.length + const totalResults = data?.has_more ? `${rowsSoFar}+` : `${rowsSoFar}` + + const goToNextPage = useCallback(() => { + const next = data?.next_cursor + if (!next) return + setCursors((prev) => { + const copy = [...prev] + copy[pageIndex + 1] = next + return copy + }) + setPageIndex((i) => i + 1) + }, [data?.next_cursor, pageIndex]) + + const goToPrevPage = useCallback(() => { + setPageIndex((i) => Math.max(0, i - 1)) + }, []) const updateFilter = useCallback(() => { const queryParams = new URLSearchParams() + setPageIndex(0) + setCursors([null]) setJobId(filter.id) setName(filter.name) setUser(filter.user) @@ -279,6 +296,8 @@ const Jobs = (): React.JSX.Element => { ) const resetFilters = useCallback(() => { + setPageIndex(0) + setCursors([null]) setJobId(null) setName(null) setUser(null) @@ -350,14 +369,12 @@ const Jobs = (): React.JSX.Element => { return (
0} successStatus={isSuccess} - getData={fetchNextPage} - hasMore={!!hasNextPage} loading={isLoading} tableId='tableId' sort={setSortBy} @@ -369,7 +386,7 @@ const Jobs = (): React.JSX.Element => { tableHeaderProps={{ header: { name: 'Results', - value: jobs?.length > 100 ? '100+' : jobs?.length, + value: totalResults, }, pageFilterProps: { filterStates: filters, @@ -384,6 +401,23 @@ const Jobs = (): React.JSX.Element => { }, }} /> +
+ + Page {pageIndex + 1} + +
) }