-
Notifications
You must be signed in to change notification settings - Fork 217
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
File open call in preload #1627
base: feature/xload
Are you sure you want to change the base?
Conversation
…e-fuse into feature/xload
…to feature/xload
…to sourav/xload
…to sourav/xload
…to sourav/xload
…to sourav/xload
func (threadPool *ThreadPool) Do() { | ||
func (threadPool *ThreadPool) Do(priority bool) { | ||
defer threadPool.waitGroup.Done() | ||
|
||
// This thread will work only on both high and low priority channel | ||
for item := range threadPool.workItems { | ||
dataLength, err := threadPool.callback(item) | ||
if err != nil { | ||
// TODO:: xload : add retry logic | ||
log.Err("ThreadPool::Do : Error in %s processing workitem %s : %v", item.CompName, item.Path, err) | ||
} | ||
if priority { | ||
// This thread will work only on high priority channel | ||
for { | ||
select { | ||
case item := <-threadPool.priorityItems: | ||
threadPool.process(item) | ||
|
||
// add this error in response channel | ||
if cap(item.ResponseChannel) > 0 { | ||
item.Err = err | ||
item.DataLen = (uint64)(dataLength) | ||
item.ResponseChannel <- item | ||
case <-threadPool.ctx.Done(): // listen to cancellation signal | ||
return | ||
} | ||
} | ||
} else { | ||
// This thread will work only on both high and low priority channel | ||
for { | ||
select { | ||
case item := <-threadPool.priorityItems: | ||
threadPool.process(item) | ||
|
||
case item := <-threadPool.workItems: | ||
threadPool.process(item) | ||
|
||
case <-threadPool.ctx.Done(): // listen to cancellation signal | ||
return | ||
} | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
optimised version
`
// Do is the core task to be executed by each worker thread
func (threadPool *ThreadPool) Do(priority bool) {
defer threadPool.waitGroup.Done()
for {
select {
case item := <-threadPool.priorityItems:
threadPool.process(item)
case item := <-threadPool.workItems:
if !priority { // Only process workItems if it's not a priority thread
threadPool.process(item)
}
case <-threadPool.ctx.Done(): // Listen to cancellation signal
return
}
}
}
`
Please check the correctness if I understood your point correctly.
If possible write down the unit test for this.
So this use case are covered for this. And we don't miss any scenario
fh, err := os.OpenFile(localPath, options.Flags, options.Mode) | ||
if err != nil { | ||
log.Err("Xload::OpenFile : error opening cached file %s [%s]", options.Name, err.Error()) | ||
return nil, err | ||
} | ||
|
||
// Increment the handle count in this lock item as there is one handle open for this now | ||
flock.Inc() | ||
|
||
handle := handlemap.NewHandle(options.Name) | ||
info, err := fh.Stat() | ||
if err == nil { | ||
handle.Size = info.Size() | ||
} | ||
|
||
handle.UnixFD = uint64(fh.Fd()) | ||
handle.Flags.Set(handlemap.HandleFlagCached) | ||
|
||
log.Info("Xload::OpenFile : file=%s, fd=%d", options.Name, fh.Fd()) | ||
handle.SetFileObject(fh) | ||
|
||
return handle, nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`
return openFile(localPath, options, flock)
}
func openFile(localPath string, options internal.OpenFileOptions, flock *common.LockMapItem) (*handlemap.Handle, error) {
fh, err := os.OpenFile(localPath, options.Flags, options.Mode)
if err != nil {
log.Err("Xload::OpenFile : error opening cached file %s [%s]", options.Name, err.Error())
return nil, err
}
// Increment the handle count in this lock item as there is one handle open for this now
flock.Inc()
handle := handlemap.NewHandle(options.Name)
info, err := fh.Stat()
if err == nil {
handle.Size = info.Size()
}
handle.UnixFD = uint64(fh.Fd())
handle.Flags.Set(handlemap.HandleFlagCached)
log.Info("Xload::OpenFile : file=%s, fd=%d", options.Name, fh.Fd())
handle.SetFileObject(fh)
return handle, nil
}
`
Extract separate method to segregate out validations and proper openFile implementation
✅ What
Preload operation downloads the blobs in parallel. If open call comes for a file and it has not been downloaded yet, then it will be scheduled for download on priority.
🤔 Why
The open call for a file downloads it on priority, if not yet scheduled for download. This is done so that the users will not have to wait till the scheduler picks it up for download.
NOTE
The UT will come in the next PR