Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rsc: Track job events in job audit table #1664

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions rust/entity/src/job_audit.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.6

use sea_orm::entity::prelude::*;

// Row model for the `job_audit` table: one append-only record per job
// lifecycle event (e.g. "hit", "miss", "evict", "denied", "conflict", "shed"
// — the event strings written by `database::record_job_*`).
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[sea_orm(table_name = "job_audit")]
pub struct Model {
// Auto-incrementing surrogate key (assigned by the database).
#[sea_orm(primary_key)]
pub id: i32,
// Hash identifying the job the event refers to.
pub hash: String,
// Event kind, stored as a free-form string.
pub event: String,
// Insertion time; the migration defaults this to CURRENT_TIMESTAMP.
pub created_at: DateTime,
}

// `job_audit` declares no foreign-key relations to other entities.
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}

impl ActiveModelBehavior for ActiveModel {}
1 change: 1 addition & 0 deletions rust/entity/src/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub mod api_key;
pub mod blob;
pub mod blob_store;
pub mod job;
pub mod job_audit;
pub mod job_history;
pub mod job_use;
pub mod local_blob_store;
Expand Down
1 change: 1 addition & 0 deletions rust/entity/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pub use super::api_key::Entity as ApiKey;
pub use super::blob::Entity as Blob;
pub use super::blob_store::Entity as BlobStore;
pub use super::job::Entity as Job;
pub use super::job_audit::Entity as JobAudit;
pub use super::job_history::Entity as JobHistory;
pub use super::job_use::Entity as JobUse;
pub use super::local_blob_store::Entity as LocalBlobStore;
Expand Down
2 changes: 2 additions & 0 deletions rust/migration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ mod m20240522_185420_create_job_history;
mod m20240731_152842_create_job_size_proc;
mod m20240731_201632_create_job_blob_timestamp_index;
mod m20240805_163520_create_blob_id_fk_indexes;
mod m20240809_213440_add_job_audit_table;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we update this timestamp so that it lands at the end now? Or is that totally irrelevant?

mod m20240819_193352_add_output_indexes;
mod m20240919_214610_add_hidden_to_output_dir;

Expand All @@ -35,6 +36,7 @@ impl MigratorTrait for Migrator {
Box::new(m20240731_152842_create_job_size_proc::Migration),
Box::new(m20240731_201632_create_job_blob_timestamp_index::Migration),
Box::new(m20240805_163520_create_blob_id_fk_indexes::Migration),
Box::new(m20240809_213440_add_job_audit_table::Migration),
Box::new(m20240819_193352_add_output_indexes::Migration),
Box::new(m20240919_214610_add_hidden_to_output_dir::Migration),
]
Expand Down
47 changes: 47 additions & 0 deletions rust/migration/src/m20240809_213440_add_job_audit_table.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use sea_orm_migration::prelude::*;

// Migration that adds the `job_audit` table; name is derived from the
// module's timestamped filename via `DeriveMigrationName`.
#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
// Creates the `job_audit` table: an append-only event log keyed by an
// auto-incrementing id, with the job hash, the event name, and a
// creation timestamp.
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.create_table(
Table::create()
.table(JobAudit::Table)
.col(
ColumnDef::new(JobAudit::Id)
.integer()
.not_null()
.auto_increment()
.primary_key(),
)
.col(ColumnDef::new(JobAudit::Hash).string().not_null())
.col(ColumnDef::new(JobAudit::Event).string().not_null())
.col(
ColumnDef::new(JobAudit::CreatedAt)
.timestamp()
.not_null()
// Let the database stamp each row on insert so
// writers never need to supply created_at.
.default(SimpleExpr::Keyword(Keyword::CurrentTimestamp)),
)
.to_owned(),
)
.await
}

// Reverts the migration by dropping the `job_audit` table.
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.drop_table(Table::drop().table(JobAudit::Table).to_owned())
.await
}
}

