add: continue partial operations in uring engine #168
VasilyStepanov wants to merge 2 commits into main from
Conversation
Summary of Changes: Hello @VasilyStepanov, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request introduces a significant architectural change to the uring engine.
Code Review
This pull request introduces an Event abstraction layer for io_uring operations, with the goal of supporting continued partial operations. While the overall structure is a good step, the core logic for handling partial I/O is incomplete and incorrect. The prep methods for all read/write events do not adjust for partially completed operations, and the wait function in Queue does not handle the resubmission of incomplete events. These issues are critical as they defeat the purpose of the change and will lead to incorrect I/O behavior. My review provides specific feedback and suggestions to correctly implement the partial operation handling.
librawstorio/src/uring_event.cpp
```cpp
void EventScalarRead::prep() {
    io_uring_sqe* sqe = _q.get_sqe();

    io_uring_prep_read(
        sqe, _t->fd(), static_cast<rawstor::io::TaskScalar*>(_t.get())->buf(),
        static_cast<rawstor::io::TaskScalar*>(_t.get())->size(), 0
    );

    io_uring_sqe_set_data(sqe, this);
}
```
This prep method doesn't account for partial reads. To support continuing partial operations, you need to adjust the buffer pointer and the size of the read based on the number of bytes already read, which is stored in _result. The current implementation will always re-read from the beginning of the buffer, potentially overwriting data or reading the same data again.
This issue applies to all other scalar prep methods for read and write operations in this file.
```cpp
void EventScalarRead::prep() {
    io_uring_sqe* sqe = _q.get_sqe();
    auto* task = static_cast<rawstor::io::TaskScalar*>(_t.get());
    io_uring_prep_read(
        sqe, task->fd(), static_cast<char*>(task->buf()) + _result,
        task->size() - _result, 0
    );
    io_uring_sqe_set_data(sqe, this);
}
```

```cpp
void EventVectorRead::prep() {
    io_uring_sqe* sqe = _q.get_sqe();

    io_uring_prep_readv(
        sqe, _t->fd(), static_cast<rawstor::io::TaskVector*>(_t.get())->iov(),
        static_cast<rawstor::io::TaskVector*>(_t.get())->niov(), 0
    );

    io_uring_sqe_set_data(sqe, this);
}
```
Similar to EventScalarRead, this prep method for vectorized reads does not account for partial operations. It will always resubmit the entire iovec array from the beginning.
To fix this, you need to adjust the iovec array to skip completed vectors and modify the first partially read vector to point to the remaining data. Since io_uring_prep_readv does not take an offset into the first vector, you may need to modify the iovec array. One approach is to advance a pointer through the iovec array and adjust the iov_base and iov_len of the first non-completed vector.
For example:

```cpp
auto* task = static_cast<rawstor::io::TaskVector*>(_t.get());
struct iovec* iov = task->iov();
unsigned int niov = task->niov();
size_t bytes_done = _result;

// Find the starting iovec and adjust it
while (bytes_done > 0 && niov > 0) {
    if (bytes_done < iov->iov_len) {
        iov->iov_base = static_cast<char*>(iov->iov_base) + bytes_done;
        iov->iov_len -= bytes_done;
        break;
    } else {
        bytes_done -= iov->iov_len;
        iov++;
        niov--;
    }
}

if (niov > 0) {
    io_uring_prep_readv(sqe, _t->fd(), iov, niov, 0);
}
```

This approach modifies the iovec array in the task, which might have side effects. Consider whether this is acceptable or whether a copy is needed. The same logic applies to all other vectorized and message-based prep methods.
```cpp
void EventScalarPositionalRead::prep() {
    io_uring_sqe* sqe = _q.get_sqe();

    io_uring_prep_read(
        sqe, _t->fd(),
        static_cast<rawstor::io::TaskScalarPositional*>(_t.get())->buf(),
        static_cast<rawstor::io::TaskScalarPositional*>(_t.get())->size(),
        static_cast<rawstor::io::TaskScalarPositional*>(_t.get())->offset()
    );

    io_uring_sqe_set_data(sqe, this);
}
```
Similar to non-positional reads, this prep method for positional reads doesn't account for partial operations. In addition to adjusting the buffer pointer and size, you must also adjust the file offset based on the bytes already transferred (_result).
```cpp
void EventScalarPositionalRead::prep() {
    io_uring_sqe* sqe = _q.get_sqe();
    auto* task = static_cast<rawstor::io::TaskScalarPositional*>(_t.get());
    io_uring_prep_read(
        sqe, task->fd(),
        static_cast<char*>(task->buf()) + _result,
        task->size() - _result,
        task->offset() + _result
    );
    io_uring_sqe_set_data(sqe, this);
}
```
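The offset arithmetic above is easy to get wrong in one of the three places, so it may help to see it in isolation. This is a hypothetical illustration (`Resubmit` and `next_submission` are not names from the PR): after a partial completion of `bytes_done` bytes, the buffer pointer, the remaining size, and the file offset all shift by the same amount.

```cpp
#include <cstddef>

// Hypothetical sketch, not the PR's code: parameters for the next
// submission of a positional read after a partial completion.
struct Resubmit {
    char*  buf;    // where the next read should store data
    size_t size;   // bytes still outstanding
    size_t offset; // file offset for the next read
};

Resubmit next_submission(char* buf, size_t size, size_t offset,
                         size_t bytes_done) {
    // Buffer and offset advance together; size shrinks by the same amount.
    return Resubmit{buf + bytes_done, size - bytes_done, offset + bytes_done};
}
```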
librawstorio/src/uring_queue.cpp
```cpp
std::unique_ptr<Event> event(
    static_cast<Event*>(io_uring_cqe_get_data(cqe))
);

size_t result = cqe->res >= 0 ? cqe->res : 0;
int error = cqe->res < 0 ? -cqe->res : 0;
event->set_result(cqe->res);

io_uring_cqe_seen(&_ring, cqe);

--_events;

(*t)(result, error);
event->dispatch();
```
The current implementation of wait assumes that every completed operation is final. It takes ownership of the Event object via std::unique_ptr and destroys it after dispatching. This loses the state (_result) required for continuing partial operations.
To correctly handle partial reads/writes, you should check if the operation is complete. If it is not, you should re-prepare the event for the next part of the operation and resubmit it, instead of dispatching the callback and destroying the event. This would involve:
- Adding a virtual `is_complete()` method to the `Event` class hierarchy to check against the total expected size from the `Task`.
- In this `wait` function, calling `is_complete()` after updating the result.
- If not complete, calling `event->prep()` and then `event.release()` to prevent the `unique_ptr` from destroying it. The event will be picked up on a subsequent `io_uring_submit`.
- If complete, decrementing `_events` and calling `event->dispatch()` as you do now.
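The decision logic in the list above can be sketched independently of io_uring. The following is a minimal, hypothetical model (`FakeEvent` and `on_cqe` are illustration names, not the PR's API) of how the completion check would drive the dispatch-vs-resubmit choice, assuming errors and EOF (`res <= 0`) terminate the operation immediately:

```cpp
#include <cstddef>

// Hypothetical model of the proposed resubmission logic, not the PR's code.
struct FakeEvent {
    size_t expected;   // total bytes the task asked for
    size_t result = 0; // bytes accumulated across partial completions

    // Mirror of the proposed virtual Event::is_complete().
    bool is_complete() const { return result >= expected; }

    // Called with cqe->res; returns true if the event should be dispatched
    // (and destroyed), false if it must be re-prepped and resubmitted.
    bool on_cqe(long res) {
        if (res <= 0) return true;           // error or EOF: finish now
        result += static_cast<size_t>(res);  // accumulate partial progress
        return is_complete();
    }
};
```

In the real `wait` function, a `false` return would correspond to calling `event->prep()` followed by `event.release()`, while `true` would correspond to the current `--_events; event->dispatch();` path.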
Force-pushed from cd3b36c to 8e3e021
Force-pushed from 8e3e021 to bea1473
No description provided.