diff --git a/genealogos-api/Cargo.toml b/genealogos-api/Cargo.toml
index 7730789..1276b0a 100644
--- a/genealogos-api/Cargo.toml
+++ b/genealogos-api/Cargo.toml
@@ -15,11 +15,12 @@ path = "src/main.rs"
 
 [dependencies]
 chrono.workspace = true
-rocket.workspace = true
 genealogos = { workspace = true, features = [ "rocket" ] }
-serde_json.workspace = true
-serde.workspace = true
+log.workspace = true
 nixtract.workspace = true
+rocket.workspace = true
+serde.workspace = true
+serde_json.workspace = true
 
 [dev-dependencies]
 env_logger.workspace = true
diff --git a/genealogos-api/src/jobs.rs b/genealogos-api/src/jobs.rs
index fb4428b..f3cc532 100644
--- a/genealogos-api/src/jobs.rs
+++ b/genealogos-api/src/jobs.rs
@@ -9,29 +9,9 @@
 use rocket::tokio;
 
 use crate::messages::{self, Result, StatusEnum, StatusResponse};
 
-pub type JobId = u16;
+pub mod job_map;
 
-/// This JobMap holds the status of all jobs that are currently running
-pub type JobMap = Arc>>;
-
-pub enum JobStatus {
-    Stopped,
-    /// The job is still running, the receiver is used receive status messages from worker threads
-    Running(Box),
-    Done(String, time::Duration),
-    Error(String),
-}
-
-impl ToString for JobStatus {
-    fn to_string(&self) -> String {
-        match self {
-            JobStatus::Running(_) => "running".to_string(),
-            JobStatus::Done(_, _) => "done".to_string(),
-            JobStatus::Stopped => "stopped".to_string(),
-            JobStatus::Error(e) => e.to_owned(),
-        }
-    }
-}
+use job_map::{JobId, JobMap, JobStatus};
 
 #[rocket::get("/create?&")]
 pub async fn create(
diff --git a/genealogos-api/src/jobs/job_map.rs b/genealogos-api/src/jobs/job_map.rs
new file mode 100644
index 0000000..923a664
--- /dev/null
+++ b/genealogos-api/src/jobs/job_map.rs
@@ -0,0 +1,92 @@
+use rocket::tokio::time;
+
+/// Shared, thread-safe handle to the job map: an `Arc` around a tokio
+/// (async) mutex, so it can be held across `.await` points by both the
+/// request handlers and the garbage-collector task.
+pub type JobMap = std::sync::Arc<rocket::tokio::sync::Mutex<JobHashMap>>;
+
+/// This JobMap holds the status of all jobs that are currently running
+pub struct JobHashMap(std::collections::HashMap<JobId, JobMapEntry>);
+
+/// A single entry in the job map, contains all data related to the job and some
+/// metadata required for the garbage collector.
+pub struct JobMapEntry {
+    /// Stores the last time this job was accessed. Any job that is not accessed
+    /// for a certain amount of time is considered stale and will be removed.
+    last_updated: time::Instant,
+    /// The status of the job
+    status: JobStatus,
+}
+
+pub type JobId = u16;
+
+/// The status of a single job
+pub enum JobStatus {
+    /// The job has been stopped and is not running anymore, or it has not been started yet
+    Stopped,
+    /// The job is still running, the receiver is used receive status messages from worker threads
+    Running(Box),
+    /// The job has finished, the string contains the output of the job
+    /// and the duration contains how long it took to finish
+    Done(String, time::Duration),
+    /// The job has thrown an error, the string contains the error message
+    Error(String),
+}
+
+// Implement `Display` rather than `ToString` directly (clippy:
+// `to_string_trait_impl`): the blanket `impl<T: Display> ToString for T`
+// keeps every existing `.to_string()` call site working unchanged.
+impl std::fmt::Display for JobStatus {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            JobStatus::Running(_) => write!(f, "running"),
+            JobStatus::Done(_, _) => write!(f, "done"),
+            JobStatus::Stopped => write!(f, "stopped"),
+            JobStatus::Error(e) => write!(f, "{}", e),
+        }
+    }
+}
+
+impl JobMapEntry {
+    /// Creates an entry for `status`, stamped with the current time.
+    pub fn new(status: JobStatus) -> Self {
+        Self {
+            last_updated: time::Instant::now(),
+            status,
+        }
+    }
+}
+
+impl JobHashMap {
+    /// Inserts (or replaces) a job, resetting its access time.
+    pub fn insert(&mut self, job_id: JobId, job_status: JobStatus) {
+        self.0.insert(job_id, JobMapEntry::new(job_status));
+    }
+
+    /// Looks up a job's status, refreshing its access time so that jobs
+    /// that are still being polled are not garbage-collected as stale.
+    /// (Previously the timestamp was never updated after `insert`, so every
+    /// job — polled or not — was evicted `remove_after` after creation.)
+    pub fn get(&mut self, job_id: &JobId) -> Option<&JobStatus> {
+        self.0.get_mut(job_id).map(|entry| {
+            entry.last_updated = time::Instant::now();
+            &entry.status
+        })
+    }
+
+    /// Removes a job, returning its status if it was present.
+    pub fn remove(&mut self, job_id: &JobId) -> Option<JobStatus> {
+        self.0.remove(job_id).map(|entry| entry.status)
+    }
+
+    /// Creates an empty job map.
+    pub(crate) fn new() -> Self {
+        Self(std::collections::HashMap::new())
+    }
+}
+
+/// The garbage collection task that should be run every few minutes or so to prevent the job map from growing too large.
+pub async fn garbage_collector(
+    job_map: JobMap,
+    interval: time::Duration,
+    remove_after: time::Duration,
+) {
+    let mut interval = time::interval(interval);
+
+    log::info!("Started the garbage collector");
+
+    loop {
+        // Wait for the next tick first; the first `tick()` on a fresh
+        // interval completes immediately, so logging after it reflects an
+        // actual sweep instead of firing one interval early.
+        interval.tick().await;
+        log::info!("Collecting garbage");
+
+        // Lock the map exactly once per sweep and prune stale entries in
+        // place with `retain`. The previous version called `lock().await`
+        // a second time inside a `for` loop while the guard from the first
+        // lock was still alive — on tokio's non-reentrant mutex that
+        // deadlocks the GC task forever (and is mutation during iteration).
+        job_map.lock().await.0.retain(|_job_id, entry| {
+            let stale = entry.last_updated.elapsed() > remove_after;
+            if stale {
+                log::info!("Removing a stale job");
+            }
+            !stale
+        });
+    }
+}
diff --git a/genealogos-api/src/main.rs b/genealogos-api/src/main.rs
index 98c820c..9cc0e27 100644
--- a/genealogos-api/src/main.rs
+++ b/genealogos-api/src/main.rs
@@ -3,6 +3,7 @@ use std::sync::{atomic, Arc};
 use genealogos::args::BomArg;
 use genealogos::backend::Backend;
 use genealogos::bom::Bom;
