Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions assets/databases/heimdall/tables/jobs.sql
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ create table if not exists jobs

alter table jobs add column if not exists store_result_sync boolean not null default false;
alter table jobs add column if not exists canceled_by varchar(64) null;
alter table jobs add column if not exists spark_application_id varchar(128) not null default '';

-- Originally had "cancelled_by" column and "cancelling" status, but we aren't british. Whoops.
do $$ begin
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/heimdall/job_dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (h *Heimdall) getJob(ctx context.Context, j *jobRequest) (any, error) {
var jobContext string

if err := row.Scan(&r.SystemID, &r.Status, &r.Name, &r.Version, &r.Description, &jobContext, &r.Error, &r.User, &r.IsSync,
&r.CreatedAt, &r.UpdatedAt, &r.CommandID, &r.CommandName, &r.ClusterID, &r.ClusterName, &r.StoreResultSync, &r.CanceledBy); err != nil {
&r.CreatedAt, &r.UpdatedAt, &r.CommandID, &r.CommandName, &r.ClusterID, &r.ClusterName, &r.StoreResultSync, &r.CanceledBy, &r.SparkApplicationID); err != nil {
if err == sql.ErrNoRows {
return nil, ErrUnknownJobID
} else {
Expand Down Expand Up @@ -314,7 +314,7 @@ func (h *Heimdall) getJobs(ctx context.Context, f *database.Filter) (any, error)
r := &job.Job{}

if err := rows.Scan(&r.SystemID, &r.ID, &r.Status, &r.Name, &r.Version, &r.Description, &jobContext, &r.Error, &r.User, &r.IsSync,
&r.CreatedAt, &r.UpdatedAt, &r.CommandID, &r.CommandName, &r.ClusterID, &r.ClusterName, &r.StoreResultSync, &r.CanceledBy); err != nil {
&r.CreatedAt, &r.UpdatedAt, &r.CommandID, &r.CommandName, &r.ClusterID, &r.ClusterName, &r.StoreResultSync, &r.CanceledBy, &r.SparkApplicationID); err != nil {
getJobsMethod.LogAndCountError(err, "scan")
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/heimdall/jobs_async.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (h *Heimdall) runAsyncJob(ctx context.Context, j *job.Job) error {
}
defer sess.Close()

if _, err := sess.Exec(queryJobStatusUpdate, status.Running, ``, j.SystemID); err != nil {
if _, err := sess.Exec(queryJobStatusUpdate, status.Running, ``, j.SparkApplicationID, j.SystemID); err != nil {
return h.updateAsyncJobStatus(j, err)
}

Expand Down Expand Up @@ -154,7 +154,7 @@ func (h *Heimdall) updateAsyncJobStatus(j *job.Job, jobError error) error {
}
defer sess.Close()

if _, err := sess.Exec(queryJobStatusUpdate, j.Status, j.Error, j.SystemID); err != nil {
if _, err := sess.Exec(queryJobStatusUpdate, j.Status, j.Error, j.SparkApplicationID, j.SystemID); err != nil {
// TODO: implement proper logging
fmt.Println(`job status update error:`, err)
}
Expand Down
3 changes: 2 additions & 1 deletion internal/pkg/heimdall/queries/job/select.sql
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ select
cl.cluster_id,
cl.cluster_name,
j.store_result_sync,
j.canceled_by
j.canceled_by,
j.spark_application_id
from
jobs j
left join commands cm on cm.system_command_id = j.job_command_id
Expand Down
3 changes: 2 additions & 1 deletion internal/pkg/heimdall/queries/job/select_jobs.sql
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ select
cl.cluster_id,
cl.cluster_name,
j.store_result_sync,
j.canceled_by
j.canceled_by,
j.spark_application_id
from
jobs j
join job_statuses js on js.job_status_id = j.job_status_id
Expand Down
3 changes: 2 additions & 1 deletion internal/pkg/heimdall/queries/job/status_update.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ update jobs
set
job_status_id = $1,
job_error = left($2::text, 1024),
spark_application_id = coalesce(nullif($3::text, ''), spark_application_id),
updated_at = extract(epoch from now())::int
where
system_job_id = $3;
system_job_id = $4;
4 changes: 4 additions & 0 deletions internal/pkg/object/command/sparkeks/sparkeks.go
Original file line number Diff line number Diff line change
Expand Up @@ -929,6 +929,10 @@ func (e *executionContext) monitorJobAndCollectLogs(ctx context.Context) error {
}

finalSparkApp = sparkApp
// Capture Spark's runtime application id (used by the Spark History Server)
if id := sparkApp.Status.SparkApplicationID; id != "" {
e.job.SparkApplicationID = id
}
state := sparkApp.Status.AppState.State

if time.Since(lastReport) >= statusReportInterval || isTerminalState(state) {
Expand Down
27 changes: 14 additions & 13 deletions pkg/object/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,20 @@ import (
)

type Job struct {
object.Object `yaml:",inline" json:",inline"`
Status status.Status `yaml:"status,omitempty" json:"status,omitempty"`
IsSync bool `yaml:"is_sync,omitempty" json:"is_sync,omitempty"`
StoreResultSync bool `yaml:"store_result_sync,omitempty" json:"store_result_sync,omitempty"`
Error string `yaml:"error,omitempty" json:"error,omitempty"`
CommandCriteria *set.Set[string] `yaml:"command_criteria,omitempty" json:"command_criteria,omitempty"`
ClusterCriteria *set.Set[string] `yaml:"cluster_criteria,omitempty" json:"cluster_criteria,omitempty"`
CommandID string `yaml:"command_id,omitempty" json:"command_id,omitempty"`
CommandName string `yaml:"command_name,omitempty" json:"command_name,omitempty"`
ClusterID string `yaml:"cluster_id,omitempty" json:"cluster_id,omitempty"`
ClusterName string `yaml:"cluster_name,omitempty" json:"cluster_name,omitempty"`
CanceledBy string `yaml:"canceled_by,omitempty" json:"canceled_by,omitempty"`
Result *result.Result `yaml:"result,omitempty" json:"result,omitempty"`
object.Object `yaml:",inline" json:",inline"`
Status status.Status `yaml:"status,omitempty" json:"status,omitempty"`
IsSync bool `yaml:"is_sync,omitempty" json:"is_sync,omitempty"`
StoreResultSync bool `yaml:"store_result_sync,omitempty" json:"store_result_sync,omitempty"`
Error string `yaml:"error,omitempty" json:"error,omitempty"`
CommandCriteria *set.Set[string] `yaml:"command_criteria,omitempty" json:"command_criteria,omitempty"`
ClusterCriteria *set.Set[string] `yaml:"cluster_criteria,omitempty" json:"cluster_criteria,omitempty"`
CommandID string `yaml:"command_id,omitempty" json:"command_id,omitempty"`
CommandName string `yaml:"command_name,omitempty" json:"command_name,omitempty"`
ClusterID string `yaml:"cluster_id,omitempty" json:"cluster_id,omitempty"`
ClusterName string `yaml:"cluster_name,omitempty" json:"cluster_name,omitempty"`
CanceledBy string `yaml:"canceled_by,omitempty" json:"canceled_by,omitempty"`
SparkApplicationID string `yaml:"spark_application_id,omitempty" json:"spark_application_id,omitempty"`
Result *result.Result `yaml:"result,omitempty" json:"result,omitempty"`
}

func (j *Job) Init() error {
Expand Down
1 change: 1 addition & 0 deletions web/src/modules/Jobs/Helper.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ export type JobType = {
cluster_id: string
cluster_name: string
canceled_by?: string
spark_application_id?: string
error?: string
context?: {
properties: {
Expand Down
21 changes: 21 additions & 0 deletions web/src/modules/Jobs/JobDetails/JobInformationPane.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,27 @@ const JobInformationPane = ({
]}
isTwoColumns
/>
{!!jobData?.spark_application_id && (
<InformationPane.Section
data={[
{
label: '',
data: (
<Button
styleType='text-blue'
as='externalLink'
href={`https://spark-history.data-platform.aws.pattern.com/history/${jobData?.spark_application_id}/jobs/`}

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shivang Nagta (@ShivangNagta) this is going to be shipped with oss docker image. Lets find another way to inject this in UI

className='gap-1'
>
<span>Spark History</span>
<Icon icon='launch' color='dark-blue' iconSize='12px' />
</Button>
),
check: true,
},
]}
/>
)}
</div>
)}
</InformationPane>
Expand Down
Loading