55 "fmt"
66 "strconv"
77 "strings"
8- "time"
98
10- "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
9+ "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/tasklog"
10+
1111 "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/plugins"
1212 "github.com/flyteorg/flyteplugins/go/tasks/logs"
1313 "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery"
@@ -27,11 +27,12 @@ import (
2727)
2828
2929const (
30- rayTaskType = "ray"
31- KindRayJob = "RayJob"
32- IncludeDashboard = "include-dashboard"
33- NodeIPAddress = "node-ip-address"
34- DashboardHost = "dashboard-host"
30+ rayTaskType = "ray"
31+ KindRayJob = "RayJob"
32+ IncludeDashboard = "include-dashboard"
33+ NodeIPAddress = "node-ip-address"
34+ DashboardHost = "dashboard-host"
35+ DisableUsageStatsStartParameter = "disable-usage-stats"
3536)
3637
3738type rayJobResourceHandler struct {
@@ -57,7 +58,6 @@ func (rayJobResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsC
5758 }
5859
5960 podSpec , objectMeta , primaryContainerName , err := flytek8s .ToK8sPodSpec (ctx , taskCtx )
60-
6161 if err != nil {
6262 return nil , flyteerr .Errorf (flyteerr .BadTaskSpecification , "Unable to create pod spec: [%v]" , err .Error ())
6363 }
@@ -76,26 +76,36 @@ func (rayJobResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsC
7676 return nil , flyteerr .Errorf (flyteerr .BadTaskSpecification , "Unable to get primary container from the pod: [%v]" , err .Error ())
7777 }
7878
79+ cfg := GetConfig ()
7980 headReplicas := int32 (1 )
8081 headNodeRayStartParams := make (map [string ]string )
8182 if rayJob .RayCluster .HeadGroupSpec != nil && rayJob .RayCluster .HeadGroupSpec .RayStartParams != nil {
8283 headNodeRayStartParams = rayJob .RayCluster .HeadGroupSpec .RayStartParams
84+ } else if headNode := cfg .Defaults .HeadNode ; len (headNode .StartParameters ) > 0 {
85+ headNodeRayStartParams = headNode .StartParameters
8386 }
87+
8488 if _ , exist := headNodeRayStartParams [IncludeDashboard ]; ! exist {
8589 headNodeRayStartParams [IncludeDashboard ] = strconv .FormatBool (GetConfig ().IncludeDashboard )
8690 }
91+
8792 if _ , exist := headNodeRayStartParams [NodeIPAddress ]; ! exist {
88- headNodeRayStartParams [NodeIPAddress ] = GetConfig (). NodeIPAddress
93+ headNodeRayStartParams [NodeIPAddress ] = cfg . Defaults . HeadNode . IPAddress
8994 }
95+
9096 if _ , exist := headNodeRayStartParams [DashboardHost ]; ! exist {
91- headNodeRayStartParams [DashboardHost ] = GetConfig ().DashboardHost
97+ headNodeRayStartParams [DashboardHost ] = cfg .DashboardHost
98+ }
99+
100+ if _ , exists := headNodeRayStartParams [DisableUsageStatsStartParameter ]; ! exists && ! cfg .EnableUsageStats {
101+ headNodeRayStartParams [DisableUsageStatsStartParameter ] = "true"
92102 }
93103
94104 enableIngress := true
95105 rayClusterSpec := rayv1alpha1.RayClusterSpec {
96106 HeadGroupSpec : rayv1alpha1.HeadGroupSpec {
97107 Template : buildHeadPodTemplate (& container , podSpec , objectMeta , taskCtx ),
98- ServiceType : v1 .ServiceType (GetConfig () .ServiceType ),
108+ ServiceType : v1 .ServiceType (cfg .ServiceType ),
99109 Replicas : & headReplicas ,
100110 EnableIngress : & enableIngress ,
101111 RayStartParams : headNodeRayStartParams ,
@@ -111,16 +121,24 @@ func (rayJobResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsC
111121 if spec .MinReplicas != 0 {
112122 minReplicas = spec .MinReplicas
113123 }
124+
114125 if spec .MaxReplicas != 0 {
115126 maxReplicas = spec .MaxReplicas
116127 }
117128
118129 workerNodeRayStartParams := make (map [string ]string )
119130 if spec .RayStartParams != nil {
120131 workerNodeRayStartParams = spec .RayStartParams
132+ } else if workerNode := cfg .Defaults .WorkerNode ; len (workerNode .StartParameters ) > 0 {
133+ workerNodeRayStartParams = workerNode .StartParameters
121134 }
135+
122136 if _ , exist := workerNodeRayStartParams [NodeIPAddress ]; ! exist {
123- workerNodeRayStartParams [NodeIPAddress ] = GetConfig ().NodeIPAddress
137+ workerNodeRayStartParams [NodeIPAddress ] = cfg .Defaults .WorkerNode .IPAddress
138+ }
139+
140+ if _ , exists := workerNodeRayStartParams [DisableUsageStatsStartParameter ]; ! exists && ! cfg .EnableUsageStats {
141+ workerNodeRayStartParams [DisableUsageStatsStartParameter ] = "true"
124142 }
125143
126144 workerNodeSpec := rayv1alpha1.WorkerGroupSpec {
@@ -145,8 +163,8 @@ func (rayJobResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsC
145163 jobSpec := rayv1alpha1.RayJobSpec {
146164 RayClusterSpec : rayClusterSpec ,
147165 Entrypoint : strings .Join (container .Args , " " ),
148- ShutdownAfterJobFinishes : GetConfig () .ShutdownAfterJobFinishes ,
149- TTLSecondsAfterFinished : & GetConfig () .TTLSecondsAfterFinished ,
166+ ShutdownAfterJobFinishes : cfg .ShutdownAfterJobFinishes ,
167+ TTLSecondsAfterFinished : & cfg .TTLSecondsAfterFinished ,
150168 RuntimeEnv : rayJob .RuntimeEnv ,
151169 }
152170
@@ -347,12 +365,10 @@ func (rayJobResourceHandler) BuildIdentityResource(ctx context.Context, taskCtx
347365 }, nil
348366}
349367
350- func getEventInfoForRayJob () (* pluginsCore.TaskInfo , error ) {
351- taskLogs := make ([]* core.TaskLog , 0 , 3 )
352- logPlugin , err := logs .InitializeLogPlugins (logs .GetLogConfig ())
353-
368+ func getEventInfoForRayJob (logConfig logs.LogConfig , pluginContext k8s.PluginContext , rayJob * rayv1alpha1.RayJob ) (* pluginsCore.TaskInfo , error ) {
369+ logPlugin , err := logs .InitializeLogPlugins (& logConfig )
354370 if err != nil {
355- return nil , err
371+ return nil , fmt . Errorf ( "failed to initialize log plugins. Error: %w" , err )
356372 }
357373
358374 if logPlugin == nil {
@@ -362,22 +378,31 @@ func getEventInfoForRayJob() (*pluginsCore.TaskInfo, error) {
362378 // TODO: Retrieve the name of head pod from rayJob.status, and add it to task logs
363379 // RayJob CRD does not include the name of the worker or head pod for now
364380
365- // TODO: Add ray Dashboard URI to task logs
381+ taskID := pluginContext .TaskExecutionMetadata ().GetTaskExecutionID ().GetID ()
382+ logOutput , err := logPlugin .GetTaskLogs (tasklog.Input {
383+ Namespace : rayJob .Namespace ,
384+ TaskExecutionIdentifier : & taskID ,
385+ })
386+
387+ if err != nil {
388+ return nil , fmt .Errorf ("failed to generate task logs. Error: %w" , err )
389+ }
366390
367391 return & pluginsCore.TaskInfo {
368- Logs : taskLogs ,
392+ Logs : logOutput . TaskLogs ,
369393 }, nil
370394}
371395
372- func (rayJobResourceHandler ) GetTaskPhase (ctx context.Context , pluginContext k8s.PluginContext , resource client.Object ) (pluginsCore.PhaseInfo , error ) {
396+ func (plugin rayJobResourceHandler ) GetTaskPhase (ctx context.Context , pluginContext k8s.PluginContext , resource client.Object ) (pluginsCore.PhaseInfo , error ) {
373397 rayJob := resource .(* rayv1alpha1.RayJob )
374- info , err := getEventInfoForRayJob ()
398+ info , err := getEventInfoForRayJob (GetConfig (). Logs , pluginContext , rayJob )
375399 if err != nil {
376400 return pluginsCore .PhaseInfoUndefined , err
377401 }
402+
378403 switch rayJob .Status .JobStatus {
379404 case rayv1alpha1 .JobStatusPending :
380- return pluginsCore .PhaseInfoNotReady ( time . Now () , pluginsCore .DefaultPhaseVersion , "job is pending" ), nil
405+ return pluginsCore .PhaseInfoInitializing ( rayJob . Status . StartTime . Time , pluginsCore .DefaultPhaseVersion , "job is pending" , info ), nil
381406 case rayv1alpha1 .JobStatusFailed :
382407 reason := fmt .Sprintf ("Failed to create Ray job: %s" , rayJob .Name )
383408 return pluginsCore .PhaseInfoFailure (flyteerr .TaskFailedWithError , reason , info ), nil
@@ -386,7 +411,8 @@ func (rayJobResourceHandler) GetTaskPhase(ctx context.Context, pluginContext k8s
386411 case rayv1alpha1 .JobStatusRunning :
387412 return pluginsCore .PhaseInfoRunning (pluginsCore .DefaultPhaseVersion , info ), nil
388413 }
389- return pluginsCore .PhaseInfoQueued (time .Now (), pluginsCore .DefaultPhaseVersion , "JobCreated" ), nil
414+
415+ return pluginsCore .PhaseInfoQueued (rayJob .CreationTimestamp .Time , pluginsCore .DefaultPhaseVersion , "JobCreated" ), nil
390416}
391417
392418func init () {
0 commit comments