Skip to content

Commit

Permalink
Fix slow process exit issue
Browse files Browse the repository at this point in the history
The slow process exit is caused by two issues.

1. Blocking tokio operations.

This can be caused either
by `spawn_blocking` (which is used internally by many of tokio's
std-like operations) or by using `block_on`.

The "guard" pattern is especially prone to this because tokio does not
expect a runtime thread to block.

2. Not receiving destroy event.

We listen for the destroy event to determine whether the removal was
successful. However, we only `.await` it after sending the removal
call, so if the removal completes quickly, the container
could be destroyed before we start listening (since a future does
nothing until polled). This is fixed by spawning the listening future as a task, so that it starts running immediately.
  • Loading branch information
nbdd0121 committed Mar 4, 2024
1 parent ec636b6 commit 0dc36d3
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 59 deletions.
37 changes: 17 additions & 20 deletions src/docker/container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,29 @@ pub struct Container {
pub(super) remove_event: Shared<BoxFuture<'static, Option<EventMessage>>>,
}

pub struct ContainerGuard(Option<Container>, Timeout);

impl Drop for ContainerGuard {
fn drop(&mut self) {
let container = self.0.take().unwrap();
let timeout = self.1;
let _ = futures::executor::block_on(container.remove(timeout));
}
}

impl Container {
pub fn id(&self) -> &str {
&self.id
}

pub fn guard(&self, timeout: Timeout) -> ContainerGuard {
ContainerGuard(Some(self.clone()), timeout)
}

pub async fn remove(&self, timeout: Timeout) -> Result<()> {
self.rename(format!("removing-{}", self.id)).await?;
let options = bollard::container::RemoveContainerOptions {
force: true,
..Default::default()
};
let _ = self.docker.remove_container(&self.id, Some(options)).await;
log::info!("Removing container {}", self.id);

// Since we passed "--rm" flag, docker will automatically start removing the container.
// Ignore any error for manual removal.
let _: Result<()> = async {
self.rename(format!("removing-{}", self.id)).await?;
let options = bollard::container::RemoveContainerOptions {
force: true,
..Default::default()
};
self.docker
.remove_container(&self.id, Some(options))
.await?;
Ok(())
}
.await;

if let Timeout::Some(duration) = timeout {
let _ = tokio::time::timeout(duration, self.remove_event.clone()).await;
} else {
Expand Down
15 changes: 6 additions & 9 deletions src/docker/docker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,11 @@ fn container_removed_future(
..Default::default()
};

let removed = docker
.events(Some(options))
.map(|evt| evt.ok())
.take(1)
.collect::<Vec<_>>()
.map(|vec| vec.into_iter().next().flatten())
.boxed()
.shared();
let mut events = docker.events(Some(options));

removed
// Spawn the future to start listening event.
tokio::spawn(async move { events.next().await?.ok() })
.map(|x| x.ok().flatten())
.boxed()
.shared()
}
75 changes: 45 additions & 30 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use cli::{Device, LogFormat, Symlink, Timeout};
use docker::{Container, Docker};
use hotplug::{Event as HotPlugEvent, HotPlug, PluggedDevice};

use std::{fmt::Display, path::Path, process::ExitCode};
use std::pin::pin;
use std::{fmt::Display, path::Path};
use tokio_stream::StreamExt;

use anyhow::{bail, Context, Result};
Expand Down Expand Up @@ -148,9 +149,9 @@ fn run_hotplug(
}
}

async fn hotplug_main() -> Result<ExitCode> {
async fn hotplug_main() -> Result<u8> {
let args = Args::parse();
let mut status = ExitCode::SUCCESS;
let mut status = 0;

match args.action {
Action::Run {
Expand Down Expand Up @@ -184,7 +185,6 @@ async fn hotplug_main() -> Result<ExitCode> {

let docker = Docker::connect_with_defaults()?;
let container = docker.run(docker_args).await?;
let _guard = container.guard(remove_timeout);
let _ = container.pipe_signals();

let hub_path = root_device.hub()?.syspath().to_owned();
Expand All @@ -197,45 +197,60 @@ async fn hotplug_main() -> Result<ExitCode> {
}
};

let stream = tokio_stream::empty()
let stream = pin!(tokio_stream::empty()
.merge(hotplug_stream)
.merge(container_stream);

tokio::pin!(stream);
while let Some(event) = stream.next().await {
let event = event?;
info!("{}", event);
match event {
Event::Remove(dev) if dev.syspath() == hub_path => {
info!("Hub device detached. Stopping container.");
status = ExitCode::from(root_unplugged_exit_code);
container.kill(15).await?;
break;
}
Event::Stopped(_, code) => {
if code >= 0 && code < 256 && (code as u8) != root_unplugged_exit_code {
status = ExitCode::from(code as u8);
} else {
status = ExitCode::FAILURE;
.merge(container_stream));

let result: Result<()> = async {
tokio::pin!(stream);
while let Some(event) = stream.next().await {
let event = event?;
info!("{}", event);
match event {
Event::Remove(dev) if dev.syspath() == hub_path => {
info!("Hub device detached. Stopping container.");
status = root_unplugged_exit_code;
container.kill(15).await?;
break;
}
Event::Stopped(_, code) => {
status = 1;
if let Ok(code) = u8::try_from(code) {
// Use the container exit code, but only if it won't be confused
// with the pre-defined root_unplugged_exit_code.
if code != root_unplugged_exit_code {
status = code;
}
} else {
status = 1;
}
break;
}
break;
_ => {}
}
_ => {}
}
Ok(())
}
.await;

let _ = container.remove(remove_timeout).await;
result?
}
};

Ok(status)
}

#[tokio::main]
async fn main() -> ExitCode {
match hotplug_main().await {
async fn main() {
let code = match hotplug_main().await {
Ok(code) => code,
Err(err) => {
let _ = eprintln!("Error: {err:?}");
ExitCode::FAILURE
eprintln!("Error: {err:?}");
1
}
}
};
// Upon returning from `main`, tokio will attempt to shutdown, but if there're any blocking
// operation (e.g. fs operations), then the shutdown will hang.
std::process::exit(code.into());
}

0 comments on commit 0dc36d3

Please sign in to comment.