Skip to content

Commit

Permalink
fix: add a garbage collector
Browse files Browse the repository at this point in the history
When a job is created and completed, but the result is not collected,
the job would previously remain in the job map. With these jobs not
being collected, they would remain in the job map forever. The garbage
collector removes them after they have been stale for a while.
  • Loading branch information
Erin van der Veen committed May 16, 2024
1 parent 101d99d commit a634f38
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 29 deletions.
7 changes: 4 additions & 3 deletions genealogos-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@ path = "src/main.rs"

[dependencies]
chrono.workspace = true
rocket.workspace = true
genealogos = { workspace = true, features = [ "rocket" ] }
serde_json.workspace = true
serde.workspace = true
log.workspace = true
nixtract.workspace = true
rocket.workspace = true
serde.workspace = true
serde_json.workspace = true

[dev-dependencies]
env_logger.workspace = true
Expand Down
24 changes: 2 additions & 22 deletions genealogos-api/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,29 +9,9 @@ use rocket::tokio;

use crate::messages::{self, Result, StatusEnum, StatusResponse};

pub type JobId = u16;
pub mod job_map;

/// This JobMap holds the status of all jobs that are currently running
pub type JobMap = Arc<rocket::tokio::sync::Mutex<std::collections::HashMap<JobId, JobStatus>>>;

pub enum JobStatus {
Stopped,
/// The job is still running, the receiver is used receive status messages from worker threads
Running(Box<dyn genealogos::backend::BackendHandle + Send>),
Done(String, time::Duration),
Error(String),
}

impl ToString for JobStatus {
fn to_string(&self) -> String {
match self {
JobStatus::Running(_) => "running".to_string(),
JobStatus::Done(_, _) => "done".to_string(),
JobStatus::Stopped => "stopped".to_string(),
JobStatus::Error(e) => e.to_owned(),
}
}
}
use job_map::{JobId, JobMap, JobStatus};

#[rocket::get("/create?<installable>&<bom_format>")]
pub async fn create(
Expand Down
92 changes: 92 additions & 0 deletions genealogos-api/src/jobs/job_map.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
use rocket::tokio::time;

pub type JobMap = std::sync::Arc<rocket::tokio::sync::Mutex<JobHashMap>>;

/// This JobMap holds the status of all jobs that are currently running
pub struct JobHashMap(std::collections::HashMap<JobId, JobMapEntry>);

/// A single entry in the job map, contains all data related to the job and some
/// metadata required for the garbage collector.
pub struct JobMapEntry {
/// Stores the last time this job was accesed. Any job that is not accessed
/// for a certain amount of time is considered stale and will be removed.
last_updated: time::Instant,
/// The status of the job
status: JobStatus,
}

pub type JobId = u16;

/// The status of a single job
pub enum JobStatus {
/// The job has been stopped and is not running anymore, or it has not been started yet
Stopped,
/// The job is still running, the receiver is used receive status messages from worker threads
Running(Box<dyn genealogos::backend::BackendHandle + Send>),
/// The job has finished, the string contains the output of the job
/// and the duration contains how long it took to finish
Done(String, time::Duration),
/// The job has thrown an error, the string contains the error message
Error(String),
}

impl ToString for JobStatus {
fn to_string(&self) -> String {
match self {
JobStatus::Running(_) => "running".to_string(),
JobStatus::Done(_, _) => "done".to_string(),
JobStatus::Stopped => "stopped".to_string(),
JobStatus::Error(e) => e.to_owned(),
}
}
}

impl JobMapEntry {
pub fn new(status: JobStatus) -> Self {
Self {
last_updated: time::Instant::now(),
status,
}
}
}

impl JobHashMap {
pub fn insert(&mut self, job_id: JobId, job_status: JobStatus) {
self.0.insert(job_id, JobMapEntry::new(job_status));
}

pub fn get(&mut self, job_id: &JobId) -> Option<&JobStatus> {
self.0.get(job_id).map(|entry| &entry.status)
}

pub fn remove(&mut self, job_id: &JobId) -> Option<JobStatus> {
self.0.remove(job_id).map(|entry| entry.status)
}

pub(crate) fn new() -> Self {
Self(std::collections::HashMap::new())
}
}

/// The garbage collection task that should be run every few minutes or so to prevent the job map from growing too large.
pub async fn garbage_collector(
job_map: JobMap,
interval: time::Duration,
remove_after: time::Duration,
) {
let mut interval = time::interval(interval);

log::info!("Started the garbage collector");

loop {
log::info!("Collecting garbage");
interval.tick().await;

for (job_id, job_entry) in job_map.lock().await.0.iter_mut() {
if job_entry.last_updated.elapsed() > remove_after {
log::info!("Removing a stale job");
job_map.lock().await.remove(job_id);
}
}
}
}
25 changes: 21 additions & 4 deletions genealogos-api/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::{atomic, Arc};
use genealogos::args::BomArg;
use genealogos::backend::Backend;
use genealogos::bom::Bom;
use jobs::job_map::{self, garbage_collector};
use rocket::http::Status;
use rocket::response::{content, status};
use rocket::serde::json::Json;
Expand Down Expand Up @@ -72,6 +73,9 @@ fn analyze(installable: &str, bom_format: Option<BomArg>) -> Result<messages::An

#[rocket::launch]
fn rocket() -> _ {
let job_map = Arc::new(Mutex::new(job_map::JobHashMap::new()));

let job_map_clone = job_map.clone();
rocket::build()
.attach(rocket::fairing::AdHoc::on_response("cors", |_req, resp| {
Box::pin(async move {
Expand All @@ -81,6 +85,22 @@ fn rocket() -> _ {
));
})
}))
.attach(rocket::fairing::AdHoc::on_liftoff(
"garbage_collector",
|_| {
Box::pin(async move {
rocket::tokio::spawn(async move {
let interval = std::time::Duration::from_secs(10);
garbage_collector(
job_map_clone,
interval,
std::time::Duration::from_secs(5 * 60),
)
.await;
});
})
},
))
.mount("/", rocket::routes![index])
.mount("/api", rocket::routes![analyze])
.register("/api", rocket::catchers![handle_errors])
Expand All @@ -89,10 +109,7 @@ fn rocket() -> _ {
rocket::routes![jobs::create, jobs::status, jobs::result],
)
.register("/api/jobs/", rocket::catchers![handle_errors])
.manage(Arc::new(Mutex::new(std::collections::HashMap::<
jobs::JobId,
jobs::JobStatus,
>::new())))
.manage(job_map)
.manage(atomic::AtomicU16::new(0))
}

Expand Down

0 comments on commit a634f38

Please sign in to comment.