Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ type Task struct {
Features []string `bson:"features" json:"features"`
IsRestart bool `bson:"is_restart" json:"is_restart"`
StorageEndpoint string `bson:"storage_endpoint" json:"storage_endpoint"`
Events *models.Events `bson:"-" json:"events,omitempty"`
}

func (Task) TableName() string {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,11 +346,13 @@ type ImageAndServiceModule struct {
type JobTaskFreestyleSpec struct {
Properties JobProperties `bson:"properties" json:"properties" yaml:"properties"`
Steps []*StepTask `bson:"steps" json:"steps" yaml:"steps"`
Events *Events `bson:"-" json:"events" yaml:"events"`
}

type JobTaskPluginSpec struct {
Properties JobProperties `bson:"properties" json:"properties" yaml:"properties"`
Plugin *PluginTemplate `bson:"plugin" json:"plugin" yaml:"plugin"`
Events *Events `bson:"-" json:"events" yaml:"events"`
}

type JobTaskBlueGreenDeploySpec struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ func NewFreestyleJobCtl(job *commonmodels.JobTask, workflowCtx *commonmodels.Wor
if err := commonmodels.IToi(job.Spec, jobTaskSpec); err != nil {
logger.Error(err)
}
if jobTaskSpec.Events == nil {
jobTaskSpec.Events = &commonmodels.Events{}
}
job.Spec = jobTaskSpec
return &FreestyleJobCtl{
job: job,
Expand Down Expand Up @@ -1164,7 +1167,10 @@ func (c *FreestyleJobCtl) cleanupHelperPod(ctx context.Context, client crClient.
func (c *FreestyleJobCtl) wait(ctx context.Context) {
var err error
taskTimeout := time.After(time.Duration(c.jobTaskSpec.Properties.Timeout) * time.Minute)
c.job.Status, err = waitJobStart(ctx, c.jobTaskSpec.Properties.Namespace, c.job.K8sJobName, c.kubeclient, c.apiServer, taskTimeout, c.logger)
c.job.Status, err = waitJobStart(ctx, c.jobTaskSpec.Properties.Namespace, c.job.K8sJobName, c.kubeclient, c.apiServer, taskTimeout, c.logger, func(events *commonmodels.Events) {
c.jobTaskSpec.Events = events
c.ack()
})
if err != nil {
c.job.Error = err.Error()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ func NewPluginsJobCtl(job *commonmodels.JobTask, workflowCtx *commonmodels.Workf
if err := commonmodels.IToi(job.Spec, jobTaskSpec); err != nil {
logger.Error(err)
}
if jobTaskSpec.Events == nil {
jobTaskSpec.Events = &commonmodels.Events{}
}
job.Spec = jobTaskSpec
return &PluginJobCtl{
job: job,
Expand Down Expand Up @@ -150,7 +153,10 @@ func (c *PluginJobCtl) run(ctx context.Context) error {
func (c *PluginJobCtl) wait(ctx context.Context) {
var err error
timeout := time.After(time.Duration(c.jobTaskSpec.Properties.Timeout) * time.Minute)
c.job.Status, err = waitJobStart(ctx, c.jobTaskSpec.Properties.Namespace, c.job.K8sJobName, c.kubeclient, c.apiServer, timeout, c.logger)
c.job.Status, err = waitJobStart(ctx, c.jobTaskSpec.Properties.Namespace, c.job.K8sJobName, c.kubeclient, c.apiServer, timeout, c.logger, func(events *commonmodels.Events) {
c.jobTaskSpec.Events = events
c.ack()
})
if err != nil {
c.logger.Errorf("wait job start error: %v", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -933,7 +933,7 @@ func int64Ptr(i int64) *int64 { return &i }

func WaitPlainJobEnd(ctx context.Context, taskTimeout int, namespace, jobName string, kubeClient crClient.Client, apiServer crClient.Reader, xl *zap.SugaredLogger) config.Status {
timeout := time.After(time.Duration(taskTimeout) * time.Minute)
status, err := waitJobStart(ctx, namespace, jobName, kubeClient, apiServer, timeout, xl)
status, err := waitJobStart(ctx, namespace, jobName, kubeClient, apiServer, timeout, xl, nil)
if err != nil {
xl.Errorf("wait job start error: %v", err)
}
Expand Down Expand Up @@ -973,12 +973,67 @@ func waitPlainJobEnd(ctx context.Context, taskTimeout int, timeout <-chan time.T
}
}

func waitJobStart(ctx context.Context, namespace, jobName string, kubeClient crClient.Client, apiReader client.Reader, timeout <-chan time.Time, xl *zap.SugaredLogger) (config.Status, error) {
func appendPendingPodEvents(podName, namespace string, apiReader client.Reader, events *commonmodels.Events, reported map[string]struct{}, onUpdate func(*commonmodels.Events), xl *zap.SugaredLogger) {
if events == nil {
return
}

selector := fields.Set{"involvedObject.name": podName, "involvedObject.kind": setting.Pod}.AsSelector()
kubeEvents, err := getter.ListEvents(namespace, selector, apiReader)
if err != nil {
xl.Errorf("list events failed for pod %s/%s: %s", namespace, podName, err)
return
}

sort.SliceStable(kubeEvents, func(i, j int) bool {
return kubeEvents[i].CreationTimestamp.Unix() < kubeEvents[j].CreationTimestamp.Unix()
})

changed := false
for _, kubeEvent := range kubeEvents {
eventKey := fmt.Sprintf("%s|%s|%s", kubeEvent.Type, kubeEvent.Reason, kubeEvent.Message)
if _, ok := reported[eventKey]; ok {
continue
}
reported[eventKey] = struct{}{}

eventType := "info"
if kubeEvent.Type == corev1.EventTypeWarning {
eventType = "error"
}

eventTime := kubeEvent.LastTimestamp
if eventTime.IsZero() {
eventTime = kubeEvent.FirstTimestamp
}
if eventTime.IsZero() && !kubeEvent.EventTime.IsZero() {
eventTime = metav1.NewTime(kubeEvent.EventTime.Time)
}
if eventTime.IsZero() {
eventTime = metav1.NewTime(time.Now())
}

*events = append(*events, &commonmodels.Event{
EventType: eventType,
Time: eventTime.Format("2006-01-02 15:04:05"),
Message: fmt.Sprintf("Pod event [%s/%s]: %s", kubeEvent.Type, kubeEvent.Reason, kubeEvent.Message),
})
changed = true
}

if changed && onUpdate != nil {
onUpdate(events)
}
}

func waitJobStart(ctx context.Context, namespace, jobName string, kubeClient crClient.Client, apiReader client.Reader, timeout <-chan time.Time, xl *zap.SugaredLogger, onEventsUpdate func(*commonmodels.Events)) (config.Status, error) {
xl.Infof("wait job to start: %s/%s", namespace, jobName)
xl.Infof("Timeout of preparing Pod: %s.", 120*time.Second)
waitPodReadyTimeout := time.After(120 * time.Second)

var podReadyTimeout bool
reportedEvents := make(map[string]struct{})
events := &commonmodels.Events{}
for {
select {
case <-ctx.Done():
Expand All @@ -1003,6 +1058,8 @@ func waitJobStart(ctx context.Context, namespace, jobName string, kubeClient crC
continue
}
for _, pod := range podList {
appendPendingPodEvents(pod.Name, namespace, apiReader, events, reportedEvents, onEventsUpdate, xl)

if pod.Status.Phase == corev1.PodFailed {
msg := ""
for _, condition := range pod.Status.Conditions {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ type JobTaskPreview struct {
ErrorHandlerUserID string `bson:"error_handler_user_id" yaml:"error_handler_user_id" json:"error_handler_user_id"`
ErrorHandlerUserName string `bson:"error_handler_username" yaml:"error_handler_username" json:"error_handler_username"`
RetryCount int `bson:"retry_count" yaml:"retry_count" json:"retry_count"`
Events *commonmodels.Events `bson:" - " json:"events"`
// JobInfo contains the fields that make up the job task name, for frontend display
JobInfo interface{} `bson:"job_info" json:"job_info"`
}
Expand Down Expand Up @@ -2529,6 +2530,23 @@ func HandleJobError(workflowName, jobName, userID, username string, taskID int64
return nil
}

// extractRuntimeJobEvents extracts the runtime job events from the job task spec.
func extractRuntimeJobEvents(job *commonmodels.JobTask) *commonmodels.Events {
switch job.JobType {
case string(config.JobFreestyle), string(config.JobZadigBuild), string(config.JobZadigTesting), string(config.JobZadigScanning), string(config.JobZadigDistributeImage):
taskJobSpec := &commonmodels.JobTaskFreestyleSpec{}
if err := commonmodels.IToi(job.Spec, taskJobSpec); err == nil {
return taskJobSpec.Events
}
case string(config.JobPlugin):
taskJobSpec := &commonmodels.JobTaskPluginSpec{}
if err := commonmodels.IToi(job.Spec, taskJobSpec); err == nil {
return taskJobSpec.Events
}
}
return nil
}

func jobsToJobPreviews(jobs []*commonmodels.JobTask, context map[string]string, now int64, projectName string) []*JobTaskPreview {
envMap := make(map[string]*commonmodels.Product)
resp := []*JobTaskPreview{}
Expand Down Expand Up @@ -2560,6 +2578,7 @@ func jobsToJobPreviews(jobs []*commonmodels.JobTask, context map[string]string,
ErrorHandlerUserID: job.ErrorHandlerUserID,
ErrorHandlerUserName: job.ErrorHandlerUserName,
RetryCount: job.RetryCount,
Events: extractRuntimeJobEvents(job),
}
switch job.JobType {
case string(config.JobFreestyle):
Expand Down
30 changes: 24 additions & 6 deletions pkg/microservice/aslan/core/workflow/testing/service/scanning.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,13 +310,17 @@ func ListScanningTask(id string, pageNum, pageSize int, log *zap.SugaredLogger)
respList := make([]*ScanningTaskResp, 0)

for _, workflowTask := range workflowTasks {
status := workflowTask.Status
if len(workflowTask.Stages) == 1 && len(workflowTask.Stages[0].Jobs) == 1 && workflowTask.Stages[0].Jobs[0].Status != "" {
status = workflowTask.Stages[0].Jobs[0].Status
}
taskInfo := &ScanningTaskResp{
ScanID: workflowTask.TaskID,
Status: string(workflowTask.Status),
Status: string(status),
Creator: workflowTask.TaskCreator,
CreatedAt: workflowTask.CreateTime,
}
if workflowTask.Status == config.StatusPassed || workflowTask.Status == config.StatusCancelled || workflowTask.Status == config.StatusFailed {
if status == config.StatusPassed || status == config.StatusCancelled || status == config.StatusFailed {
taskInfo.RunTime = workflowTask.EndTime - workflowTask.StartTime
}
respList = append(respList, taskInfo)
Expand Down Expand Up @@ -432,11 +436,25 @@ func GetScanningTaskInfo(scanningID string, taskID int64, log *zap.SugaredLogger
repo.Username = ""
}

status := workflowTask.Status
if workflowTask.Stages[0].Jobs[0].Status != "" {
status = workflowTask.Stages[0].Jobs[0].Status
}
errorMsg := workflowTask.Stages[0].Jobs[0].Error
if errorMsg == "" {
errorMsg = workflowTask.Stages[0].Error
}
if errorMsg == "" {
errorMsg = workflowTask.Error
}

return &ScanningTaskDetail{
Creator: workflowTask.TaskCreator,
Status: string(workflowTask.Status),
Status: string(status),
Error: errorMsg,
CreateTime: workflowTask.CreateTime,
EndTime: workflowTask.EndTime,
Events: jobTaskSpec.Events,
RepoInfo: repoInfo,
SonarMetrics: sonarMetrics,
ResultLink: resultAddr,
Expand Down Expand Up @@ -543,10 +561,10 @@ func generateCustomWorkflowFromScanningModule(scanInfo *commonmodels.Scanning, a
Repos: repos,
KeyVals: renderKeyVals(args.KeyVals, kvs).ToRuntimeList(),
}

// validate kv in the standalone scanning
if err:= jobctrl.ValidateRequiredRuntimeKeyVals(scan.KeyVals,fmt.Sprintf("scan %s", scanInfo.Name));err!= nil{
return nil, err
if err := jobctrl.ValidateRequiredRuntimeKeyVals(scan.KeyVals, fmt.Sprintf("scan %s", scanInfo.Name)); err != nil {
return nil, err
}

job := make([]*commonmodels.Job, 0)
Expand Down
12 changes: 12 additions & 0 deletions pkg/microservice/aslan/core/workflow/testing/service/test_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,10 +242,19 @@ func GetTestTaskDetail(projectKey, testName string, taskID int64, log *zap.Sugar
}
}

errorMsg := workflowTask.Stages[0].Jobs[0].Error
if errorMsg == "" {
errorMsg = workflowTask.Stages[0].Error
}
if errorMsg == "" {
errorMsg = workflowTask.Error
}

subTaskInfo[testName] = map[string]interface{}{
"start_time": workflowTask.Stages[0].Jobs[0].StartTime,
"end_time": workflowTask.Stages[0].Jobs[0].EndTime,
"status": workflowTask.Stages[0].Jobs[0].Status,
"error": errorMsg,
"job_ctx": struct {
JobName string `json:"job_name"`
IsHasArtifact bool `json:"is_has_artifact"`
Expand All @@ -266,6 +275,7 @@ func GetTestTaskDetail(projectKey, testName string, taskID int64, log *zap.Sugar
SubTasks: subTaskInfo,
StartTime: workflowTask.Stages[0].StartTime,
EndTime: workflowTask.Stages[0].EndTime,
Error: errorMsg,
TypeName: workflowTask.Stages[0].Name,
})

Expand All @@ -281,9 +291,11 @@ func GetTestTaskDetail(projectKey, testName string, taskID int64, log *zap.Sugar
CreateTime: workflowTask.CreateTime,
StartTime: workflowTask.StartTime,
EndTime: workflowTask.EndTime,
Error: errorMsg,
Stages: stages,
TestReports: testResultMap,
IsRestart: workflowTask.IsRestart,
Events: jobSpec.Events,
}, nil
}

Expand Down
22 changes: 12 additions & 10 deletions pkg/microservice/aslan/core/workflow/testing/service/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,16 +188,18 @@ type ScanningTaskResp struct {
}

type ScanningTaskDetail struct {
Creator string `json:"creator"`
Status string `json:"status"`
CreateTime int64 `json:"create_time"`
EndTime int64 `json:"end_time"`
RepoInfo []*types.Repository `json:"repo_info"`
SonarMetrics *step.SonarMetrics `json:"sonar_metrics"`
ResultLink string `json:"result_link,omitempty"`
IsHasArtifact bool `json:"is_has_artifact"`
JobName string `json:"job_name"`
JobDisplayName string `json:"job_display_name"`
Creator string `json:"creator"`
Status string `json:"status"`
Error string `json:"error,omitempty"`
CreateTime int64 `json:"create_time"`
EndTime int64 `json:"end_time"`
Events *commonmodels.Events `json:"events,omitempty"`
RepoInfo []*types.Repository `json:"repo_info"`
SonarMetrics *step.SonarMetrics `json:"sonar_metrics"`
ResultLink string `json:"result_link,omitempty"`
IsHasArtifact bool `json:"is_has_artifact"`
JobName string `json:"job_name"`
JobDisplayName string `json:"job_display_name"`
}

func ConvertToDBScanningModule(args *Scanning) *commonmodels.Scanning {
Expand Down
Loading