Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 47 additions & 14 deletions flyteplugins/go/tasks/plugins/k8s/spark/spark.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,29 @@
taskLogs := make([]*core.TaskLog, 0, 3)
taskExecID := pluginContext.TaskExecutionMetadata().GetTaskExecutionID()

// Extract Start and End times from SparkApplicationStatus
var startTime, finishTime int64
var RFC3999StartTime, RFC3999FinishTime string

// Use SubmissionTime as the start time
if !sj.Status.SubmissionTime.IsZero() {
RFC3999StartTime = sj.Status.SubmissionTime.Format(time.RFC3339)
startTime = sj.Status.SubmissionTime.Unix()
}

// Use CompletionTime or TerminationTime as the end time
var endTime metav1.Time
if !sj.Status.CompletionTime.IsZero() {

Check failure on line 406 in flyteplugins/go/tasks/plugins/k8s/spark/spark.go

View workflow job for this annotation

GitHub Actions / compile

sj.Status.CompletionTime undefined (type "github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/apis/sparkoperator.k8s.io/v1beta2".SparkApplicationStatus has no field or method CompletionTime)
endTime = sj.Status.CompletionTime

Check failure on line 407 in flyteplugins/go/tasks/plugins/k8s/spark/spark.go

View workflow job for this annotation

GitHub Actions / compile

sj.Status.CompletionTime undefined (type "github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/apis/sparkoperator.k8s.io/v1beta2".SparkApplicationStatus has no field or method CompletionTime)
} else if !sj.Status.TerminationTime.IsZero() {
endTime = sj.Status.TerminationTime
}

if !endTime.IsZero() {
RFC3999FinishTime = endTime.Format(time.RFC3339)
finishTime = endTime.Unix()
}

if sj.Status.DriverInfo.PodName != "" {
p, err := logs.InitializeLogPlugins(&sparkConfig.LogConfig.Mixed)
if err != nil {
Expand All @@ -399,10 +422,14 @@

if p != nil {
o, err := p.GetTaskLogs(tasklog.Input{
PodName: sj.Status.DriverInfo.PodName,
Namespace: sj.Namespace,
LogName: "(Driver Logs)",
TaskExecutionID: taskExecID,
PodName: sj.Status.DriverInfo.PodName,
Namespace: sj.Namespace,
LogName: "(Driver Logs)",
TaskExecutionID: taskExecID,
PodRFC3339StartTime: RFC3999StartTime,
PodRFC3339FinishTime: RFC3999FinishTime,
PodUnixStartTime: startTime,
PodUnixFinishTime: finishTime,
})

if err != nil {
Expand All @@ -419,18 +446,24 @@
}

if p != nil {
o, err := p.GetTaskLogs(tasklog.Input{
PodName: sj.Status.DriverInfo.PodName,
Namespace: sj.Namespace,
LogName: "(User Logs)",
TaskExecutionID: taskExecID,
})
if sj.Status.DriverInfo.PodName != "" {
o, err := p.GetTaskLogs(tasklog.Input{
PodName: sj.Status.DriverInfo.PodName,
Namespace: sj.Namespace,
LogName: "(User Logs)",
TaskExecutionID: taskExecID,
PodRFC3339StartTime: RFC3999StartTime,
PodRFC3339FinishTime: RFC3999FinishTime,
PodUnixStartTime: startTime,
PodUnixFinishTime: finishTime,
})

if err != nil {
return nil, err
}
if err != nil {
return nil, err
}

taskLogs = append(taskLogs, o.TaskLogs...)
taskLogs = append(taskLogs, o.TaskLogs...)
}
}

p, err = logs.InitializeLogPlugins(&sparkConfig.LogConfig.System)
Expand Down
Loading