
Conversation

owenowenisme (Member)

Why are these changes needed?

Related issue number

Checks

  • I've made sure the tests are passing.
  • Testing Strategy
    • Unit tests
    • Manual tests
    • This PR is not tested :(

@owenowenisme owenowenisme force-pushed the poc/rayjob-bg-goroutine-for-dsashboard-http branch from 372c33b to 9596618 on September 3, 2025 12:47
@owenowenisme owenowenisme force-pushed the poc/rayjob-bg-goroutine-for-dsashboard-http branch from 38ba83a to 4069033 on September 17, 2025 17:54
Signed-off-by: You-Cheng Lin (Owen) <[email protected]>
@Future-Outlier (Member) left a comment

  1. Do we prevent goroutine leaks?
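
A minimal sketch of how the pool could avoid leaking goroutines, assuming it is started with a context tied to the controller manager's lifetime (identifiers here are illustrative, not necessarily the PR's):

    import "context"

    // start launches workers that exit when ctx is cancelled or the queue is
    // closed, so no goroutine outlives the controller manager.
    func (wp *WorkerPool) start(ctx context.Context) {
        for i := 0; i < wp.workers; i++ {
            go func() {
                for {
                    select {
                    case <-ctx.Done():
                        return // manager shutting down: worker exits
                    case task, ok := <-wp.taskQueue:
                        if !ok {
                            return // queue closed: worker exits
                        }
                        task()
                    }
                }
            }()
        }
    }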

@Future-Outlier (Member) left a comment

I'm thinking we also need a mechanism to delete the data in the sync.Map when the CR is deleted, right?

@owenowenisme (Member, Author) commented Sep 18, 2025

> I'm thinking we also need a mechanism to delete the data in the sync.Map when the CR is deleted, right?

You're right!
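
A minimal sketch of that cleanup, assuming it runs in the RayJob deletion/finalizer path and also drops the dedup marker (identifiers are illustrative):

    // Called from the RayJob finalizer so cached entries don't outlive the CR.
    func (r *RayJobReconciler) cleanupDashboardCache(jobId string) {
        r.JobInfoMap.Remove(jobId) // drop the cached entry for the deleted CR
    }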

@Future-Outlier (Member) left a comment

When the RayCluster dashboard becomes unresponsive or slow, the RayJob controller's reconciliation process continues to invoke AsyncGetJobInfo for the same RayJob Custom Resource (CR), leading to multiple identical function instances accumulating in the taskQueue.

To be more specific: with 1000-way reconcile concurrency and 1000 RayJob CRs all using one RayCluster (via clusterSelector), what will happen?

Can we guarantee the taskQueue never holds duplicate entries for the same RayJob CR?
In the extreme case, this will eventually lead to OOM, right?

Or maybe we should let every RayJob launch a goroutine to asynchronously query the status?
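
One way to guarantee at most one queued task per RayJob is to make the dedup check atomic with the concurrent map's SetIfAbsent, instead of a separate Get-then-Set pair (a sketch against the PR's field names):

    func (r *RayDashboardClient) AsyncGetJobInfo(ctx context.Context, jobId string) {
        // SetIfAbsent returns false if the key already exists, so two
        // concurrent reconciles cannot both enqueue a task for this job.
        if !r.workerPool.channelContent.SetIfAbsent(jobId, struct{}{}) {
            return
        }
        r.workerPool.taskQueue <- func() {
            defer r.workerPool.channelContent.Remove(jobId)
            // ... query the dashboard and cache the result ...
        }
    }

With the marker removed only after the task finishes, the queue holds at most one entry per RayJob, bounding queue growth even with 1000 CRs sharing one slow cluster.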

Signed-off-by: You-Cheng Lin (Owen) <[email protected]>
    r.workerPool.channelContent.Set(jobId, struct{}{})
    r.workerPool.taskQueue <- func() {
        jobInfo, err := r.GetJobInfo(ctx, jobId)
        r.workerPool.channelContent.Remove(jobId)
Member left a comment

We can use defer r.workerPool.channelContent.Remove(jobId) here.
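
Applied to the snippet above, the enqueued closure would become (a sketch of the suggestion):

    r.workerPool.taskQueue <- func() {
        defer r.workerPool.channelContent.Remove(jobId) // runs on every return path
        jobInfo, err := r.GetJobInfo(ctx, jobId)
        if err != nil {
            return
        }
        if jobInfo != nil {
            r.jobInfoMap.Set(jobId, jobInfo)
        }
    }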

@Future-Outlier (Member) left a comment

An interesting edge case just came to mind.

If a RayJob's key is in the channel but the CR is deleted, what should happen?
We shouldn't store the information in the map in this scenario, but how?

@Future-Outlier (Member) left a comment

TODO: We should put the error in jobInfoMap.
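
A sketch of what that could look like, assuming a cache entry type along the lines of the utiltypes.RayJobCache referenced later in this PR:

    // RayJobCache carries either the fetched job info or the query error,
    // so the reconciler can observe failures instead of seeing no entry.
    type RayJobCache struct {
        JobInfo *RayJobInfo // nil when the dashboard query failed
        Err     error       // non-nil when the last query failed
    }

The worker would then Set an entry on both the success and the error path.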

Comment on lines 170 to 186
func (r *RayDashboardClient) AsyncGetJobInfo(ctx context.Context, jobId string) {
    if _, ok := r.workerPool.channelContent.Get(jobId); ok {
        return
    }
    r.workerPool.channelContent.Set(jobId, struct{}{})
    r.workerPool.taskQueue <- func() {
        jobInfo, err := r.GetJobInfo(ctx, jobId)
        r.workerPool.channelContent.Remove(jobId)
        if err != nil {
            fmt.Printf("AsyncGetJobInfo: error: %v\n", err)
            return
        }
        if jobInfo != nil {
            r.jobInfoMap.Set(jobId, jobInfo)
        }
    }
}
Member left a comment

There's actually an edge case. Let's assume:

  1. the RayJob finalizer deletes the jobID entry from jobInfoMap and workerPool.channelContent
  2. the background goroutine pool retrieves the jobID task from r.workerPool.taskQueue
  3. the worker queries job info using the jobID from step 2
  4. the worker stores the result from step 3 in jobInfoMap

In this case, we shouldn't store the result. However, this edge case is hard to handle, and each stored entry is only around 100 bytes; is it OK not to handle it? (Let's do the calculation: say we have 100,000 RayJob CRs, then the most stale cache we can produce is about 10 MB (100 bytes × 100,000).)

I think the solution to this edge case is to run another background goroutine that lists all RayJob CRs, checks whether jobInfoMap holds any keys without a backing CR, and deletes them (see the sketch after this comment).

cc @rueian @andrewsykim

We need advice from you two.
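
A sketch of that janitor goroutine (identifiers and imports are illustrative; it assumes a controller-runtime client, the kuberay rayv1 API types, and the generic orcaman concurrent-map that cmap.ConcurrentMap suggests):

    import (
        "context"
        "time"

        cmap "github.com/orcaman/concurrent-map/v2"
        rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
        "sigs.k8s.io/controller-runtime/pkg/client"
    )

    func runCacheJanitor(ctx context.Context, c client.Client, jobInfoMap *cmap.ConcurrentMap[string, *utiltypes.RayJobInfo]) {
        ticker := time.NewTicker(5 * time.Minute) // interval is arbitrary here
        defer ticker.Stop()
        for {
            select {
            case <-ctx.Done():
                return
            case <-ticker.C:
                var jobs rayv1.RayJobList
                if err := c.List(ctx, &jobs); err != nil {
                    continue // try again on the next tick
                }
                live := make(map[string]struct{}, len(jobs.Items))
                for _, j := range jobs.Items {
                    live[j.Status.JobId] = struct{}{}
                }
                for _, key := range jobInfoMap.Keys() {
                    if _, ok := live[key]; !ok {
                        jobInfoMap.Remove(key) // no backing CR: drop the stale entry
                    }
                }
            }
        }
    }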

Collaborator left a comment

I think that is not hard to avoid. We just need to put a placeholder into the map and only update the map if the placeholder exists.
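
A minimal sketch of the placeholder idea (names illustrative): the reconciler installs a sentinel before enqueueing, deletion removes it, and the worker publishes a result only if the sentinel is still present:

    var placeholder = &utiltypes.RayJobInfo{} // sentinel meaning "query in flight"

    // reconciler side, before enqueueing the async query:
    jobInfoMap.SetIfAbsent(jobId, placeholder)

    // worker side, after the query returns:
    if v, ok := jobInfoMap.Get(jobId); ok && v == placeholder {
        jobInfoMap.Set(jobId, jobInfo) // CR still live: publish the result
    }
    // if the key is gone, the CR was deleted mid-flight: drop the result

A small Get/Set window remains; closing it completely would need a per-shard atomic update or a lock, but the placeholder already prevents re-caching after deletion in the common path.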

@rueian (Collaborator) commented Oct 9, 2025

And we also need to clear the jobInfoMap before each job retry and deletion.


    Scheme     *runtime.Scheme
    Recorder   record.EventRecorder
    JobInfoMap *cmap.ConcurrentMap[string, *utiltypes.RayJobInfo]
Collaborator left a comment

Could we try not injecting this into the RayJobReconciler? I think it should be an implementation detail of the dashboard client and would be better hidden by it.

…outine-for-dsashboard-http

# Conflicts:
#	ray-operator/controllers/ray/rayjob_controller.go
Signed-off-by: You-Cheng Lin (Owen) <[email protected]>
Signed-off-by: You-Cheng Lin (Owen) <[email protected]>
Signed-off-by: You-Cheng Lin (Owen) <[email protected]>
Signed-off-by: You-Cheng Lin (Owen) <[email protected]>
)

type WorkerPool struct {
    channelContent cmap.ConcurrentMap[string, struct{}]
Collaborator left a comment

Move this to the dashboard client.

}

// Start launches worker goroutines to consume from queue
func (wp *WorkerPool) Start() {
Collaborator left a comment

Make this private.

    workers int
}

func NewWorkerPool(taskQueue chan func()) *WorkerPool {
Collaborator left a comment

Suggested change:
- func NewWorkerPool(taskQueue chan func()) *WorkerPool {
+ func NewWorkerPool(workers int) *WorkerPool {
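
A sketch of the suggested shape, where the pool owns its queue so the channel stays an implementation detail:

    func NewWorkerPool(workers int) *WorkerPool {
        return &WorkerPool{
            taskQueue: make(chan func(), 1024), // buffer size here is illustrative
            workers:   workers,
        }
    }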


type RayDashboardClientInterface interface {
    InitClient(client *http.Client, dashboardURL string)
    InitClient(client *http.Client, dashboardURL string, workerPool *WorkerPool, jobInfoMap *cmap.ConcurrentMap[string, *utiltypes.RayJobCache])
@rueian (Collaborator) commented Oct 10, 2025

Suggested change:
- InitClient(client *http.Client, dashboardURL string, workerPool *WorkerPool, jobInfoMap *cmap.ConcurrentMap[string, *utiltypes.RayJobCache])
+ InitClient(client *http.Client, dashboardURL string)

Hide the jobInfoMap and workerPool implementation details.

Member Author left a comment

Is it okay to keep it if the controller doesn't call InitClient directly?
Currently the controller creates a new DashboardClient on every reconciliation; if we moved the creation of the workerPool and the cmap into InitClient, we would recreate them on every reconciliation, which is not what we want.

The current GetRayDashboardClientFunc acts kind of like a factory.
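
A sketch of that factory shape (illustrative wiring, not the PR's exact code): the long-lived state is created once, and the returned closure hands out cheap per-reconcile clients that share it:

    import (
        "context"

        cmap "github.com/orcaman/concurrent-map/v2"
    )

    func GetRayDashboardClientFunc() func() RayDashboardClientInterface {
        pool := NewWorkerPool(4) // worker count is illustrative
        pool.start(context.Background())
        jobInfoMap := cmap.New[*utiltypes.RayJobInfo]()
        return func() RayDashboardClientInterface {
            // each reconciliation gets a fresh client over the shared state
            return &RayDashboardClient{workerPool: pool, jobInfoMap: &jobInfoMap}
        }
    }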

@Future-Outlier (Member) left a comment

stale cache -> LRU cache
