Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

virtq: async support #1399

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 0 additions & 19 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ virtio = { package = "virtio-spec", version = "0.1", features = ["alloc", "mmio"
ahash = { version = "0.8", default-features = false }
align-address = "0.3"
anstyle = { version = "1", default-features = false }
async-channel = { version = "2.3", default-features = false }
async-lock = { version = "3.4.0", default-features = false }
async-trait = "0.1.82"
bit_field = "0.10"
Expand Down
47 changes: 43 additions & 4 deletions src/drivers/fs/virtio_fs.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
use alloc::boxed::Box;
use alloc::rc::Rc;
use alloc::string::{String, ToString};
use alloc::vec::Vec;
use core::str;
use core::task::Waker;

use hermit_sync::SpinMutex;
use pci_types::InterruptLine;
use virtio::fs::ConfigVolatileFieldAccess;
use virtio::FeatureBits;
use volatile::access::ReadOnly;
use volatile::VolatileRef;

use crate::config::VIRTIO_MAX_QUEUE_SIZE;
use crate::drivers::pci::get_filesystem_driver;
use crate::drivers::virtio::error::VirtioFsError;
#[cfg(not(feature = "pci"))]
use crate::drivers::virtio::transport::mmio::{ComCfg, IsrStatus, NotifCfg};
Expand All @@ -18,9 +22,11 @@ use crate::drivers::virtio::transport::pci::{ComCfg, IsrStatus, NotifCfg};
use crate::drivers::virtio::virtqueue::error::VirtqError;
use crate::drivers::virtio::virtqueue::split::SplitVq;
use crate::drivers::virtio::virtqueue::{
AvailBufferToken, BufferElem, BufferType, Virtq, VqIndex, VqSize,
AvailBufferToken, BufferElem, BufferType, Virtq, VirtqMutex, VqIndex, VqSize,
};
use crate::executor::block_on;
use crate::fs::fuse::{self, FuseInterface, Rsp, RspHeader};
use crate::io;
use crate::mm::device_alloc::DeviceAlloc;

/// A wrapper struct for the raw configuration structure.
Expand All @@ -42,8 +48,9 @@ pub(crate) struct VirtioFsDriver {
pub(super) com_cfg: ComCfg,
pub(super) isr_stat: IsrStatus,
pub(super) notif_cfg: NotifCfg,
pub(super) vqueues: Vec<Box<dyn Virtq>>,
pub(super) vqueues: Vec<VirtqMutex>,
pub(super) irq: InterruptLine,
pub(super) waker: Option<Waker>,
}

// Backend-independent interface for Virtio network driver
Expand Down Expand Up @@ -136,16 +143,39 @@ impl VirtioFsDriver {
VqSize::from(VIRTIO_MAX_QUEUE_SIZE),
VqIndex::from(i),
self.dev_cfg.features.into(),
Box::new(|waker| get_filesystem_driver().unwrap().lock().waker = Some(waker)),
)
.unwrap();
self.vqueues.push(Box::new(vq));
self.vqueues.push(Rc::new(SpinMutex::new(Box::new(vq))));
}

// At this point the device is "live"
self.com_cfg.drv_ok();

Ok(())
}

pub(crate) fn handle_interrupt(&mut self) {
let status = self.isr_stat.is_queue_interrupt();

#[cfg(not(feature = "pci"))]
if status.contains(virtio::mmio::InterruptStatus::CONFIGURATION_CHANGE_NOTIFICATION) {
info!("Configuration changes are not possible! Aborting");
todo!("Implement possibility to change config on the fly...")
}

#[cfg(feature = "pci")]
if status.contains(virtio::pci::IsrStatus::DEVICE_CONFIGURATION_INTERRUPT) {
info!("Configuration changes are not possible! Aborting");
todo!("Implement possibility to change config on the fly...")
}

if let Some(waker) = &self.waker {
waker.wake_by_ref();
}

self.isr_stat.acknowledge();
}
}

