Skip to content

Commit 21fd70c

Browse files
committed
Apply feedback
1 parent 8487b8f commit 21fd70c

File tree

6 files changed

+23
-21
lines changed

6 files changed

+23
-21
lines changed

crates/corro-agent/src/api/peer/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -751,7 +751,7 @@ fn send_change_chunks<I: Iterator<Item = rusqlite::Result<Change>>>(
751751
Some(Ok((changes, seqs))) => {
752752
let start = Instant::now();
753753

754-
if changes.is_empty() && *seqs.start() == CrsqlSeq(0) && *seqs.end() == last_seq {
754+
if changes.is_empty() && seqs.start() == CrsqlSeq(0) && seqs.end() == last_seq {
755755
warn!(%actor_id, %version, "got an empty changes we should've had");
756756
return Ok(());
757757
} else {
@@ -760,7 +760,7 @@ fn send_change_chunks<I: Iterator<Item = rusqlite::Result<Change>>>(
760760
changeset: Changeset::Full {
761761
version,
762762
changes,
763-
seqs: seqs.into(),
763+
seqs,
764764
last_seq,
765765
ts,
766766
},

crates/corro-types/src/agent.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1140,7 +1140,7 @@ impl VersionsSnapshot {
11401140
}
11411141
let details = json!({"count": count, "range": range});
11421142
assert_always!(count == 1, "ineffective deletion of gaps in-db", &details);
1143-
for version in CrsqlDbVersionRange::from(range.clone()) {
1143+
for version in CrsqlDbVersionRange::from(&range) {
11441144
self.partials.remove(&version);
11451145
}
11461146
self.needed.remove(range);

crates/corro-types/src/broadcast.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ impl Changeset {
170170
// todo: this returns dummy version because empty set has an array of versions.
171171
// probably shouldn't be doing this
172172
Changeset::EmptySet { .. } => CrsqlDbVersionRange::empty(),
173-
Changeset::Full { version, .. } => (*version..=*version).into(),
173+
Changeset::Full { version, .. } => CrsqlDbVersionRange::single(*version),
174174
}
175175
}
176176

@@ -549,7 +549,7 @@ pub async fn broadcast_changes(
549549
changeset: Changeset::Full {
550550
version: db_version,
551551
changes,
552-
seqs: seqs.into(),
552+
seqs,
553553
last_seq,
554554
ts,
555555
},

crates/corro-types/src/change.rs

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
use std::{iter::Peekable, ops::RangeInclusive};
1+
use std::iter::Peekable;
22

33
use antithesis_sdk::assert_always;
44
pub use corro_api_types::SqliteValue;
55
use corro_api_types::{ColumnName, TableName};
6-
use corro_base_types::CrsqlDbVersion;
6+
use corro_base_types::{CrsqlDbVersion, CrsqlSeqRange};
77
use rusqlite::{Connection, Row};
88
use serde::{Deserialize, Serialize};
99
use serde_json::json;
@@ -104,7 +104,7 @@ impl<I> Iterator for ChunkedChanges<I>
104104
where
105105
I: Iterator<Item = rusqlite::Result<Change>>,
106106
{
107-
type Item = Result<(Vec<Change>, RangeInclusive<CrsqlSeq>), rusqlite::Error>;
107+
type Item = Result<(Vec<Change>, CrsqlSeqRange), rusqlite::Error>;
108108

109109
fn next(&mut self) -> Option<Self::Item> {
110110
// previously marked as done because the Rows iterator returned None
@@ -153,7 +153,7 @@ where
153153

154154
return Some(Ok((
155155
self.changes.drain(..).collect(),
156-
start_seq..=self.last_pushed_seq,
156+
CrsqlSeqRange::new(start_seq, self.last_pushed_seq),
157157
)));
158158
}
159159
}
@@ -171,8 +171,8 @@ where
171171

172172
// return buffered changes
173173
Some(Ok((
174-
self.changes.clone(), // no need to drain here like before
175-
self.last_start_seq..=self.last_seq, // even if empty, this is all we have still applied
174+
self.changes.clone(), // no need to drain here like before
175+
CrsqlSeqRange::new(self.last_start_seq, self.last_seq), // even if empty, this is all we have still applied
176176
)))
177177
}
178178
}
@@ -262,14 +262,14 @@ pub fn insert_local_changes(
262262
#[cfg(test)]
263263
mod tests {
264264
use super::*;
265-
use crate::base::dbsri;
265+
use crate::base::dbsr;
266266

267267
#[test]
268268
fn test_change_chunker() {
269269
// empty interator
270270
let mut chunker = ChunkedChanges::new(vec![].into_iter(), CrsqlSeq(0), CrsqlSeq(100), 50);
271271

272-
assert_eq!(chunker.next(), Some(Ok((vec![], dbsri!(0, 100)))));
272+
assert_eq!(chunker.next(), Some(Ok((vec![], dbsr!(0, 100)))));
273273
assert_eq!(chunker.next(), None);
274274

275275
let changes: Vec<Change> = (0..100)
@@ -296,12 +296,12 @@ mod tests {
296296
chunker.next(),
297297
Some(Ok((
298298
vec![changes[0].clone(), changes[1].clone()],
299-
dbsri!(0, 1)
299+
dbsr!(0, 1)
300300
)))
301301
);
302302
assert_eq!(
303303
chunker.next(),
304-
Some(Ok((vec![changes[2].clone()], dbsri!(2, 100))))
304+
Some(Ok((vec![changes[2].clone()], dbsr!(2, 100))))
305305
);
306306
assert_eq!(chunker.next(), None);
307307

@@ -314,7 +314,7 @@ mod tests {
314314

315315
assert_eq!(
316316
chunker.next(),
317-
Some(Ok((vec![changes[0].clone()], dbsri!(0, 0))))
317+
Some(Ok((vec![changes[0].clone()], dbsr!(0, 0))))
318318
);
319319
assert_eq!(chunker.next(), None);
320320

@@ -330,7 +330,7 @@ mod tests {
330330
chunker.next(),
331331
Some(Ok((
332332
vec![changes[0].clone(), changes[2].clone()],
333-
dbsri!(0, 100)
333+
dbsr!(0, 100)
334334
)))
335335
);
336336

@@ -359,7 +359,7 @@ mod tests {
359359
changes[7].clone(),
360360
changes[8].clone()
361361
],
362-
dbsri!(0, 100)
362+
dbsr!(0, 100)
363363
)))
364364
);
365365

@@ -383,15 +383,15 @@ mod tests {
383383
chunker.next(),
384384
Some(Ok((
385385
vec![changes[2].clone(), changes[4].clone(),],
386-
dbsri!(0, 4)
386+
dbsr!(0, 4)
387387
)))
388388
);
389389

390390
assert_eq!(
391391
chunker.next(),
392392
Some(Ok((
393393
vec![changes[7].clone(), changes[8].clone(),],
394-
dbsri!(5, 10)
394+
dbsr!(5, 10)
395395
)))
396396
);
397397

crates/corro-types/src/sync.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ impl SyncStateV1 {
167167
let start = cmp::max(range.start(), overlap.start());
168168
let end = cmp::min(range.end(), overlap.end());
169169
needs.entry(*actor_id).or_default().push(SyncNeedV1::Full {
170-
versions: (*start..=*end).into(),
170+
versions: CrsqlDbVersionRange::new(*start, *end),
171171
})
172172
}
173173
}

rust-toolchain.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
[toolchain]
2+
channel = "stable"

0 commit comments

Comments
 (0)