diff --git a/Cargo.lock b/Cargo.lock index 531cb28db1..6dbcd3b5a1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -130,18 +130,6 @@ dependencies = [ "bitflags 2.6.0", ] -[[package]] -name = "async-channel" -version = "2.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89b47800b0be77592da0afd425cc03468052844aff33b84e33cc696f64e77b6a" -dependencies = [ - "concurrent-queue", - "event-listener-strategy", - "futures-core", - "pin-project-lite", -] - [[package]] name = "async-lock" version = "3.4.0" @@ -519,12 +507,6 @@ dependencies = [ "zerocopy-derive", ] -[[package]] -name = "futures-core" -version = "0.3.30" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" - [[package]] name = "generic_once_cell" version = "0.1.1" @@ -613,7 +595,6 @@ dependencies = [ "anstyle", "anyhow", "arm-gic", - "async-channel", "async-lock", "async-trait", "bit_field", diff --git a/Cargo.toml b/Cargo.toml index e23c08b637..547844923a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -76,7 +76,6 @@ virtio = { package = "virtio-spec", version = "0.1", features = ["alloc", "mmio" ahash = { version = "0.8", default-features = false } align-address = "0.3" anstyle = { version = "1", default-features = false } -async-channel = { version = "2.3", default-features = false } async-lock = { version = "3.4.0", default-features = false } async-trait = "0.1.82" bit_field = "0.10" diff --git a/src/drivers/fs/virtio_fs.rs b/src/drivers/fs/virtio_fs.rs index 9a10dc4f5f..2bf15ea21d 100644 --- a/src/drivers/fs/virtio_fs.rs +++ b/src/drivers/fs/virtio_fs.rs @@ -1,8 +1,11 @@ use alloc::boxed::Box; +use alloc::rc::Rc; use alloc::string::{String, ToString}; use alloc::vec::Vec; use core::str; +use core::task::Waker; +use hermit_sync::SpinMutex; use pci_types::InterruptLine; use virtio::fs::ConfigVolatileFieldAccess; use virtio::FeatureBits; @@ -10,6 +13,7 @@ use volatile::access::ReadOnly; use volatile::VolatileRef; use crate::config::VIRTIO_MAX_QUEUE_SIZE; +use crate::drivers::pci::get_filesystem_driver; use crate::drivers::virtio::error::VirtioFsError; #[cfg(not(feature = "pci"))] use crate::drivers::virtio::transport::mmio::{ComCfg, IsrStatus, NotifCfg}; @@ -18,9 +22,11 @@ use crate::drivers::virtio::transport::pci::{ComCfg, IsrStatus, NotifCfg}; use crate::drivers::virtio::virtqueue::error::VirtqError; use crate::drivers::virtio::virtqueue::split::SplitVq; use crate::drivers::virtio::virtqueue::{ - AvailBufferToken, BufferElem, BufferType, Virtq, VqIndex, VqSize, + AvailBufferToken, BufferElem, BufferType, Virtq, VirtqMutex, VqIndex, VqSize, }; +use crate::executor::block_on; use crate::fs::fuse::{self, FuseInterface, Rsp, RspHeader}; +use crate::io; use crate::mm::device_alloc::DeviceAlloc; /// A wrapper struct for the raw configuration structure. @@ -42,8 +48,9 @@ pub(crate) struct VirtioFsDriver { pub(super) com_cfg: ComCfg, pub(super) isr_stat: IsrStatus, pub(super) notif_cfg: NotifCfg, - pub(super) vqueues: Vec>, + pub(super) vqueues: Vec, pub(super) irq: InterruptLine, + pub(super) waker: Option, } // Backend-independent interface for Virtio network driver @@ -136,9 +143,10 @@ impl VirtioFsDriver { VqSize::from(VIRTIO_MAX_QUEUE_SIZE), VqIndex::from(i), self.dev_cfg.features.into(), + Box::new(|waker| get_filesystem_driver().unwrap().lock().waker = Some(waker)), ) .unwrap(); - self.vqueues.push(Box::new(vq)); + self.vqueues.push(Rc::new(SpinMutex::new(Box::new(vq)))); } // At this point the device is "live" @@ -146,6 +154,28 @@ impl VirtioFsDriver { Ok(()) } + + pub(crate) fn handle_interrupt(&mut self) { + let status = self.isr_stat.is_queue_interrupt(); + + #[cfg(not(feature = "pci"))] + if status.contains(virtio::mmio::InterruptStatus::CONFIGURATION_CHANGE_NOTIFICATION) { + info!("Configuration changes are not possible! Aborting"); + todo!("Implement possibility to change config on the fly...") + } + + #[cfg(feature = "pci")] + if status.contains(virtio::pci::IsrStatus::DEVICE_CONFIGURATION_INTERRUPT) { + info!("Configuration changes are not possible! Aborting"); + todo!("Implement possibility to change config on the fly...") + } + + if let Some(waker) = &self.waker { + waker.wake_by_ref(); + } + + self.isr_stat.acknowledge(); + } } impl FuseInterface for VirtioFsDriver { @@ -179,8 +209,17 @@ impl FuseInterface for VirtioFsDriver { }; let buffer_tkn = AvailBufferToken::new(send, recv).unwrap(); + let vq = &mut self.vqueues[1]; + let vq_clone = vq.clone(); + let recv_future = { + let mut vq_guard = vq.lock(); + vq_guard.dispatch(buffer_tkn, false, BufferType::Direct)?; + dbg!(); + vq_guard.recv(vq_clone) + }; let mut transfer_result = - self.vqueues[1].dispatch_blocking(buffer_tkn, BufferType::Direct)?; + block_on(async { recv_future.await.or(Err(io::Error::EIO)) }, None) + .or(Err(VirtqError::General))?; let headers = transfer_result.used_recv_buff.pop_front_downcast().unwrap(); let payload = transfer_result.used_recv_buff.pop_front_vec(); diff --git a/src/drivers/fs/virtio_pci.rs b/src/drivers/fs/virtio_pci.rs index af2be50f7e..5d17fed49a 100644 --- a/src/drivers/fs/virtio_pci.rs +++ b/src/drivers/fs/virtio_pci.rs @@ -78,6 +78,7 @@ impl VirtioFsDriver { notif_cfg, vqueues: Vec::new(), irq: device.get_irq().unwrap(), + waker: None, }) } diff --git a/src/drivers/net/virtio/mod.rs b/src/drivers/net/virtio/mod.rs index 4c39d12b8b..fb8f231b15 100644 --- a/src/drivers/net/virtio/mod.rs +++ b/src/drivers/net/virtio/mod.rs @@ -56,15 +56,11 @@ impl CtrlQueue { pub struct RxQueues { vqs: Vec>, - poll_sender: async_channel::Sender, - poll_receiver: async_channel::Receiver, packet_size: u32, } impl RxQueues { pub fn new(vqs: Vec>, dev_cfg: &NetDevCfg) -> Self { - let (poll_sender, poll_receiver) = async_channel::unbounded(); - // See Virtio specification v1.1 - 5.1.6.3.1 // let packet_size = if dev_cfg.features.contains(virtio::net::F::MRG_RXBUF) { @@ -73,12 +69,7 @@ impl RxQueues { dev_cfg.raw.as_ptr().mtu().read().to_ne().into() }; - Self { - vqs, - poll_sender, - poll_receiver, - packet_size, - } + Self { vqs, packet_size } } /// Takes care of handling packets correctly which need some processing after being received. @@ -95,32 +86,12 @@ impl RxQueues { fn add(&mut self, mut vq: Box) { const BUFF_PER_PACKET: u16 = 2; let num_packets: u16 = u16::from(vq.size()) / BUFF_PER_PACKET; - fill_queue( - vq.as_mut(), - num_packets, - self.packet_size, - self.poll_sender.clone(), - ); + fill_queue(vq.as_mut(), num_packets, self.packet_size); self.vqs.push(vq); } fn get_next(&mut self) -> Option { - let transfer = self.poll_receiver.try_recv(); - - transfer - .or_else(|_| { - // Check if any not yet provided transfers are in the queue. - self.poll(); - - self.poll_receiver.try_recv() - }) - .ok() - } - - fn poll(&mut self) { - for vq in &mut self.vqs { - vq.poll(); - } + self.vqs[0].try_recv().ok() } fn enable_notifs(&mut self) { @@ -140,12 +111,7 @@ impl RxQueues { } } -fn fill_queue( - vq: &mut dyn Virtq, - num_packets: u16, - packet_size: u32, - poll_sender: async_channel::Sender, -) { +fn fill_queue(vq: &mut dyn Virtq, num_packets: u16, packet_size: u32) { for _ in 0..num_packets { let buff_tkn = match AvailBufferToken::new( vec![], @@ -167,12 +133,7 @@ fn fill_queue( // BufferTokens are directly provided to the queue // TransferTokens are directly dispatched // Transfers will be awaited at the queue - match vq.dispatch( - buff_tkn, - Some(poll_sender.clone()), - false, - BufferType::Direct, - ) { + match vq.dispatch(buff_tkn, false, BufferType::Direct) { Ok(_) => (), Err(err) => { error!("{:#?}", err); @@ -220,7 +181,9 @@ impl TxQueues { fn poll(&mut self) { for vq in &mut self.vqs { - vq.poll(); + // We don't do anything with the buffers but we need to receive them for the + // ring slots to be emptied and the memory from the previous transfers to be freed. + while vq.try_recv().is_ok() {} } } @@ -339,7 +302,7 @@ impl NetworkDriver for VirtioNetDriver { .unwrap(); self.send_vqs.vqs[0] - .dispatch(buff_tkn, None, false, BufferType::Direct) + .dispatch(buff_tkn, false, BufferType::Direct) .unwrap(); result @@ -373,7 +336,6 @@ impl NetworkDriver for VirtioNetDriver { self.recv_vqs.vqs[0].as_mut(), num_buffers, self.recv_vqs.packet_size, - self.recv_vqs.poll_sender.clone(), ); let vec_data = packets.into_iter().flatten().collect(); @@ -679,6 +641,7 @@ impl VirtioNetDriver { VqSize::from(VIRTIO_MAX_QUEUE_SIZE), VqIndex::from(self.num_vqs), self.dev_cfg.features.into(), + Box::new(|_waker| {}), ) .unwrap(), ))); @@ -690,6 +653,7 @@ impl VirtioNetDriver { VqSize::from(VIRTIO_MAX_QUEUE_SIZE), VqIndex::from(self.num_vqs), self.dev_cfg.features.into(), + Box::new(|_waker| {}), ) .unwrap(), ))); @@ -754,6 +718,7 @@ impl VirtioNetDriver { VqSize::from(VIRTIO_MAX_QUEUE_SIZE), VqIndex::from(2 * i), self.dev_cfg.features.into(), + Box::new(|_waker| {}), ) .unwrap(); // Interrupt for receiving packets is wanted @@ -767,6 +732,7 @@ impl VirtioNetDriver { VqSize::from(VIRTIO_MAX_QUEUE_SIZE), VqIndex::from(2 * i + 1), self.dev_cfg.features.into(), + Box::new(|_waker| {}), ) .unwrap(); // Interrupt for comunicating that a sended packet left, is not needed @@ -780,6 +746,7 @@ impl VirtioNetDriver { VqSize::from(VIRTIO_MAX_QUEUE_SIZE), VqIndex::from(2 * i), self.dev_cfg.features.into(), + Box::new(|_waker| {}), ) .unwrap(); // Interrupt for receiving packets is wanted @@ -793,6 +760,7 @@ impl VirtioNetDriver { VqSize::from(VIRTIO_MAX_QUEUE_SIZE), VqIndex::from(2 * i + 1), self.dev_cfg.features.into(), + Box::new(|_waker| {}), ) .unwrap(); // Interrupt for comunicating that a sended packet left, is not needed diff --git a/src/drivers/virtio/transport/mod.rs b/src/drivers/virtio/transport/mod.rs index 278481e8d1..0b5c85f32f 100644 --- a/src/drivers/virtio/transport/mod.rs +++ b/src/drivers/virtio/transport/mod.rs @@ -15,7 +15,7 @@ pub mod pci; ))] use crate::arch::kernel::mmio as hardware; #[cfg(all( - any(feature = "vsock", feature = "tcp", feature = "udp"), + any(feature = "vsock", feature = "tcp", feature = "udp", feature = "fuse"), feature = "pci" ))] use crate::drivers::pci as hardware; diff --git a/src/drivers/virtio/transport/pci.rs b/src/drivers/virtio/transport/pci.rs index c7698bb9e5..d5ce1b3e7a 100644 --- a/src/drivers/virtio/transport/pci.rs +++ b/src/drivers/virtio/transport/pci.rs @@ -18,7 +18,7 @@ use volatile::{VolatilePtr, VolatileRef}; #[cfg(all( not(feature = "rtl8139"), - any(feature = "tcp", feature = "udp", feature = "vsock") + any(feature = "tcp", feature = "udp", feature = "vsock", feature = "fuse") ))] use crate::arch::kernel::interrupts::*; use crate::arch::memory_barrier; @@ -34,7 +34,7 @@ use crate::drivers::pci::PciDevice; use crate::drivers::virtio::error::VirtioError; #[cfg(all( not(feature = "rtl8139"), - any(feature = "tcp", feature = "udp", feature = "vsock") + any(feature = "tcp", feature = "udp", feature = "vsock", feature = "fuse") ))] use crate::drivers::virtio::transport::hardware; #[cfg(feature = "vsock")] @@ -993,7 +993,22 @@ pub(crate) fn init_device( Ok(drv) } #[cfg(feature = "fuse")] - VirtioDriver::FileSystem(_) => Ok(drv), + VirtioDriver::FileSystem(_) => { + let irq = device.get_irq().unwrap(); + + info!("Install virtio interrupt handler at line {}", irq); + + fn file_system_handler() { + if let Some(driver) = hardware::get_filesystem_driver() { + driver.lock().handle_interrupt(); + } + } + + irq_install_handler(irq, file_system_handler); + add_irq_name(irq, "virtio file system"); + + Ok(drv) + } }, Err(virt_err) => Err(virt_err), } diff --git a/src/drivers/virtio/virtqueue/mod.rs b/src/drivers/virtio/virtqueue/mod.rs index 2ae238091d..d5bff4b604 100644 --- a/src/drivers/virtio/virtqueue/mod.rs +++ b/src/drivers/virtio/virtqueue/mod.rs @@ -14,12 +14,15 @@ pub mod split; use alloc::boxed::Box; use alloc::collections::vec_deque::VecDeque; +use alloc::rc::Rc; use alloc::vec::Vec; use core::any::Any; +use core::future::Future; use core::mem::MaybeUninit; +use core::task::{Poll, Waker}; use core::{mem, ptr}; -use async_channel::TryRecvError; +use hermit_sync::SpinMutex; use virtio::{le32, le64, pvirtq, virtq}; use self::error::VirtqError; @@ -88,10 +91,10 @@ impl From for u16 { } } -type UsedBufferTokenSender = async_channel::Sender; - // Public interface of Virtq +pub(crate) type VirtqMutex = Rc>>; + /// The Virtq trait unifies access to the two different Virtqueue types /// [packed::PackedVq] and [split::SplitVq]. /// @@ -106,7 +109,6 @@ pub trait Virtq { fn dispatch( &mut self, tkn: AvailBufferToken, - sender: Option, notif: bool, buffer_type: BufferType, ) -> Result<(), VirtqError>; @@ -125,21 +127,20 @@ pub trait Virtq { tkn: AvailBufferToken, buffer_type: BufferType, ) -> Result { - let (sender, receiver) = async_channel::bounded(1); - self.dispatch(tkn, Some(sender), false, buffer_type)?; + self.dispatch(tkn, false, buffer_type)?; self.disable_notifs(); let result: UsedBufferToken; // Keep Spinning until the receive queue is filled loop { - match receiver.try_recv() { - Ok(buffer_tkn) => { - result = buffer_tkn; - break; - } - Err(TryRecvError::Closed) => return Err(VirtqError::General), - Err(TryRecvError::Empty) => self.poll(), + // TODO: normally, we should check if the used buffer in question is the one + // we just made available. However, this shouldn't be a problem as the queue this + // function is called on makes use of this blocking dispatch function exclusively + // and thus dispatches cannot be interleaved. + if let Ok(buffer_tkn) = self.try_recv() { + result = buffer_tkn; + break; } } @@ -156,10 +157,7 @@ pub trait Virtq { /// Checks if new used descriptors have been written by the device. /// This activates the queue and polls the descriptor ring of the queue. - /// - /// * `TransferTokens` which hold an `await_queue` will be placed into - /// these queues. - fn poll(&mut self); + fn try_recv(&mut self) -> Result; /// Dispatches a batch of [AvailBufferToken]s. The buffers are provided to the queue in /// sequence. After the last buffer has been written, the queue marks the first buffer as available and triggers @@ -189,7 +187,6 @@ pub trait Virtq { fn dispatch_batch_await( &mut self, tkns: Vec<(AvailBufferToken, BufferType)>, - await_queue: UsedBufferTokenSender, notif: bool, ) -> Result<(), VirtqError>; @@ -204,6 +201,7 @@ pub trait Virtq { size: VqSize, index: VqIndex, features: virtio::F, + waker_registrar: Box, ) -> Result where Self: Sized; @@ -216,6 +214,34 @@ pub trait Virtq { fn index(&self) -> VqIndex; fn has_used_buffers(&self) -> bool; + + fn recv(&mut self, mutex_self: VirtqMutex) -> Recv { + Recv { vq: mutex_self } + } + + fn register_waker(&self, waker: Waker); +} + +pub(crate) struct Recv { + vq: Rc>>, +} + +impl Future for Recv { + type Output = Result; + + fn poll( + self: core::pin::Pin<&mut Self>, + cx: &mut core::task::Context<'_>, + ) -> Poll { + let mut vq = self.vq.lock(); + let try_result = vq.try_recv(); + if let Err(VirtqError::NoNewUsed) = try_result { + vq.register_waker(cx.waker().clone()); + Poll::Pending + } else { + Poll::Ready(try_result) + } + } } /// These methods are an implementation detail and are meant only for consumption by the default method @@ -242,7 +268,6 @@ trait VirtqPrivate { /// After this call, the buffers are no longer writable. fn transfer_token_from_buffer_token( buff_tkn: AvailBufferToken, - await_queue: Option, buffer_type: BufferType, ) -> TransferToken { let ctrl_desc = match buffer_type { @@ -252,7 +277,6 @@ trait VirtqPrivate { TransferToken { buff_tkn, - await_queue, ctrl_desc, } } @@ -334,11 +358,6 @@ pub struct TransferToken { /// Must be some in order to prevent drop /// upon reuse. buff_tkn: AvailBufferToken, - /// Structure which allows to await Transfers - /// If Some, finished TransferTokens will be placed here - /// as finished `Transfers`. If None, only the state - /// of the Token will be changed. - await_queue: Option, // Contains the [MemDescr] for the indirect table if the transfer is indirect. ctrl_desc: Option>, } @@ -616,6 +635,7 @@ pub mod error { FeatureNotSupported(virtio::F), AllocationError, IncompleteWrite, + NoNewUsed, } impl core::fmt::Debug for VirtqError { @@ -645,6 +665,9 @@ pub mod error { VirtqError::IncompleteWrite => { write!(f, "A sized object was partially initialized.") } + VirtqError::NoNewUsed => { + write!(f, "The queue does not contain any new used buffers.") + } } } } diff --git a/src/drivers/virtio/virtqueue/packed.rs b/src/drivers/virtio/virtqueue/packed.rs index b1ea248eee..6de18dabd8 100644 --- a/src/drivers/virtio/virtqueue/packed.rs +++ b/src/drivers/virtio/virtqueue/packed.rs @@ -6,6 +6,7 @@ use alloc::boxed::Box; use alloc::vec::Vec; use core::cell::Cell; use core::sync::atomic::{fence, Ordering}; +use core::task::Waker; use core::{ops, ptr}; use align_address::Align; @@ -23,8 +24,8 @@ use super::super::transport::mmio::{ComCfg, NotifCfg, NotifCtrl}; use super::super::transport::pci::{ComCfg, NotifCfg, NotifCtrl}; use super::error::VirtqError; use super::{ - AvailBufferToken, BufferType, MemDescrId, MemPool, TransferToken, UsedBufferToken, - UsedBufferTokenSender, Virtq, VirtqPrivate, VqIndex, VqSize, + AvailBufferToken, BufferType, MemDescrId, MemPool, TransferToken, UsedBufferToken, Virtq, + VirtqPrivate, VqIndex, VqSize, }; use crate::arch::mm::paging::{BasePageSize, PageSize}; use crate::arch::mm::{paging, VirtAddr}; @@ -128,21 +129,14 @@ impl DescriptorRing { } /// Polls poll index and sets the state of any finished TransferTokens. - /// If [TransferToken::await_queue] is available, the [UsedBufferToken] will be moved to the queue. - fn poll(&mut self) { + fn try_recv(&mut self) -> Result { let mut ctrl = self.get_read_ctrler(); - if let Some((mut tkn, written_len)) = ctrl.poll_next() { - if let Some(queue) = tkn.await_queue.take() { - // Place the TransferToken in a Transfer, which will hold ownership of the token - queue - .try_send(UsedBufferToken::from_avail_buffer_token( - tkn.buff_tkn, - written_len, - )) - .unwrap(); - } - } + ctrl.poll_next() + .map(|(tkn, written_len)| { + UsedBufferToken::from_avail_buffer_token(tkn.buff_tkn, written_len) + }) + .ok_or(VirtqError::NoNewUsed) } fn push_batch( @@ -526,6 +520,7 @@ pub struct PackedVq { /// device and is unique on a per device basis. index: VqIndex, last_next: Cell, + waker_registrar: Box, } // Public interface of PackedVq @@ -539,8 +534,8 @@ impl Virtq for PackedVq { self.drv_event.disable_notif(); } - fn poll(&mut self) { - self.descr_ring.poll(); + fn try_recv(&mut self) -> Result { + self.descr_ring.try_recv() } fn dispatch_batch( @@ -552,7 +547,7 @@ impl Virtq for PackedVq { assert!(!buffer_tkns.is_empty()); let transfer_tkns = buffer_tkns.into_iter().map(|(buffer_tkn, buffer_type)| { - Self::transfer_token_from_buffer_token(buffer_tkn, None, buffer_type) + Self::transfer_token_from_buffer_token(buffer_tkn, buffer_type) }); let next_idx = self.descr_ring.push_batch(transfer_tkns)?; @@ -581,18 +576,13 @@ impl Virtq for PackedVq { fn dispatch_batch_await( &mut self, buffer_tkns: Vec<(AvailBufferToken, BufferType)>, - await_queue: super::UsedBufferTokenSender, notif: bool, ) -> Result<(), VirtqError> { // Zero transfers are not allowed assert!(!buffer_tkns.is_empty()); let transfer_tkns = buffer_tkns.into_iter().map(|(buffer_tkn, buffer_type)| { - Self::transfer_token_from_buffer_token( - buffer_tkn, - Some(await_queue.clone()), - buffer_type, - ) + Self::transfer_token_from_buffer_token(buffer_tkn, buffer_type) }); let next_idx = self.descr_ring.push_batch(transfer_tkns)?; @@ -621,11 +611,10 @@ impl Virtq for PackedVq { fn dispatch( &mut self, buffer_tkn: AvailBufferToken, - sender: Option, notif: bool, buffer_type: BufferType, ) -> Result<(), VirtqError> { - let transfer_tkn = Self::transfer_token_from_buffer_token(buffer_tkn, sender, buffer_type); + let transfer_tkn = Self::transfer_token_from_buffer_token(buffer_tkn, buffer_type); let next_idx = self.descr_ring.push(transfer_tkn)?; if notif { @@ -655,6 +644,7 @@ impl Virtq for PackedVq { size: VqSize, index: VqIndex, features: virtio::F, + waker_registrar: Box, ) -> Result { // Currently we do not have support for in order use. // This steems from the fact, that the packedVq ReadCtrl currently is not @@ -738,6 +728,7 @@ impl Virtq for PackedVq { size: VqSize::from(vq_size), index, last_next: Default::default(), + waker_registrar, }) } @@ -749,6 +740,10 @@ impl Virtq for PackedVq { let desc = &self.descr_ring.ring[usize::from(self.descr_ring.poll_index)]; self.descr_ring.is_marked_used(desc.flags) } + + fn register_waker(&self, waker: core::task::Waker) { + (self.waker_registrar)(waker); + } } impl VirtqPrivate for PackedVq { diff --git a/src/drivers/virtio/virtqueue/split.rs b/src/drivers/virtio/virtqueue/split.rs index fb1d9c5908..dad953c644 100644 --- a/src/drivers/virtio/virtqueue/split.rs +++ b/src/drivers/virtio/virtqueue/split.rs @@ -6,6 +6,7 @@ use alloc::vec::Vec; use core::cell::UnsafeCell; use core::mem::{self, MaybeUninit}; use core::ptr; +use core::task::Waker; #[cfg(not(feature = "pci"))] use virtio::mmio::NotificationData; @@ -19,8 +20,8 @@ use super::super::transport::mmio::{ComCfg, NotifCfg, NotifCtrl}; use super::super::transport::pci::{ComCfg, NotifCfg, NotifCtrl}; use super::error::VirtqError; use super::{ - AvailBufferToken, BufferType, MemPool, TransferToken, UsedBufferToken, UsedBufferTokenSender, - Virtq, VirtqPrivate, VqIndex, VqSize, + AvailBufferToken, BufferType, MemPool, TransferToken, UsedBufferToken, Virtq, VirtqPrivate, + VqIndex, VqSize, }; use crate::arch::memory_barrier; use crate::arch::mm::{paging, VirtAddr}; @@ -98,51 +99,38 @@ impl DescrRing { Ok(next_idx) } - fn poll(&mut self) { - // We cannot use a simple while loop here because Rust cannot tell that [Self::used_ring_ref], - // [Self::read_idx] and [Self::token_ring] access separate fields of `self`. For this reason we - // need to move [Self::used_ring_ref] lines into a separate scope. - loop { - let used_elem; - { - if self.read_idx == self.used_ring().idx.to_ne() { - break; - } else { - let cur_ring_index = self.read_idx as usize % self.token_ring.len(); - used_elem = self.used_ring().ring()[cur_ring_index]; - } - } + fn try_recv(&mut self) -> Result { + if self.read_idx == self.used_ring().idx.to_ne() { + return Err(VirtqError::NoNewUsed); + } + let cur_ring_index = self.read_idx as usize % self.token_ring.len(); + let used_elem = self.used_ring().ring()[cur_ring_index]; - let mut tkn = self.token_ring[used_elem.id.to_ne() as usize] - .take() - .expect( - "The buff_id is incorrect or the reference to the TransferToken was misplaced.", - ); - - if let Some(queue) = tkn.await_queue.take() { - queue - .try_send(UsedBufferToken::from_avail_buffer_token( - tkn.buff_tkn, - used_elem.len.to_ne(), - )) - .unwrap() - } + let tkn = self.token_ring[used_elem.id.to_ne() as usize] + .take() + .expect( + "The buff_id is incorrect or the reference to the TransferToken was misplaced.", + ); - let mut id_ret_idx = u16::try_from(used_elem.id.to_ne()).unwrap(); - loop { - self.mem_pool.ret_id(super::MemDescrId(id_ret_idx)); - let cur_chain_elem = - unsafe { self.descr_table_mut()[usize::from(id_ret_idx)].assume_init() }; - if cur_chain_elem.flags.contains(virtq::DescF::NEXT) { - id_ret_idx = cur_chain_elem.next.to_ne(); - } else { - break; - } + // We return the indices of the now freed ring slots back to `mem_pool.` + let mut id_ret_idx = u16::try_from(used_elem.id.to_ne()).unwrap(); + loop { + self.mem_pool.ret_id(super::MemDescrId(id_ret_idx)); + let cur_chain_elem = + unsafe { self.descr_table_mut()[usize::from(id_ret_idx)].assume_init() }; + if cur_chain_elem.flags.contains(virtq::DescF::NEXT) { + id_ret_idx = cur_chain_elem.next.to_ne(); + } else { + break; } - - memory_barrier(); - self.read_idx = self.read_idx.wrapping_add(1); } + + memory_barrier(); + self.read_idx = self.read_idx.wrapping_add(1); + Ok(UsedBufferToken::from_avail_buffer_token( + tkn.buff_tkn, + used_elem.len.to_ne(), + )) } fn drv_enable_notif(&mut self) { @@ -169,6 +157,7 @@ pub struct SplitVq { index: VqIndex, notif_ctrl: NotifCtrl, + waker_registrar: Box, } impl Virtq for SplitVq { @@ -180,8 +169,8 @@ impl Virtq for SplitVq { self.ring.drv_disable_notif(); } - fn poll(&mut self) { - self.ring.poll() + fn try_recv(&mut self) -> Result { + self.ring.try_recv() } fn dispatch_batch( @@ -195,7 +184,6 @@ impl Virtq for SplitVq { fn dispatch_batch_await( &mut self, _tkns: Vec<(AvailBufferToken, BufferType)>, - _await_queue: super::UsedBufferTokenSender, _notif: bool, ) -> Result<(), VirtqError> { unimplemented!() @@ -204,11 +192,10 @@ impl Virtq for SplitVq { fn dispatch( &mut self, buffer_tkn: AvailBufferToken, - sender: Option, notif: bool, buffer_type: BufferType, ) -> Result<(), VirtqError> { - let transfer_tkn = Self::transfer_token_from_buffer_token(buffer_tkn, sender, buffer_type); + let transfer_tkn = Self::transfer_token_from_buffer_token(buffer_tkn, buffer_type); let next_idx = self.ring.push(transfer_tkn)?; if notif { @@ -236,6 +223,7 @@ impl Virtq for SplitVq { size: VqSize, index: VqIndex, features: virtio::F, + waker_registrar: Box, ) -> Result { // Get a handler to the queues configuration area. let mut vq_handler = match com_cfg.select_vq(index.into()) { @@ -316,6 +304,7 @@ impl Virtq for SplitVq { notif_ctrl, size: VqSize(size), index, + waker_registrar, }) } @@ -326,6 +315,10 @@ impl Virtq for SplitVq { fn has_used_buffers(&self) -> bool { self.ring.read_idx != self.ring.used_ring().idx.to_ne() } + + fn register_waker(&self, waker: Waker) { + (self.waker_registrar)(waker) + } } impl VirtqPrivate for SplitVq { diff --git a/src/drivers/vsock/mod.rs b/src/drivers/vsock/mod.rs index 7ab72e3008..07108e4233 100644 --- a/src/drivers/vsock/mod.rs +++ b/src/drivers/vsock/mod.rs @@ -24,12 +24,7 @@ use crate::drivers::virtio::virtqueue::{ use crate::drivers::vsock::pci::VsockDevCfgRaw; use crate::mm::device_alloc::DeviceAlloc; -fn fill_queue( - vq: &mut dyn Virtq, - num_packets: u16, - packet_size: u32, - poll_sender: async_channel::Sender, -) { +fn fill_queue(vq: &mut dyn Virtq, num_packets: u16, packet_size: u32) { for _ in 0..num_packets { let buff_tkn = match AvailBufferToken::new( vec![], @@ -51,12 +46,7 @@ fn fill_queue( // BufferTokens are directly provided to the queue // TransferTokens are directly dispatched // Transfers will be awaited at the queue - match vq.dispatch( - buff_tkn, - Some(poll_sender.clone()), - false, - BufferType::Direct, - ) { + match vq.dispatch(buff_tkn, false, BufferType::Direct) { Ok(_) => (), Err(err) => { error!("{:#?}", err); @@ -68,19 +58,14 @@ fn fill_queue( pub(crate) struct RxQueue { vq: Option>, - poll_sender: async_channel::Sender, - poll_receiver: async_channel::Receiver, packet_size: u32, } impl RxQueue { pub fn new() -> Self { - let (poll_sender, poll_receiver) = async_channel::unbounded(); - Self { vq: None, - poll_sender, - poll_receiver, + packet_size: crate::VSOCK_PACKET_SIZE, } } @@ -89,12 +74,7 @@ impl RxQueue { const BUFF_PER_PACKET: u16 = 2; let num_packets: u16 = u16::from(vq.size()) / BUFF_PER_PACKET; info!("num_packets {}", num_packets); - fill_queue( - vq.as_mut(), - num_packets, - self.packet_size, - self.poll_sender.clone(), - ); + fill_queue(vq.as_mut(), num_packets, self.packet_size); self.vq = Some(vq); } @@ -112,22 +92,7 @@ impl RxQueue { } fn get_next(&mut self) -> Option { - let transfer = self.poll_receiver.try_recv(); - - transfer - .or_else(|_| { - // Check if any not yet provided transfers are in the queue. - self.poll(); - - self.poll_receiver.try_recv() - }) - .ok() - } - - fn poll(&mut self) { - if let Some(ref mut vq) = self.vq { - vq.poll(); - } + self.vq.as_mut().unwrap().try_recv().ok() } pub fn process_packet(&mut self, mut f: F) @@ -144,7 +109,7 @@ impl RxQueue { if let Some(ref mut vq) = self.vq { f(&header, &packet[..]); - fill_queue(vq.as_mut(), 1, self.packet_size, self.poll_sender.clone()); + fill_queue(vq.as_mut(), 1, self.packet_size); } else { panic!("Invalid length of receive queue"); } @@ -185,7 +150,7 @@ impl TxQueue { fn poll(&mut self) { if let Some(ref mut vq) = self.vq { - vq.poll(); + while vq.try_recv().is_ok() {} } } @@ -198,9 +163,8 @@ impl TxQueue { { // We need to poll to get the queue to remove elements from the table and make space for // what we are about to add + self.poll(); if let Some(ref mut vq) = self.vq { - vq.poll(); - assert!(len < usize::try_from(self.packet_length).unwrap()); let mut packet = Vec::with_capacity_in(len, DeviceAlloc); let result = unsafe { @@ -213,8 +177,7 @@ impl TxQueue { let buff_tkn = AvailBufferToken::new(vec![BufferElem::Vector(packet)], vec![]).unwrap(); - vq.dispatch(buff_tkn, None, false, BufferType::Direct) - .unwrap(); + vq.dispatch(buff_tkn, false, BufferType::Direct).unwrap(); result } else { @@ -225,19 +188,13 @@ impl TxQueue { pub(crate) struct EventQueue { vq: Option>, - poll_sender: async_channel::Sender, - poll_receiver: async_channel::Receiver, packet_size: u32, } impl EventQueue { pub fn new() -> Self { - let (poll_sender, poll_receiver) = async_channel::unbounded(); - Self { vq: None, - poll_sender, - poll_receiver, packet_size: 128u32, } } @@ -248,12 +205,7 @@ impl EventQueue { fn add(&mut self, mut vq: Box) { const BUFF_PER_PACKET: u16 = 2; let num_packets: u16 = u16::from(vq.size()) / BUFF_PER_PACKET; - fill_queue( - vq.as_mut(), - num_packets, - self.packet_size, - self.poll_sender.clone(), - ); + fill_queue(vq.as_mut(), num_packets, self.packet_size); self.vq = Some(vq); } @@ -405,6 +357,7 @@ impl VirtioVsockDriver { VqSize::from(VIRTIO_MAX_QUEUE_SIZE), VqIndex::from(0u16), self.dev_cfg.features.into(), + Box::new(|_waker| {}), ) .unwrap(), )); @@ -418,6 +371,7 @@ impl VirtioVsockDriver { VqSize::from(VIRTIO_MAX_QUEUE_SIZE), VqIndex::from(1u16), self.dev_cfg.features.into(), + Box::new(|_waker| {}), ) .unwrap(), )); @@ -432,6 +386,7 @@ impl VirtioVsockDriver { VqSize::from(VIRTIO_MAX_QUEUE_SIZE), VqIndex::from(2u16), self.dev_cfg.features.into(), + Box::new(|_waker| {}), ) .unwrap(), ));