Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 159 additions & 14 deletions internal/pkg/heimdall/job_dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"database/sql"
_ "embed"
"encoding/base64"
"encoding/json"
"fmt"
"strconv"
Expand Down Expand Up @@ -56,6 +57,7 @@ var queryJobStatusesSelect string

var (
ErrUnknownJobID = fmt.Errorf(`unknown job_id`)
ErrInvalidCursor = fmt.Errorf(`invalid pagination cursor`)
insertJobMethod = telemetry.NewMethod("db_connection", "insert_job")
getJobMethod = telemetry.NewMethod("db_connection", "get_job")
getJobsMethod = telemetry.NewMethod("db_connection", "get_jobs")
Expand Down Expand Up @@ -90,12 +92,16 @@ var (
Join: `, `,
Value: `js.job_status_name in ({{ .Slice }})`,
},
`cursor`: {
Value: `j.system_job_id < $%d`,
},
},
}

// Whitelisted sortable columns -> SQL expr + value type; unknown order_by falls back to system_job_id (no injection).
jobsSortColumns = map[string]sortColumn{
`id`: {expr: `j.job_id`, isInt: false},
`created_at`: {expr: `j.created_at`, isInt: true},
`updated_at`: {expr: `j.updated_at`, isInt: true},
}

// tag filters use correlated EXISTS — one clause per value, all must match (AND semantics)
jobsTagsFilterConfig = map[string]string{
`tags`: `exists (select 1 from job_tags jt where jt.system_job_id = j.system_job_id and jt.job_tag = $%d)`,
Expand Down Expand Up @@ -230,6 +236,83 @@ func (h *Heimdall) getJob(ctx context.Context, j *jobRequest) (any, error) {

const defaultPageSize = 101

// sortColumn is a whitelisted sortable column: SQL expr and whether its value is an integer (for cursor decoding).
type sortColumn struct {
expr string
isInt bool
}

type resolvedSort struct {
orderKey string
column sortColumn
sorted bool
expr string
direction string
cmp string
}

func resolveSortColumns(f *database.Filter) resolvedSort {
orderKey := (*f)[`order_by`]
sortCol, sorted := jobsSortColumns[orderKey]
expr := `j.system_job_id`
if sorted {
expr = sortCol.expr
}
delete(*f, `order_by`)

direction, cmp := `desc`, `<`
if strings.EqualFold((*f)[`direction`], `asc`) {
direction, cmp = `asc`, `>`
}
delete(*f, `direction`)

return resolvedSort{
orderKey: orderKey,
column: sortCol,
sorted: sorted,
expr: expr,
direction: direction,
cmp: cmp,
}
}

// jobsCursor is the opaque keyset position: last row's sort value + system_job_id tiebreaker, base64-JSON encoded.
type jobsCursor struct {
Value any `json:"v"`
ID int64 `json:"i"`
}

func encodeJobsCursor(value any, id int64) string {
// json.Marshal of these primitive types never fails
raw, _ := json.Marshal(jobsCursor{Value: value, ID: id})
return base64.StdEncoding.EncodeToString(raw)
}

func decodeJobsCursor(s string) (*jobsCursor, error) {
raw, err := base64.StdEncoding.DecodeString(s)
if err != nil {
return nil, ErrInvalidCursor
}
var c jobsCursor
if err := json.Unmarshal(raw, &c); err != nil {
return nil, ErrInvalidCursor
}
return &c, nil
}

// appendWhereClause inserts a condition into the WHERE, before the trailing ORDER BY (adds WHERE if absent).
func appendWhereClause(query, clause string) string {
idx := strings.Index(query, "\norder by")
if idx < 0 {
idx = len(query)
}
before, after := query[:idx], query[idx:]
if strings.Contains(before, "where") {
return before + " and\n " + clause + after
}
return before + "\nwhere\n " + clause + after
}

func injectTagsFilter(f *database.Filter, key, existsTemplate string, query string, args []any) (string, []any) {
v, ok := (*f)[key]
if !ok {
Expand All @@ -248,15 +331,34 @@ func injectTagsFilter(f *database.Filter, key, existsTemplate string, query stri
return query, args
}

idx := strings.Index(query, "\norder by")
if idx < 0 {
return query, args
return appendWhereClause(query, strings.Join(clauses, " and\n ")), args
}

func applyKeysetSeek(query string, args []any, sort resolvedSort, cursor *jobsCursor) (string, []any, error) {
if cursor == nil {
return query, args, nil
}
before, after, clause := query[:idx], query[idx:], strings.Join(clauses, " and\n ")
if strings.Contains(before, "where") {
return before + " and\n " + clause + after, args

if !sort.sorted {
query = appendWhereClause(query, fmt.Sprintf(`j.system_job_id %s $%d`, sort.cmp, len(args)+1))
return query, append(args, cursor.ID), nil
}
return before + "\nwhere\n " + clause + after, args

value := cursor.Value
if sort.column.isInt {
fv, ok := value.(float64)
if !ok {
return query, args, ErrInvalidCursor
}
value = int64(fv)
}

valIdx := len(args) + 1
args = append(args, value, cursor.ID)
query = appendWhereClause(query, fmt.Sprintf(
`(%s %s $%d or (%s = $%d and j.system_job_id %s $%d))`,
sort.expr, sort.cmp, valIdx, sort.expr, valIdx, sort.cmp, valIdx+1))
return query, args, nil
}

func (h *Heimdall) getJobs(ctx context.Context, f *database.Filter) (any, error) {
Expand All @@ -265,7 +367,7 @@ func (h *Heimdall) getJobs(ctx context.Context, f *database.Filter) (any, error)
defer getJobsMethod.RecordLatency(time.Now())
getJobsMethod.CountRequest()

// extract limit before rendering WHERE clause (it is not a filter condition)
// pull pagination/sort params out before rendering the filter WHERE
pageSize := defaultPageSize
if v, ok := (*f)[`limit`]; ok {
if n, err := strconv.Atoi(v); err == nil && n > 0 {
Expand All @@ -277,6 +379,20 @@ func (h *Heimdall) getJobs(ctx context.Context, f *database.Filter) (any, error)
delete(*f, `limit`)
}

sort := resolveSortColumns(f)

// decode opaque keyset cursor (absent = first page)
var cursor *jobsCursor
if cursorStr := (*f)[`cursor`]; cursorStr != `` {
c, err := decodeJobsCursor(cursorStr)
if err != nil {
getJobsMethod.LogAndCountError(err, "cursor_decode")
return nil, err
}
cursor = c
}
delete(*f, `cursor`)

// open connection
sess, err := h.Database.NewSession(false)
if err != nil {
Expand All @@ -295,8 +411,21 @@ func (h *Heimdall) getJobs(ctx context.Context, f *database.Filter) (any, error)
query, args = injectTagsFilter(f, key, tmpl, query, args)
}

// append LIMIT; fetch one extra row to detect whether a next page exists
query = fmt.Sprintf("%s\nlimit $%d", strings.TrimRight(query, "\n; \t"), len(args)+1)
// keyset seek: rows after the cursor, sort column + system_job_id tiebreaker (no dropped/repeated rows)
query, args, err = applyKeysetSeek(query, args, sort, cursor)
if err != nil {
getJobsMethod.LogAndCountError(err, "keyset_seek")
return nil, err
}

// swap in chosen ORDER BY (+ system_job_id tiebreaker); fetch one extra row to detect a next page
orderIdx := strings.Index(query, "\norder by")
if orderIdx < 0 {
orderIdx = len(query)
}
base := strings.TrimRight(query[:orderIdx], "\n; \t")
query = fmt.Sprintf("%s\norder by\n %s %s, j.system_job_id %s\nlimit $%d",
base, sort.expr, sort.direction, sort.direction, len(args)+1)
args = append(args, pageSize+1)

rows, err := sess.Query(query, args...)
Expand Down Expand Up @@ -328,10 +457,12 @@ func (h *Heimdall) getJobs(ctx context.Context, f *database.Filter) (any, error)

}

// extra row => next page exists; trim it and emit a cursor from the last returned row
rs := &resultset{Data: result}
if len(result) > pageSize {
last := result[pageSize-1]
rs.HasMore = true
rs.NextCursor = result[pageSize-1].SystemID
rs.NextCursor = encodeJobsCursor(jobSortValue(sort.orderKey, last), last.SystemID)
rs.Data = result[:pageSize]
}

Expand All @@ -340,6 +471,20 @@ func (h *Heimdall) getJobs(ctx context.Context, f *database.Filter) (any, error)

}

// jobSortValue returns a job's value for the sort column, for the next-page cursor.
func jobSortValue(orderKey string, j *job.Job) any {
switch orderKey {
case `id`:
return j.ID
case `created_at`:
return j.CreatedAt
case `updated_at`:
return j.UpdatedAt
default:
return j.SystemID
}
}

func (h *Heimdall) getJobStatus(ctx context.Context, j *jobRequest) (any, error) {

// Track DB connection for job status operation
Expand Down
6 changes: 3 additions & 3 deletions internal/pkg/heimdall/result.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package heimdall

type resultset struct {
Data any `json:"data,omitempty"`
HasMore bool `json:"has_more,omitempty"`
NextCursor int64 `json:"next_cursor,omitempty"`
Data any `json:"data,omitempty"`
HasMore bool `json:"has_more,omitempty"`
NextCursor string `json:"next_cursor,omitempty"`
}
23 changes: 15 additions & 8 deletions web/src/app/api/jobs/jobs.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
import { API_URL, buildQueryString } from '@/common/Services'
import { ApiParams } from '@/modules/Jobs/Helper'
import { ApiParams, JOBS_PAGE_SIZE, JobsResponse } from '@/modules/Jobs/Helper'

export const fetchJobs = async (params: ApiParams, pageParam?: number) => {
if (pageParam) {
params.page = pageParam.toString()
export const fetchJobs = async (
params: ApiParams,
cursor: string | null,
sort: { prop: string; flip: boolean },
): Promise<JobsResponse> => {
const queryParams: ApiParams = {
...params,
limit: JOBS_PAGE_SIZE.toString(),
order_by: sort.prop,
direction: sort.flip ? 'desc' : 'asc',
}
const queryString = buildQueryString(params)
// keyset: send the opaque cursor for the previous page (omitted on first page)
if (cursor) queryParams.cursor = cursor
const queryString = buildQueryString(queryParams)

const response = await fetch(`${API_URL}/jobs?${queryString}`)

const data = await response.json()

return data?.data
return await response.json()
}

export const getJobDetails = async (id: string) => {
Expand Down
19 changes: 18 additions & 1 deletion web/src/modules/Jobs/Helper.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,18 @@ export type ApiParams = {
command?: string
status?: string[]
tags?: string
page?: string
limit?: string
cursor?: string
order_by?: string
direction?: string
}

export const JOBS_PAGE_SIZE = 20

export type JobsResponse = {
data: JobType[]
has_more?: boolean
next_cursor?: string
}

export type TagPair = {
Expand Down Expand Up @@ -125,6 +136,7 @@ export const useJobConfig = ({
{
name: 'name',
label: 'Name',
noSort: true,
cell: {
children: (row: JobType) => {
return (
Expand All @@ -144,6 +156,7 @@ export const useJobConfig = ({
{
name: 'version',
label: 'Version',
noSort: true,
cell: {
children: (row: JobType) => (
<div className={sortBy.prop === 'version' ? 'fw-semi-bold' : ''}>
Expand All @@ -155,6 +168,7 @@ export const useJobConfig = ({
{
name: 'user',
label: 'User',
noSort: true,
cell: {
children: (row: JobType) => (
<div className={sortBy.prop === 'user' ? 'fw-semi-bold' : ''}>
Expand All @@ -166,6 +180,7 @@ export const useJobConfig = ({
{
name: 'cluster_id',
label: 'Cluster ID',
noSort: true,
cell: {
children: (row: JobType) => (
<div className={sortBy.prop === 'cluster_id' ? 'fw-semi-bold' : ''}>
Expand All @@ -177,6 +192,7 @@ export const useJobConfig = ({
{
name: 'command_id',
label: 'Command ID',
noSort: true,
cell: {
children: (row: JobType) => (
<div className={sortBy.prop === 'command_id' ? 'fw-semi-bold' : ''}>
Expand Down Expand Up @@ -214,6 +230,7 @@ export const useJobConfig = ({
{
name: 'status',
label: 'Status',
noSort: true,
cell: {
children: (row: JobType) => (
<div className={sortBy.prop === 'status' ? 'fw-semi-bold' : ''}>
Expand Down
Loading
Loading