From b5acc3687412c7c0d8e8dc5504a3d32fe1f27516 Mon Sep 17 00:00:00 2001 From: Ashley Coleman Date: Fri, 9 Aug 2024 16:59:40 -0700 Subject: [PATCH 1/3] save work --- rust/entity/src/job_audit.rs | 18 +++ rust/entity/src/mod.rs | 1 + rust/entity/src/prelude.rs | 1 + rust/migration/src/lib.rs | 2 + .../m20240809_213440_add_job_audit_table.rs | 47 ++++++++ rust/rsc/src/bin/rsc/read_job.rs | 6 +- rust/rsc/src/database.rs | 107 ++++++------------ 7 files changed, 105 insertions(+), 77 deletions(-) create mode 100644 rust/entity/src/job_audit.rs create mode 100644 rust/migration/src/m20240809_213440_add_job_audit_table.rs diff --git a/rust/entity/src/job_audit.rs b/rust/entity/src/job_audit.rs new file mode 100644 index 000000000..e3b585c73 --- /dev/null +++ b/rust/entity/src/job_audit.rs @@ -0,0 +1,18 @@ +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.6 + +use sea_orm::entity::prelude::*; + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[sea_orm(table_name = "job_audit")] +pub struct Model { + #[sea_orm(primary_key)] + pub id: i32, + pub hash: String, + pub event: String, + pub created_at: DateTime, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation {} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/rust/entity/src/mod.rs b/rust/entity/src/mod.rs index ceeeca9b1..61d74016b 100644 --- a/rust/entity/src/mod.rs +++ b/rust/entity/src/mod.rs @@ -6,6 +6,7 @@ pub mod api_key; pub mod blob; pub mod blob_store; pub mod job; +pub mod job_audit; pub mod job_history; pub mod job_use; pub mod local_blob_store; diff --git a/rust/entity/src/prelude.rs b/rust/entity/src/prelude.rs index 4f4df039d..2137b8817 100644 --- a/rust/entity/src/prelude.rs +++ b/rust/entity/src/prelude.rs @@ -4,6 +4,7 @@ pub use super::api_key::Entity as ApiKey; pub use super::blob::Entity as Blob; pub use super::blob_store::Entity as BlobStore; pub use super::job::Entity as Job; +pub use super::job_audit::Entity as JobAudit; pub use super::job_history::Entity as JobHistory; pub use super::job_use::Entity as JobUse; pub use super::local_blob_store::Entity as LocalBlobStore; diff --git a/rust/migration/src/lib.rs b/rust/migration/src/lib.rs index 4903c72cc..7d946ef9d 100644 --- a/rust/migration/src/lib.rs +++ b/rust/migration/src/lib.rs @@ -13,6 +13,7 @@ mod m20240522_185420_create_job_history; mod m20240731_152842_create_job_size_proc; mod m20240731_201632_create_job_blob_timestamp_index; mod m20240805_163520_create_blob_id_fk_indexes; +mod m20240809_213440_add_job_audit_table; mod m20240819_193352_add_output_indexes; mod m20240919_214610_add_hidden_to_output_dir; @@ -35,6 +36,7 @@ impl MigratorTrait for Migrator { Box::new(m20240731_152842_create_job_size_proc::Migration), Box::new(m20240731_201632_create_job_blob_timestamp_index::Migration), Box::new(m20240805_163520_create_blob_id_fk_indexes::Migration), + Box::new(m20240809_213440_add_job_audit_table::Migration), Box::new(m20240819_193352_add_output_indexes::Migration), Box::new(m20240919_214610_add_hidden_to_output_dir::Migration), ] diff --git a/rust/migration/src/m20240809_213440_add_job_audit_table.rs b/rust/migration/src/m20240809_213440_add_job_audit_table.rs new file mode 100644 index 000000000..3270fe47c --- /dev/null +++ b/rust/migration/src/m20240809_213440_add_job_audit_table.rs @@ -0,0 +1,47 @@ +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .create_table( + Table::create() + .table(JobAudit::Table) + .col( + ColumnDef::new(JobAudit::Id) + .integer() + .not_null() + .auto_increment() + .primary_key(), + ) + .col(ColumnDef::new(JobAudit::Hash).string().not_null()) + .col(ColumnDef::new(JobAudit::Event).string().not_null()) + .col( + ColumnDef::new(JobAudit::CreatedAt) + .timestamp() + .not_null() + .default(SimpleExpr::Keyword(Keyword::CurrentTimestamp)), + ) + .to_owned(), + ) + .await + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .drop_table(Table::drop().table(JobAudit::Table).to_owned()) + .await + } +} + +#[derive(DeriveIden)] +enum JobAudit { + Table, + Id, + Hash, + CreatedAt, + Event, +} diff --git a/rust/rsc/src/bin/rsc/read_job.rs b/rust/rsc/src/bin/rsc/read_job.rs index a4fb2344e..417f82a45 100644 --- a/rust/rsc/src/bin/rsc/read_job.rs +++ b/rust/rsc/src/bin/rsc/read_job.rs @@ -25,12 +25,12 @@ async fn record_hit(job_id: Uuid, hash: String, conn: Arc) { job_id: Set(job_id), }; let _ = usage.insert(conn.as_ref()).await; - let _ = database::upsert_job_hit(conn.as_ref(), hash).await; + let _ = database::record_job_hit(conn.as_ref(), hash).await; } #[tracing::instrument(skip(hash, conn))] async fn record_miss(hash: String, conn: Arc) { - let _ = database::upsert_job_miss(conn.as_ref(), hash).await; + let _ = database::record_job_miss(conn.as_ref(), hash).await; } #[tracing::instrument(skip(db, stores))] @@ -217,6 +217,8 @@ pub async fn allow_job( system_load: Arc>, min_runtime: f64, ) -> StatusCode { + // TODO: Add all the audit messages + // Reject a subset of jobs that are never worth caching if payload.runtime < min_runtime { return StatusCode::NOT_ACCEPTABLE; diff --git a/rust/rsc/src/database.rs b/rust/rsc/src/database.rs index 32db02268..618892e16 100644 --- a/rust/rsc/src/database.rs +++ b/rust/rsc/src/database.rs @@ -1,11 +1,12 @@ use chrono::NaiveDateTime; use data_encoding::BASE64; use entity::prelude::{ - ApiKey, Blob, BlobStore, Job, JobHistory, LocalBlobStore, OutputDir, OutputFile, OutputSymlink, + ApiKey, Blob, BlobStore, Job, JobAudit, JobHistory, LocalBlobStore, OutputDir, OutputFile, + OutputSymlink, }; use entity::{ - api_key, blob, blob_store, job, job_history, local_blob_store, output_dir, output_file, - output_symlink, + api_key, blob, blob_store, job, job_audit, job_history, local_blob_store, output_dir, + output_file, output_symlink, }; use itertools::Itertools; use migration::OnConflict; @@ -510,7 +511,7 @@ where tokio::spawn(async move { for hash in hashes { - let _ = upsert_job_evict(db.as_ref(), hash.hash).await; + let _ = record_job_evict(db.as_ref(), hash.hash).await; } }); @@ -705,90 +706,46 @@ pub async fn delete_unreferenced_blobs( } // -------------------------------------------------- -// ---------- JobHistory ---------- +// ---------- JobAudit ---------- // -------------------------------------------------- -pub async fn upsert_job_hit(db: &T, hash: String) -> Result<(), DbErr> { - let active_model = job_history::ActiveModel { - hash: Set(hash), - hits: Set(1), - misses: Set(0), - evictions: Set(0), - created_at: NotSet, - updated_at: NotSet, - }; +// ---------- Create ---------- +pub async fn record_job_hit(db: &T, hash: String) -> Result<(), DbErr> { + record_job_event(db, hash, "hit".to_string()).await +} - let _ = JobHistory::insert(active_model) - .on_conflict( - OnConflict::column(job_history::Column::Hash) - .update_column(job_history::Column::UpdatedAt) - .value( - job_history::Column::Hits, - Expr::col(job_history::Column::Hits.as_column_ref()).add(1), - ) - .to_owned(), - ) - .exec(db) - .await?; +pub async fn record_job_miss(db: &T, hash: String) -> Result<(), DbErr> { + record_job_event(db, hash, "miss".to_string()).await +} - Ok(()) +pub async fn record_job_evict(db: &T, hash: String) -> Result<(), DbErr> { + record_job_event(db, hash, "evict".to_string()).await } -pub async fn upsert_job_miss(db: &T, hash: String) -> Result<(), DbErr> { - let active_model = job_history::ActiveModel { - hash: Set(hash), - hits: Set(0), - misses: Set(1), - evictions: Set(0), - created_at: NotSet, - updated_at: NotSet, - }; +pub async fn record_job_denied(db: &T, hash: String) -> Result<(), DbErr> { + record_job_event(db, hash, "denied".to_string()).await +} - let _ = JobHistory::insert(active_model) - .on_conflict( - OnConflict::column(job_history::Column::Hash) - .update_column(job_history::Column::UpdatedAt) - .value( - job_history::Column::Misses, - Expr::col(job_history::Column::Misses.as_column_ref()).add(1), - ) - .to_owned(), - ) - .exec(db) - .await?; +pub async fn record_job_conflict(db: &T, hash: String) -> Result<(), DbErr> { + record_job_event(db, hash, "conflict".to_string()).await +} - Ok(()) +pub async fn record_job_shed(db: &T, hash: String) -> Result<(), DbErr> { + record_job_event(db, hash, "shed".to_string()).await } -pub async fn upsert_job_evict(db: &T, hash: String) -> Result<(), DbErr> { - let active_model = job_history::ActiveModel { +pub async fn record_job_event( + db: &T, + hash: String, + event: String, +) -> Result<(), DbErr> { + let active_model = job_audit::ActiveModel { + id: NotSet, hash: Set(hash), - hits: Set(0), - misses: Set(0), - evictions: Set(1), created_at: NotSet, - updated_at: NotSet, + event: Set(event), }; - let _ = JobHistory::insert(active_model) - .on_conflict( - OnConflict::column(job_history::Column::Hash) - .update_column(job_history::Column::UpdatedAt) - .value( - job_history::Column::Evictions, - Expr::col(job_history::Column::Evictions.as_column_ref()).add(1), - ) - .to_owned(), - ) - .exec(db) - .await?; + let _ = JobAudit::insert(active_model).exec(db).await?; Ok(()) } - -// ---------- Create ---------- - -// ---------- Read ---------- - -// ---------- Update ---------- - -// ---------- Delete ---------- From 755e8e50743b5dcd229b47a2d87e564d8bbab281 Mon Sep 17 00:00:00 2001 From: Ashley Coleman Date: Tue, 15 Oct 2024 12:52:17 -0700 Subject: [PATCH 2/3] add missing event triggers --- rust/rsc/.config.json | 2 +- rust/rsc/src/bin/rsc/read_job.rs | 14 ++++++++++++-- rust/rsc/src/bin/rsc/types.rs | 2 +- rust/rsc/src/database.rs | 9 ++++----- 4 files changed, 18 insertions(+), 9 deletions(-) diff --git a/rust/rsc/.config.json b/rust/rsc/.config.json index effaf3aba..45b79cbb2 100644 --- a/rust/rsc/.config.json +++ b/rust/rsc/.config.json @@ -4,7 +4,7 @@ "connection_pool_max_connect": 90, "connection_pool_timeout": 60, "standalone": false, - "active_store": "5eed6ba4-d65f-46ca-b4a6-eff98b00857d", + "active_store": "ac9b4685-f36e-4726-9073-c992597ec134", "log_directory": null, "blob_eviction": { "tick_rate": 60, diff --git a/rust/rsc/src/bin/rsc/read_job.rs b/rust/rsc/src/bin/rsc/read_job.rs index 417f82a45..f2ce8691e 100644 --- a/rust/rsc/src/bin/rsc/read_job.rs +++ b/rust/rsc/src/bin/rsc/read_job.rs @@ -217,10 +217,14 @@ pub async fn allow_job( system_load: Arc>, min_runtime: f64, ) -> StatusCode { - // TODO: Add all the audit messages + let hash = payload.hash(); // Reject a subset of jobs that are never worth caching if payload.runtime < min_runtime { + let denied_hash = hash.clone(); + tokio::spawn(async move { + let _ = database::record_job_denied(conn.as_ref(), denied_hash).await; + }); return StatusCode::NOT_ACCEPTABLE; } @@ -247,11 +251,14 @@ pub async fn allow_job( } if thread_rng().gen_bool(shed_chance) { + let shed_hash = hash.clone(); + tokio::spawn(async move { + let _ = database::record_job_shed(conn.as_ref(), shed_hash).await; + }); return StatusCode::TOO_MANY_REQUESTS; } // Reject jobs that are already cached - let hash = payload.hash(); match job::Entity::find() .filter(job::Column::Hash.eq(hash.clone())) .one(conn.as_ref()) @@ -265,6 +272,9 @@ pub async fn allow_job( // Job is cached, don't try again Ok(Some(_)) => { tracing::warn!(%hash, "Rejecting job push for already cached job"); + tokio::spawn(async move { + let _ = database::record_job_conflict(conn.as_ref(), hash).await; + }); StatusCode::CONFLICT } // Job is not cached, use the other deciding factors diff --git a/rust/rsc/src/bin/rsc/types.rs b/rust/rsc/src/bin/rsc/types.rs index 08630f6f2..f719db73e 100644 --- a/rust/rsc/src/bin/rsc/types.rs +++ b/rust/rsc/src/bin/rsc/types.rs @@ -59,7 +59,7 @@ pub struct Dir { pub path: String, pub mode: i32, // Optional member to allow for soft migration - pub hidden: Option + pub hidden: Option, } #[derive(Debug, Deserialize, Serialize)] diff --git a/rust/rsc/src/database.rs b/rust/rsc/src/database.rs index 618892e16..9637a4560 100644 --- a/rust/rsc/src/database.rs +++ b/rust/rsc/src/database.rs @@ -1,18 +1,17 @@ use chrono::NaiveDateTime; use data_encoding::BASE64; use entity::prelude::{ - ApiKey, Blob, BlobStore, Job, JobAudit, JobHistory, LocalBlobStore, OutputDir, OutputFile, - OutputSymlink, + ApiKey, Blob, BlobStore, Job, JobAudit, LocalBlobStore, OutputDir, OutputFile, OutputSymlink, }; use entity::{ - api_key, blob, blob_store, job, job_audit, job_history, local_blob_store, output_dir, - output_file, output_symlink, + api_key, blob, blob_store, job, job_audit, local_blob_store, output_dir, output_file, + output_symlink, }; use itertools::Itertools; use migration::OnConflict; use rand::{thread_rng, RngCore}; use sea_orm::{ - prelude::{DateTime, Expr, Uuid}, + prelude::{DateTime, Uuid}, ActiveModelTrait, ActiveValue::*, ColumnTrait, ConnectionTrait, DbBackend, DbErr, DeleteResult, EntityTrait, PaginatorTrait, From f1dcaab14d19f2745d287585d6d1768f146c44b1 Mon Sep 17 00:00:00 2001 From: Ashley Coleman Date: Tue, 15 Oct 2024 15:00:07 -0700 Subject: [PATCH 3/3] restore job_history tracking --- rust/rsc/src/database.rs | 87 ++++++++++++++++++++++++++++++++++++---- 1 file changed, 80 insertions(+), 7 deletions(-) diff --git a/rust/rsc/src/database.rs b/rust/rsc/src/database.rs index 9637a4560..9497c4043 100644 --- a/rust/rsc/src/database.rs +++ b/rust/rsc/src/database.rs @@ -1,17 +1,18 @@ use chrono::NaiveDateTime; use data_encoding::BASE64; use entity::prelude::{ - ApiKey, Blob, BlobStore, Job, JobAudit, LocalBlobStore, OutputDir, OutputFile, OutputSymlink, + ApiKey, Blob, BlobStore, Job, JobAudit, JobHistory, LocalBlobStore, OutputDir, OutputFile, + OutputSymlink, }; use entity::{ - api_key, blob, blob_store, job, job_audit, local_blob_store, output_dir, output_file, - output_symlink, + api_key, blob, blob_store, job, job_audit, job_history, local_blob_store, output_dir, + output_file, output_symlink, }; use itertools::Itertools; use migration::OnConflict; use rand::{thread_rng, RngCore}; use sea_orm::{ - prelude::{DateTime, Uuid}, + prelude::{DateTime, Expr, Uuid}, ActiveModelTrait, ActiveValue::*, ColumnTrait, ConnectionTrait, DbBackend, DbErr, DeleteResult, EntityTrait, PaginatorTrait, @@ -709,15 +710,87 @@ pub async fn delete_unreferenced_blobs( // -------------------------------------------------- // ---------- Create ---------- pub async fn record_job_hit(db: &T, hash: String) -> Result<(), DbErr> { - record_job_event(db, hash, "hit".to_string()).await + record_job_event(db, hash.clone(), "hit".to_string()).await?; + + let active_model = job_history::ActiveModel { + hash: Set(hash), + hits: Set(1), + misses: Set(0), + evictions: Set(0), + created_at: NotSet, + updated_at: NotSet, + }; + + let _ = JobHistory::insert(active_model) + .on_conflict( + OnConflict::column(job_history::Column::Hash) + .update_column(job_history::Column::UpdatedAt) + .value( + job_history::Column::Hits, + Expr::col(job_history::Column::Hits.as_column_ref()).add(1), + ) + .to_owned(), + ) + .exec(db) + .await?; + + Ok(()) } pub async fn record_job_miss(db: &T, hash: String) -> Result<(), DbErr> { - record_job_event(db, hash, "miss".to_string()).await + record_job_event(db, hash.clone(), "miss".to_string()).await?; + + let active_model = job_history::ActiveModel { + hash: Set(hash), + hits: Set(0), + misses: Set(1), + evictions: Set(0), + created_at: NotSet, + updated_at: NotSet, + }; + + let _ = JobHistory::insert(active_model) + .on_conflict( + OnConflict::column(job_history::Column::Hash) + .update_column(job_history::Column::UpdatedAt) + .value( + job_history::Column::Misses, + Expr::col(job_history::Column::Misses.as_column_ref()).add(1), + ) + .to_owned(), + ) + .exec(db) + .await?; + + Ok(()) } pub async fn record_job_evict(db: &T, hash: String) -> Result<(), DbErr> { - record_job_event(db, hash, "evict".to_string()).await + record_job_event(db, hash.clone(), "evict".to_string()).await?; + + let active_model = job_history::ActiveModel { + hash: Set(hash), + hits: Set(0), + misses: Set(0), + evictions: Set(1), + created_at: NotSet, + updated_at: NotSet, + }; + + let _ = JobHistory::insert(active_model) + .on_conflict( + OnConflict::column(job_history::Column::Hash) + .update_column(job_history::Column::UpdatedAt) + .value( + job_history::Column::Evictions, + Expr::col(job_history::Column::Evictions.as_column_ref()).add(1), + ) + .to_owned(), + ) + .exec(db) + .await?; + + Ok(()) } pub async fn record_job_denied(db: &T, hash: String) -> Result<(), DbErr> {