impl FuseInterface for VirtioFsDriver {
Expand Down Expand Up @@ -179,8 +209,17 @@ impl FuseInterface for VirtioFsDriver {
};

let buffer_tkn = AvailBufferToken::new(send, recv).unwrap();
let vq = &mut self.vqueues[1];
let vq_clone = vq.clone();
let recv_future = {
let mut vq_guard = vq.lock();
vq_guard.dispatch(buffer_tkn, false, BufferType::Direct)?;
dbg!();
vq_guard.recv(vq_clone)
};
let mut transfer_result =
self.vqueues[1].dispatch_blocking(buffer_tkn, BufferType::Direct)?;
block_on(async { recv_future.await.or(Err(io::Error::EIO)) }, None)
.or(Err(VirtqError::General))?;

let headers = transfer_result.used_recv_buff.pop_front_downcast().unwrap();
let payload = transfer_result.used_recv_buff.pop_front_vec();
Expand Down
1 change: 1 addition & 0 deletions src/drivers/fs/virtio_pci.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ impl VirtioFsDriver {
notif_cfg,
vqueues: Vec::new(),
irq: device.get_irq().unwrap(),
waker: None,
})
}

Expand Down
62 changes: 15 additions & 47 deletions src/drivers/net/virtio/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,11 @@ impl CtrlQueue {

pub struct RxQueues {
vqs: Vec<Box<dyn Virtq>>,
poll_sender: async_channel::Sender<UsedBufferToken>,
poll_receiver: async_channel::Receiver<UsedBufferToken>,
packet_size: u32,
}

impl RxQueues {
pub fn new(vqs: Vec<Box<dyn Virtq>>, dev_cfg: &NetDevCfg) -> Self {
let (poll_sender, poll_receiver) = async_channel::unbounded();

// See Virtio specification v1.1 - 5.1.6.3.1
//
let packet_size = if dev_cfg.features.contains(virtio::net::F::MRG_RXBUF) {
Expand All @@ -73,12 +69,7 @@ impl RxQueues {
dev_cfg.raw.as_ptr().mtu().read().to_ne().into()
};

Self {
vqs,
poll_sender,
poll_receiver,
packet_size,
}
Self { vqs, packet_size }
}

/// Takes care of handling packets correctly which need some processing after being received.
Expand All @@ -95,32 +86,12 @@ impl RxQueues {
fn add(&mut self, mut vq: Box<dyn Virtq>) {
const BUFF_PER_PACKET: u16 = 2;
let num_packets: u16 = u16::from(vq.size()) / BUFF_PER_PACKET;
fill_queue(
vq.as_mut(),
num_packets,
self.packet_size,
self.poll_sender.clone(),
);
fill_queue(vq.as_mut(), num_packets, self.packet_size);
self.vqs.push(vq);
}

fn get_next(&mut self) -> Option<UsedBufferToken> {
let transfer = self.poll_receiver.try_recv();

transfer
.or_else(|_| {
// Check if any not yet provided transfers are in the queue.
self.poll();

self.poll_receiver.try_recv()
})
.ok()
}

fn poll(&mut self) {
for vq in &mut self.vqs {
vq.poll();
}
self.vqs[0].try_recv().ok()
}

fn enable_notifs(&mut self) {
Expand All @@ -140,12 +111,7 @@ impl RxQueues {
}
}

fn fill_queue(
vq: &mut dyn Virtq,
num_packets: u16,
packet_size: u32,
poll_sender: async_channel::Sender<UsedBufferToken>,
) {
fn fill_queue(vq: &mut dyn Virtq, num_packets: u16, packet_size: u32) {
for _ in 0..num_packets {
let buff_tkn = match AvailBufferToken::new(
vec![],
Expand All @@ -167,12 +133,7 @@ fn fill_queue(
// BufferTokens are directly provided to the queue
// TransferTokens are directly dispatched
// Transfers will be awaited at the queue
match vq.dispatch(
buff_tkn,
Some(poll_sender.clone()),
false,
BufferType::Direct,
) {
match vq.dispatch(buff_tkn, false, BufferType::Direct) {
Ok(_) => (),
Err(err) => {
error!("{:#?}", err);
Expand Down Expand Up @@ -220,7 +181,9 @@ impl TxQueues {

fn poll(&mut self) {
for vq in &mut self.vqs {
vq.poll();
// We don't do anything with the buffers but we need to receive them for the
// ring slots to be emptied and the memory from the previous transfers to be freed.
while vq.try_recv().is_ok() {}
}
}

Expand Down Expand Up @@ -339,7 +302,7 @@ impl NetworkDriver for VirtioNetDriver {
.unwrap();

self.send_vqs.vqs[0]
.dispatch(buff_tkn, None, false, BufferType::Direct)
.dispatch(buff_tkn, false, BufferType::Direct)
.unwrap();

result
Expand Down Expand Up @@ -373,7 +336,6 @@ impl NetworkDriver for VirtioNetDriver {
self.recv_vqs.vqs[0].as_mut(),
num_buffers,
self.recv_vqs.packet_size,
self.recv_vqs.poll_sender.clone(),
);

let vec_data = packets.into_iter().flatten().collect();
Expand Down Expand Up @@ -679,6 +641,7 @@ impl VirtioNetDriver {
VqSize::from(VIRTIO_MAX_QUEUE_SIZE),
VqIndex::from(self.num_vqs),
self.dev_cfg.features.into(),
Box::new(|_waker| {}),
)
.unwrap(),
)));
Expand All @@ -690,6 +653,7 @@ impl VirtioNetDriver {
VqSize::from(VIRTIO_MAX_QUEUE_SIZE),
VqIndex::from(self.num_vqs),
self.dev_cfg.features.into(),
Box::new(|_waker| {}),
)
.unwrap(),
)));
Expand Down Expand Up @@ -754,6 +718,7 @@ impl VirtioNetDriver {
VqSize::from(VIRTIO_MAX_QUEUE_SIZE),
VqIndex::from(2 * i),
self.dev_cfg.features.into(),
Box::new(|_waker| {}),
)
.unwrap();
// Interrupt for receiving packets is wanted
Expand All @@ -767,6 +732,7 @@ impl VirtioNetDriver {
VqSize::from(VIRTIO_MAX_QUEUE_SIZE),
VqIndex::from(2 * i + 1),
self.dev_cfg.features.into(),
Box::new(|_waker| {}),
)
.unwrap();
// Interrupt for comunicating that a sended packet left, is not needed
Expand All @@ -780,6 +746,7 @@ impl VirtioNetDriver {
VqSize::from(VIRTIO_MAX_QUEUE_SIZE),
VqIndex::from(2 * i),
self.dev_cfg.features.into(),
Box::new(|_waker| {}),
)
.unwrap();
// Interrupt for receiving packets is wanted
Expand All @@ -793,6 +760,7 @@ impl VirtioNetDriver {
VqSize::from(VIRTIO_MAX_QUEUE_SIZE),
VqIndex::from(2 * i + 1),
self.dev_cfg.features.into(),
Box::new(|_waker| {}),
)
.unwrap();
// Interrupt for comunicating that a sended packet left, is not needed
Expand Down
2 changes: 1 addition & 1 deletion src/drivers/virtio/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub mod pci;
))]
use crate::arch::kernel::mmio as hardware;
#[cfg(all(
any(feature = "vsock", feature = "tcp", feature = "udp"),
any(feature = "vsock", feature = "tcp", feature = "udp", feature = "fuse"),
feature = "pci"
))]
use crate::drivers::pci as hardware;
21 changes: 18 additions & 3 deletions src/drivers/virtio/transport/pci.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use volatile::{VolatilePtr, VolatileRef};

#[cfg(all(
not(feature = "rtl8139"),
any(feature = "tcp", feature = "udp", feature = "vsock")
any(feature = "tcp", feature = "udp", feature = "vsock", feature = "fuse")
))]
use crate::arch::kernel::interrupts::*;
use crate::arch::memory_barrier;
Expand All @@ -34,7 +34,7 @@ use crate::drivers::pci::PciDevice;
use crate::drivers::virtio::error::VirtioError;
#[cfg(all(
not(feature = "rtl8139"),
any(feature = "tcp", feature = "udp", feature = "vsock")
any(feature = "tcp", feature = "udp", feature = "vsock", feature = "fuse")
))]
use crate::drivers::virtio::transport::hardware;
#[cfg(feature = "vsock")]
Expand Down Expand Up @@ -993,7 +993,22 @@ pub(crate) fn init_device(
Ok(drv)
}
#[cfg(feature = "fuse")]
VirtioDriver::FileSystem(_) => Ok(drv),
VirtioDriver::FileSystem(_) => {
let irq = device.get_irq().unwrap();

info!("Install virtio interrupt handler at line {}", irq);

fn file_system_handler() {
if let Some(driver) = hardware::get_filesystem_driver() {
driver.lock().handle_interrupt();
}
}

irq_install_handler(irq, file_system_handler);
add_irq_name(irq, "virtio file system");

Ok(drv)
}
},
Err(virt_err) => Err(virt_err),
}
Expand Down
Loading
Loading