// Identifiers for the `job_audit` table and its columns; `DeriveIden`
// maps each variant to its snake_case SQL name. (Variant order here does
// not affect the schema — column order is set in `up` above.)
#[derive(DeriveIden)]
enum JobAudit {
Table,
Id,
Hash,
CreatedAt,
Event,
}
2 changes: 1 addition & 1 deletion rust/rsc/.config.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"connection_pool_max_connect": 90,
"connection_pool_timeout": 60,
"standalone": false,
"active_store": "5eed6ba4-d65f-46ca-b4a6-eff98b00857d",
"active_store": "ac9b4685-f36e-4726-9073-c992597ec134",
"log_directory": null,
"blob_eviction": {
"tick_rate": 60,
Expand Down
18 changes: 15 additions & 3 deletions rust/rsc/src/bin/rsc/read_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ async fn record_hit(job_id: Uuid, hash: String, conn: Arc<DatabaseConnection>) {
job_id: Set(job_id),
};
let _ = usage.insert(conn.as_ref()).await;
let _ = database::upsert_job_hit(conn.as_ref(), hash).await;
let _ = database::record_job_hit(conn.as_ref(), hash).await;
}

// Records a cache miss for `hash` via the database layer.
// The result is deliberately discarded: bookkeeping failures must never
// affect the request path being served.
#[tracing::instrument(skip(hash, conn))]
async fn record_miss(hash: String, conn: Arc<DatabaseConnection>) {
let _ = database::record_job_miss(conn.as_ref(), hash).await;
}