+use jobs::job_map::{self, garbage_collector};
 use rocket::http::Status;
 use rocket::response::{content, status};
 use rocket::serde::json::Json;
@@ -72,6 +73,9 @@ fn analyze(installable: &str, bom_format: Option) -> Result
 fn rocket() -> _ {
+    let job_map = Arc::new(Mutex::new(job_map::JobHashMap::new()));
+
+    let job_map_clone = job_map.clone();
     rocket::build()
         .attach(rocket::fairing::AdHoc::on_response("cors", |_req, resp| {
             Box::pin(async move {
@@ -81,6 +85,22 @@ fn rocket() -> _ {
                 ));
             })
         }))
+        .attach(rocket::fairing::AdHoc::on_liftoff(
+            "garbage_collector",
+            |_| {
+                Box::pin(async move {
+                    rocket::tokio::spawn(async move {
+                        let interval = std::time::Duration::from_secs(10);
+                        garbage_collector(
+                            job_map_clone,
+                            interval,
+                            std::time::Duration::from_secs(5 * 60),
+                        )
+                        .await;
+                    });
+                })
+            },
+        ))
         .mount("/", rocket::routes![index])
         .mount("/api", rocket::routes![analyze])
         .register("/api", rocket::catchers![handle_errors])
@@ -89,10 +109,7 @@ fn rocket() -> _ {
             rocket::routes![jobs::create, jobs::status, jobs::result],
         )
         .register("/api/jobs/", rocket::catchers![handle_errors])
-        .manage(Arc::new(Mutex::new(std::collections::HashMap::<
-            jobs::JobId,
-            jobs::JobStatus,
-        >::new())))
+        .manage(job_map)
         .manage(atomic::AtomicU16::new(0))
 }