diff --git a/assets/databases/heimdall/tables/jobs.sql b/assets/databases/heimdall/tables/jobs.sql index 67aa6021..c7ff5fc9 100644 --- a/assets/databases/heimdall/tables/jobs.sql +++ b/assets/databases/heimdall/tables/jobs.sql @@ -21,6 +21,7 @@ create table if not exists jobs alter table jobs add column if not exists store_result_sync boolean not null default false; alter table jobs add column if not exists canceled_by varchar(64) null; +alter table jobs add column if not exists spark_application_id varchar(128) not null default ''; -- Originally had "cancelled_by" column and "cancelling" status, but we aren't british. Whoops. do $$ begin diff --git a/internal/pkg/heimdall/job_dal.go b/internal/pkg/heimdall/job_dal.go index 0fdc1e30..d333e900 100644 --- a/internal/pkg/heimdall/job_dal.go +++ b/internal/pkg/heimdall/job_dal.go @@ -210,7 +210,7 @@ func (h *Heimdall) getJob(ctx context.Context, j *jobRequest) (any, error) { var jobContext string if err := row.Scan(&r.SystemID, &r.Status, &r.Name, &r.Version, &r.Description, &jobContext, &r.Error, &r.User, &r.IsSync, - &r.CreatedAt, &r.UpdatedAt, &r.CommandID, &r.CommandName, &r.ClusterID, &r.ClusterName, &r.StoreResultSync, &r.CanceledBy); err != nil { + &r.CreatedAt, &r.UpdatedAt, &r.CommandID, &r.CommandName, &r.ClusterID, &r.ClusterName, &r.StoreResultSync, &r.CanceledBy, &r.SparkApplicationID); err != nil { if err == sql.ErrNoRows { return nil, ErrUnknownJobID } else { @@ -314,7 +314,7 @@ func (h *Heimdall) getJobs(ctx context.Context, f *database.Filter) (any, error) r := &job.Job{} if err := rows.Scan(&r.SystemID, &r.ID, &r.Status, &r.Name, &r.Version, &r.Description, &jobContext, &r.Error, &r.User, &r.IsSync, - &r.CreatedAt, &r.UpdatedAt, &r.CommandID, &r.CommandName, &r.ClusterID, &r.ClusterName, &r.StoreResultSync, &r.CanceledBy); err != nil { + &r.CreatedAt, &r.UpdatedAt, &r.CommandID, &r.CommandName, &r.ClusterID, &r.ClusterName, &r.StoreResultSync, &r.CanceledBy, &r.SparkApplicationID); err != nil { getJobsMethod.LogAndCountError(err, "scan") return nil, err } diff --git a/internal/pkg/heimdall/jobs_async.go b/internal/pkg/heimdall/jobs_async.go index 0f2e82ab..24417950 100644 --- a/internal/pkg/heimdall/jobs_async.go +++ b/internal/pkg/heimdall/jobs_async.go @@ -118,7 +118,7 @@ func (h *Heimdall) runAsyncJob(ctx context.Context, j *job.Job) error { } defer sess.Close() - if _, err := sess.Exec(queryJobStatusUpdate, status.Running, ``, j.SystemID); err != nil { + if _, err := sess.Exec(queryJobStatusUpdate, status.Running, ``, j.SparkApplicationID, j.SystemID); err != nil { return h.updateAsyncJobStatus(j, err) } @@ -154,7 +154,7 @@ func (h *Heimdall) updateAsyncJobStatus(j *job.Job, jobError error) error { } defer sess.Close() - if _, err := sess.Exec(queryJobStatusUpdate, j.Status, j.Error, j.SystemID); err != nil { + if _, err := sess.Exec(queryJobStatusUpdate, j.Status, j.Error, j.SparkApplicationID, j.SystemID); err != nil { // TODO: implement proper logging fmt.Println(`job status update error:`, err) } diff --git a/internal/pkg/heimdall/queries/job/select.sql b/internal/pkg/heimdall/queries/job/select.sql index 3019fcdd..5a2c2f82 100644 --- a/internal/pkg/heimdall/queries/job/select.sql +++ b/internal/pkg/heimdall/queries/job/select.sql @@ -15,7 +15,8 @@ select cl.cluster_id, cl.cluster_name, j.store_result_sync, - j.canceled_by + j.canceled_by, + j.spark_application_id from jobs j left join commands cm on cm.system_command_id = j.job_command_id diff --git a/internal/pkg/heimdall/queries/job/select_jobs.sql b/internal/pkg/heimdall/queries/job/select_jobs.sql index 74b26ac2..6e4b57c2 100644 --- a/internal/pkg/heimdall/queries/job/select_jobs.sql +++ b/internal/pkg/heimdall/queries/job/select_jobs.sql @@ -16,7 +16,8 @@ select cl.cluster_id, cl.cluster_name, j.store_result_sync, - j.canceled_by + j.canceled_by, + j.spark_application_id from jobs j join job_statuses js on js.job_status_id = j.job_status_id diff --git a/internal/pkg/heimdall/queries/job/status_update.sql b/internal/pkg/heimdall/queries/job/status_update.sql index 89786365..484163ce 100644 --- a/internal/pkg/heimdall/queries/job/status_update.sql +++ b/internal/pkg/heimdall/queries/job/status_update.sql @@ -2,6 +2,7 @@ update jobs set job_status_id = $1, job_error = left($2::text, 1024), + spark_application_id = coalesce(nullif($3::text, ''), spark_application_id), updated_at = extract(epoch from now())::int where - system_job_id = $3; + system_job_id = $4; diff --git a/internal/pkg/object/command/sparkeks/sparkeks.go b/internal/pkg/object/command/sparkeks/sparkeks.go index 4e44ab03..5c327364 100644 --- a/internal/pkg/object/command/sparkeks/sparkeks.go +++ b/internal/pkg/object/command/sparkeks/sparkeks.go @@ -929,6 +929,10 @@ func (e *executionContext) monitorJobAndCollectLogs(ctx context.Context) error { } finalSparkApp = sparkApp + // Capture Spark's runtime application id (used by the Spark History Server) + if id := sparkApp.Status.SparkApplicationID; id != "" { + e.job.SparkApplicationID = id + } state := sparkApp.Status.AppState.State if time.Since(lastReport) >= statusReportInterval || isTerminalState(state) { diff --git a/pkg/object/job/job.go b/pkg/object/job/job.go index 2ca17142..490024b0 100644 --- a/pkg/object/job/job.go +++ b/pkg/object/job/job.go @@ -10,19 +10,20 @@ import ( ) type Job struct { - object.Object `yaml:",inline" json:",inline"` - Status status.Status `yaml:"status,omitempty" json:"status,omitempty"` - IsSync bool `yaml:"is_sync,omitempty" json:"is_sync,omitempty"` - StoreResultSync bool `yaml:"store_result_sync,omitempty" json:"store_result_sync,omitempty"` - Error string `yaml:"error,omitempty" json:"error,omitempty"` - CommandCriteria *set.Set[string] `yaml:"command_criteria,omitempty" json:"command_criteria,omitempty"` - ClusterCriteria *set.Set[string] `yaml:"cluster_criteria,omitempty" json:"cluster_criteria,omitempty"` - CommandID string `yaml:"command_id,omitempty" json:"command_id,omitempty"` - CommandName string `yaml:"command_name,omitempty" json:"command_name,omitempty"` - ClusterID string `yaml:"cluster_id,omitempty" json:"cluster_id,omitempty"` - ClusterName string `yaml:"cluster_name,omitempty" json:"cluster_name,omitempty"` - CanceledBy string `yaml:"canceled_by,omitempty" json:"canceled_by,omitempty"` - Result *result.Result `yaml:"result,omitempty" json:"result,omitempty"` + object.Object `yaml:",inline" json:",inline"` + Status status.Status `yaml:"status,omitempty" json:"status,omitempty"` + IsSync bool `yaml:"is_sync,omitempty" json:"is_sync,omitempty"` + StoreResultSync bool `yaml:"store_result_sync,omitempty" json:"store_result_sync,omitempty"` + Error string `yaml:"error,omitempty" json:"error,omitempty"` + CommandCriteria *set.Set[string] `yaml:"command_criteria,omitempty" json:"command_criteria,omitempty"` + ClusterCriteria *set.Set[string] `yaml:"cluster_criteria,omitempty" json:"cluster_criteria,omitempty"` + CommandID string `yaml:"command_id,omitempty" json:"command_id,omitempty"` + CommandName string `yaml:"command_name,omitempty" json:"command_name,omitempty"` + ClusterID string `yaml:"cluster_id,omitempty" json:"cluster_id,omitempty"` + ClusterName string `yaml:"cluster_name,omitempty" json:"cluster_name,omitempty"` + CanceledBy string `yaml:"canceled_by,omitempty" json:"canceled_by,omitempty"` + SparkApplicationID string `yaml:"spark_application_id,omitempty" json:"spark_application_id,omitempty"` + Result *result.Result `yaml:"result,omitempty" json:"result,omitempty"` } func (j *Job) Init() error { diff --git a/web/src/modules/Jobs/Helper.tsx b/web/src/modules/Jobs/Helper.tsx index b1c5698a..b5f20bed 100644 --- a/web/src/modules/Jobs/Helper.tsx +++ b/web/src/modules/Jobs/Helper.tsx @@ -39,6 +39,7 @@ export type JobType = { cluster_id: string cluster_name: string canceled_by?: string + spark_application_id?: string error?: string context?: { properties: { diff --git a/web/src/modules/Jobs/JobDetails/JobInformationPane.tsx b/web/src/modules/Jobs/JobDetails/JobInformationPane.tsx index 55608078..1479e921 100644 --- a/web/src/modules/Jobs/JobDetails/JobInformationPane.tsx +++ b/web/src/modules/Jobs/JobDetails/JobInformationPane.tsx @@ -124,6 +124,27 @@ const JobInformationPane = ({ ]} isTwoColumns /> + {!!jobData?.spark_application_id && ( + + Spark History + + + ), + check: true, + }, + ]} + /> + )} )}