Skip to content

Commit 5030349

Browse files
committed
Address feedback
1 parent d5cf478 commit 5030349

File tree

4 files changed

+50
-47
lines changed

4 files changed

+50
-47
lines changed

crates/corrosion-tests/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,7 @@ pub fn query_to_string(
364364
String::from_utf8(out).unwrap()
365365
}
366366

367-
pub async fn assert_sub<T>(
367+
pub async fn assert_sub_event_eq<T>(
368368
rx: &mut tokio::sync::mpsc::Receiver<pubsub::SubscriptionEvent>,
369369
expected: &pubsub::TypedQueryEvent<T>,
370370
) -> Option<T>

crates/corrosion-tests/tests/subscription.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -352,12 +352,12 @@ async fn single_sub() {
352352
let mut cur_cid = 0;
353353

354354
// We always get a state of the world first, in our case the DB is empty
355-
corrosion_tests::assert_sub::<ServerRow>(
355+
corrosion_tests::assert_sub_event_eq::<ServerRow>(
356356
&mut rx,
357357
&TypedQueryEvent::Columns(vec!["endpoint".into(), "icao".into(), "tokens".into()]),
358358
)
359359
.await;
360-
corrosion_tests::assert_sub::<ServerRow>(
360+
corrosion_tests::assert_sub_event_eq::<ServerRow>(
361361
&mut rx,
362362
&TypedQueryEvent::EndOfQuery {
363363
time: 0.,
@@ -395,7 +395,7 @@ async fn single_sub() {
395395
pool.broadcast_changes(&mut states).await;
396396
cur_cid += 1;
397397

398-
corrosion_tests::assert_sub(
398+
corrosion_tests::assert_sub_event_eq(
399399
&mut rx,
400400
&TypedQueryEvent::Change(
401401
ChangeType::Insert,
@@ -423,7 +423,7 @@ async fn single_sub() {
423423
pool.broadcast_changes(&mut states).await;
424424
cur_cid += 1;
425425

426-
corrosion_tests::assert_sub(
426+
corrosion_tests::assert_sub_event_eq(
427427
&mut rx,
428428
&TypedQueryEvent::Change(
429429
ChangeType::Update,
@@ -451,7 +451,7 @@ async fn single_sub() {
451451
pool.broadcast_changes(&mut states).await;
452452
cur_cid += 1;
453453

454-
corrosion_tests::assert_sub(
454+
corrosion_tests::assert_sub_event_eq(
455455
&mut rx,
456456
&TypedQueryEvent::Change(
457457
ChangeType::Delete,
@@ -521,10 +521,10 @@ async fn multiple_subs() {
521521
let mut cur_cid = 0;
522522

523523
let mut assert_all = async |tqe| {
524-
corrosion_tests::assert_sub::<ServerRow>(&mut orx, &tqe).await;
524+
corrosion_tests::assert_sub_event_eq::<ServerRow>(&mut orx, &tqe).await;
525525

526526
for sub in &mut multi_subs {
527-
corrosion_tests::assert_sub::<ServerRow>(sub, &tqe).await;
527+
corrosion_tests::assert_sub_event_eq::<ServerRow>(sub, &tqe).await;
528528
}
529529
};
530530

crates/corrosion/src/db.rs

Lines changed: 40 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -16,56 +16,58 @@ pub struct InitializedDb {
1616
pub actor_id: ActorId,
1717
}
1818

19-
pub async fn setup(db_path: &crate::Path, schema: &str) -> eyre::Result<InitializedDb> {
20-
let partial_schema = corro_types::schema::parse_sql(schema)?;
21-
22-
let actor_id = {
23-
// we need to set auto_vacuum before any tables are created
24-
let db_conn = rusqlite::Connection::open(db_path)?;
25-
db_conn.execute_batch("PRAGMA auto_vacuum = INCREMENTAL")?;
26-
27-
let conn = CrConn::init(db_conn)?;
28-
conn.query_row("SELECT crsql_site_id();", [], |row| {
29-
row.get::<_, ActorId>(0)
30-
})?
31-
};
19+
impl InitializedDb {
20+
pub async fn setup(db_path: &crate::Path, schema: &str) -> eyre::Result<Self> {
21+
let partial_schema = corro_types::schema::parse_sql(schema)?;
22+
23+
let actor_id = {
24+
// we need to set auto_vacuum before any tables are created
25+
let db_conn = rusqlite::Connection::open(db_path)?;
26+
db_conn.execute_batch("PRAGMA auto_vacuum = INCREMENTAL")?;
27+
28+
let conn = CrConn::init(db_conn)?;
29+
conn.query_row("SELECT crsql_site_id();", [], |row| {
30+
row.get::<_, ActorId>(0)
31+
})?
32+
};
3233

33-
let write_sema = Arc::new(tokio::sync::Semaphore::new(1));
34-
let pool = SplitPool::create(&db_path, write_sema.clone()).await?;
34+
let write_sema = Arc::new(tokio::sync::Semaphore::new(1));
35+
let pool = SplitPool::create(&db_path, write_sema.clone()).await?;
3536

36-
let clock = Arc::new(
37-
uhlc::HLCBuilder::default()
38-
.with_id(actor_id.try_into().unwrap())
39-
.with_max_delta(std::time::Duration::from_millis(300))
40-
.build(),
41-
);
37+
let clock = Arc::new(
38+
uhlc::HLCBuilder::default()
39+
.with_id(actor_id.try_into().unwrap())
40+
.with_max_delta(std::time::Duration::from_millis(300))
41+
.build(),
42+
);
4243

43-
let schema = {
44-
let mut conn = pool.write_priority().await?;
44+
let schema = {
45+
let mut conn = pool.write_priority().await?;
4546

46-
let old_schema = {
47-
corro_types::agent::migrate(clock.clone(), &mut conn)?;
48-
let mut schema = corro_types::schema::init_schema(&conn)?;
49-
schema.constrain()?;
47+
let old_schema = {
48+
corro_types::agent::migrate(clock.clone(), &mut conn)?;
49+
let mut schema = corro_types::schema::init_schema(&conn)?;
50+
schema.constrain()?;
5051

51-
schema
52-
};
52+
schema
53+
};
5354

54-
tokio::task::block_in_place(|| update_schema(old_schema, partial_schema, &mut conn))?
55-
};
55+
tokio::task::block_in_place(|| update_schema(&mut conn, old_schema, partial_schema))?
56+
};
5657

57-
Ok(InitializedDb {
58-
pool,
59-
clock,
60-
schema: Arc::new(schema),
61-
actor_id,
62-
})
58+
Ok(Self {
59+
pool,
60+
clock,
61+
schema: Arc::new(schema),
62+
actor_id,
63+
})
64+
}
6365
}
6466

6567
pub fn update_schema(
68+
conn: &mut WriteConn,
6669
old_schema: Schema,
6770
new_schema: Schema,
68-
conn: &mut WriteConn,
6971
) -> eyre::Result<Schema> {
7072
// clone the previous schema and apply
7173
let mut new_schema = {

crates/corrosion/src/persistent/executor.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ use std::{
1717

1818
pub type BroadcastTx = tokio::sync::mpsc::Sender<broadcast::BroadcastInput>;
1919

20-
///
20+
/// A DB mutator that will broadcast changes to any subscribers when a mutation
21+
/// occurs that matches a subscriber's query
2122
#[derive(Clone)]
2223
pub struct BroadcastingTransactor {
2324
pool: SplitPool,

0 commit comments

Comments
 (0)