#[tracing::instrument(skip(db, stores))]
Expand Down Expand Up @@ -217,8 +217,14 @@ pub async fn allow_job(
system_load: Arc<RwLock<f64>>,
min_runtime: f64,
) -> StatusCode {
let hash = payload.hash();

// Reject a subset of jobs that are never worth caching
if payload.runtime < min_runtime {
let denied_hash = hash.clone();
tokio::spawn(async move {
let _ = database::record_job_denied(conn.as_ref(), denied_hash).await;
});
return StatusCode::NOT_ACCEPTABLE;
}

Expand All @@ -245,11 +251,14 @@ pub async fn allow_job(
}

if thread_rng().gen_bool(shed_chance) {
let shed_hash = hash.clone();
tokio::spawn(async move {
let _ = database::record_job_shed(conn.as_ref(), shed_hash).await;
});
return StatusCode::TOO_MANY_REQUESTS;
}

// Reject jobs that are already cached
let hash = payload.hash();
match job::Entity::find()
.filter(job::Column::Hash.eq(hash.clone()))
.one(conn.as_ref())
Expand All @@ -263,6 +272,9 @@ pub async fn allow_job(
// Job is cached, don't try again
Ok(Some(_)) => {
tracing::warn!(%hash, "Rejecting job push for already cached job");
tokio::spawn(async move {
let _ = database::record_job_conflict(conn.as_ref(), hash).await;
});
StatusCode::CONFLICT
}
// Job is not cached, use the other deciding factors
Expand Down
2 changes: 1 addition & 1 deletion rust/rsc/src/bin/rsc/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub struct Dir {
pub path: String,
pub mode: i32,
// Optional member to allow for soft migration
pub hidden: Option<bool>
pub hidden: Option<bool>,
}

#[derive(Debug, Deserialize, Serialize)]
Expand Down
53 changes: 41 additions & 12 deletions rust/rsc/src/database.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use chrono::NaiveDateTime;
use data_encoding::BASE64;
use entity::prelude::{
ApiKey, Blob, BlobStore, Job, JobHistory, LocalBlobStore, OutputDir, OutputFile, OutputSymlink,
ApiKey, Blob, BlobStore, Job, JobAudit, JobHistory, LocalBlobStore, OutputDir, OutputFile,
OutputSymlink,
};
use entity::{
api_key, blob, blob_store, job, job_history, local_blob_store, output_dir, output_file,
output_symlink,
api_key, blob, blob_store, job, job_audit, job_history, local_blob_store, output_dir,
output_file, output_symlink,
};
use itertools::Itertools;
use migration::OnConflict;
Expand Down Expand Up @@ -510,7 +511,7 @@ where

tokio::spawn(async move {
for hash in hashes {
let _ = upsert_job_evict(db.as_ref(), hash.hash).await;
let _ = record_job_evict(db.as_ref(), hash.hash).await;
}
});

Expand Down Expand Up @@ -705,9 +706,12 @@ pub async fn delete_unreferenced_blobs<T: ConnectionTrait>(
}

// --------------------------------------------------
// ---------- JobHistory ----------
// ---------- JobAudit ----------
// --------------------------------------------------
pub async fn upsert_job_hit<T: ConnectionTrait>(db: &T, hash: String) -> Result<(), DbErr> {
// ---------- Create ----------
pub async fn record_job_hit<T: ConnectionTrait>(db: &T, hash: String) -> Result<(), DbErr> {
record_job_event(db, hash.clone(), "hit".to_string()).await?;

let active_model = job_history::ActiveModel {
hash: Set(hash),
hits: Set(1),
Expand All @@ -733,7 +737,9 @@ pub async fn upsert_job_hit<T: ConnectionTrait>(db: &T, hash: String) -> Result<
Ok(())
}

pub async fn upsert_job_miss<T: ConnectionTrait>(db: &T, hash: String) -> Result<(), DbErr> {
pub async fn record_job_miss<T: ConnectionTrait>(db: &T, hash: String) -> Result<(), DbErr> {
record_job_event(db, hash.clone(), "miss".to_string()).await?;

let active_model = job_history::ActiveModel {
hash: Set(hash),
hits: Set(0),
Expand All @@ -759,7 +765,9 @@ pub async fn upsert_job_miss<T: ConnectionTrait>(db: &T, hash: String) -> Result
Ok(())
}

pub async fn upsert_job_evict<T: ConnectionTrait>(db: &T, hash: String) -> Result<(), DbErr> {
pub async fn record_job_evict<T: ConnectionTrait>(db: &T, hash: String) -> Result<(), DbErr> {
record_job_event(db, hash.clone(), "evict".to_string()).await?;

let active_model = job_history::ActiveModel {
hash: Set(hash),
hits: Set(0),
Expand All @@ -785,10 +793,31 @@ pub async fn upsert_job_evict<T: ConnectionTrait>(db: &T, hash: String) -> Resul
Ok(())
}

// Records a "denied" audit event for `hash` (job push rejected because
// its runtime fell below the caching threshold — see `allow_job`).
pub async fn record_job_denied<T: ConnectionTrait>(db: &T, hash: String) -> Result<(), DbErr> {
record_job_event(db, hash, "denied".to_string()).await
}

// Records a "conflict" audit event for `hash` (job push rejected because
// the job is already cached — see `allow_job`).
pub async fn record_job_conflict<T: ConnectionTrait>(db: &T, hash: String) -> Result<(), DbErr> {
record_job_event(db, hash, "conflict".to_string()).await
}

// Records a "shed" audit event for `hash` (job push dropped by the
// probabilistic load-shedding check in `allow_job`).
pub async fn record_job_shed<T: ConnectionTrait>(db: &T, hash: String) -> Result<(), DbErr> {
record_job_event(db, hash, "shed".to_string()).await
}

// Inserts one row into the `job_audit` table recording `event` for `hash`.
// `id` and `created_at` are left `NotSet` so the database assigns them
// (auto-increment id; CURRENT_TIMESTAMP default from the migration).
pub async fn record_job_event<T: ConnectionTrait>(
db: &T,
hash: String,
event: String,
) -> Result<(), DbErr> {
let active_model = job_audit::ActiveModel {
id: NotSet,
hash: Set(hash),
created_at: NotSet,
event: Set(event),
};

// The insert result (last insert id) is not needed; only propagate errors.
let _ = JobAudit::insert(active_model).exec(db).await?;

Ok(())
}
Loading