Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: add firebase_sync bin to periodically sync events with aw-firebase-leaderboard #488

Closed
wants to merge 21 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
449 changes: 416 additions & 33 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -7,5 +7,6 @@ members = [
"aw-server",
"aw-sync",
"aw-query",
"aw-firebase-sync",
]
resolver = "2"
6 changes: 6 additions & 0 deletions aw-client-rust/src/blocking.rs
Original file line number Diff line number Diff line change
@@ -62,6 +62,12 @@ impl AwClient {
stop: Option<DateTime<Utc>>,
limit: Option<u64>
);
proxy_method!(
query,
Vec<serde_json::Value>,
query: &str,
timeperiods: Vec<(DateTime<Utc>, DateTime<Utc>)>
);
proxy_method!(insert_event, (), bucketname: &str, event: &Event);
proxy_method!(insert_events, (), bucketname: &str, events: Vec<Event>);
proxy_method!(
28 changes: 27 additions & 1 deletion aw-client-rust/src/lib.rs
Original file line number Diff line number Diff line change
@@ -11,7 +11,7 @@ use std::vec::Vec;
use std::{collections::HashMap, error::Error};

use chrono::{DateTime, Utc};
use serde_json::Map;
use serde_json::{json, Map};

pub use aw_models::{Bucket, BucketMetadata, Event};

@@ -98,6 +98,32 @@ impl AwClient {
Ok(())
}

/// Run an aw-query program against the server over the given timeperiods.
///
/// The query text is split on newlines and each `(start, stop)` pair is
/// encoded as an RFC 3339 `"start/stop"` string, matching the payload shape
/// expected by the `/api/0/query` endpoint. The server responds with a JSON
/// sequence containing one element per timeperiod.
pub async fn query(
    &self,
    query: &str,
    timeperiods: Vec<(DateTime<Utc>, DateTime<Utc>)>,
) -> Result<Vec<serde_json::Value>, reqwest::Error> {
    let url = reqwest::Url::parse(format!("{}/api/0/query", self.baseurl).as_str()).unwrap();

    // Encode every timeperiod as a single "start/stop" ISO8601 string.
    let periods: Vec<String> = timeperiods
        .iter()
        .map(|(start, stop)| format!("{}/{}", start.to_rfc3339(), stop.to_rfc3339()))
        .collect();

    let body = json!({
        "query": query.split('\n').collect::<Vec<&str>>(),
        "timeperiods": periods,
    });

    // One result element per timeperiod.
    self.client.post(url).json(&body).send().await?.json().await
}

pub async fn get_events(
&self,
bucketname: &str,
28 changes: 21 additions & 7 deletions aw-client-rust/tests/test.rs
Original file line number Diff line number Diff line change
@@ -86,19 +86,17 @@ mod test {
println!("Buckets: {buckets:?}");
let mut event = Event {
id: None,
timestamp: DateTime::from_utc(
timestamp: DateTime::from_naive_utc_and_offset(
DateTime::parse_from_rfc3339("2017-12-30T01:00:00+00:00")
.unwrap()
.naive_utc(),
Utc,
),
duration: Duration::seconds(0),
.naive_utc(), Utc),
duration: Duration::try_seconds(0).unwrap(),
data: Map::new(),
};
println!("{event:?}");
client.insert_event(&bucketname, &event).unwrap();
// Ugly way to create a UTC from timestamp, see https://github.com/chronotope/chrono/issues/263
event.timestamp = DateTime::from_utc(
event.timestamp = DateTime::from_naive_utc_and_offset(
DateTime::parse_from_rfc3339("2017-12-30T01:00:01+00:00")
.unwrap()
.naive_utc(),
@@ -108,7 +106,23 @@ mod test {

let events = client.get_events(&bucketname, None, None, None).unwrap();
println!("Events: {events:?}");
assert!(events[0].duration == Duration::seconds(1));
assert!(events[0].duration == Duration::try_seconds(1).unwrap());

// Query
let query = format!(
"events = query_bucket(\"{}\");
RETURN = events;",
bucket.id
);
let start: DateTime<Utc> = DateTime::parse_from_rfc3339("1996-12-19T00:00:00-08:00")
.unwrap()
.into();
let end: DateTime<Utc> = DateTime::parse_from_rfc3339("2020-12-19T00:00:00-08:00")
.unwrap()
.into();
let timeperiods = (start, end);
let query_result = client.query(&query, vec![timeperiods]).unwrap();
println!("Query result: {query_result:?}");

client
.delete_event(&bucketname, events[0].id.unwrap())
2 changes: 1 addition & 1 deletion aw-datastore/src/worker.rs
Original file line number Diff line number Diff line change
@@ -177,7 +177,7 @@ impl DatastoreWorker {
response_sender.respond(response);

let now: DateTime<Utc> = Utc::now();
let commit_interval_passed: bool = (now - last_commit_time) > Duration::seconds(15);
let commit_interval_passed: bool = (now - last_commit_time) > Duration::try_seconds(15).unwrap();
if self.commit
|| commit_interval_passed
|| self.uncommitted_events > 100
32 changes: 16 additions & 16 deletions aw-datastore/tests/datastore.rs
Original file line number Diff line number Diff line change
@@ -77,7 +77,7 @@ mod datastore_tests {
Some(created) => {
let now = Utc::now();
assert!(created <= now);
assert!(created > now - Duration::seconds(10));
assert!(created > now - Duration::try_seconds(10).unwrap());
}
};

@@ -102,7 +102,7 @@ mod datastore_tests {
Some(created) => {
let now = Utc::now();
assert!(created <= now);
assert!(created > now - Duration::seconds(10));
assert!(created > now - Duration::try_seconds(10).unwrap());
}
};

@@ -129,7 +129,7 @@ mod datastore_tests {
let e1 = Event {
id: None,
timestamp: Utc::now(),
duration: Duration::seconds(0),
duration: Duration::try_seconds(0).unwrap(),
data: json_map! {"key": json!("value")},
};
let mut e2 = e1.clone();
@@ -157,7 +157,7 @@ mod datastore_tests {
let e1 = Event {
id: None,
timestamp: Utc::now(),
duration: Duration::seconds(0),
duration: Duration::try_seconds(0).unwrap(),
data: json_map! {"key": json!("value")},
};
let mut e2 = e1.clone();
@@ -224,16 +224,16 @@ mod datastore_tests {
let e1 = Event {
id: None,
timestamp: Utc::now(),
duration: Duration::seconds(100),
duration: Duration::try_seconds(100).unwrap(),
data: json_map! {"key": json!("value")},
};

let event_list = [e1];
ds.insert_events(&bucket.id, &event_list).unwrap();

info!("Get event that covers queried timeperiod");
let query_start = now + Duration::seconds(1);
let query_end = query_start + Duration::seconds(1);
let query_start = now + Duration::try_seconds(1).unwrap();
let query_end = query_start + Duration::try_seconds(1).unwrap();
let fetched_events_limit = ds
.get_events(&bucket.id, Some(query_start), Some(query_end), Some(1))
.unwrap();
@@ -256,11 +256,11 @@ mod datastore_tests {
let e1 = Event {
id: None,
timestamp: Utc::now(),
duration: Duration::seconds(0),
duration: Duration::try_seconds(0).unwrap(),
data: json_map! {"key": json!("value")},
};
let mut e2 = e1.clone();
e2.timestamp += Duration::seconds(1);
e2.timestamp += Duration::try_seconds(1).unwrap();

let event_list = [e1.clone(), e2.clone()];

@@ -308,7 +308,7 @@ mod datastore_tests {
let e1 = Event {
id: None,
timestamp: Utc::now(),
duration: Duration::seconds(0),
duration: Duration::try_seconds(0).unwrap(),
data: json_map! {"key": json!("value")},
};
let mut e2 = e1.clone();
@@ -334,14 +334,14 @@ mod datastore_tests {
let e1 = Event {
id: None,
timestamp: Utc::now(),
duration: Duration::seconds(0),
duration: Duration::try_seconds(0).unwrap(),
data: json_map! {"key": json!("value")},
};
let mut e2 = e1.clone();
e2.timestamp += Duration::seconds(1);
e2.timestamp += Duration::try_seconds(1).unwrap();

let mut e_diff_data = e2.clone();
e_diff_data.timestamp += Duration::seconds(1);
e_diff_data.timestamp += Duration::try_seconds(1).unwrap();
e_diff_data.data = json_map! {"key": json!("other value")};

// First event
@@ -358,7 +358,7 @@ mod datastore_tests {
let fetched_events = ds.get_events(&bucket.id, None, None, None).unwrap();
assert_eq!(fetched_events.len(), 1);
assert_eq!(fetched_events[0].timestamp, e1.timestamp);
assert_eq!(fetched_events[0].duration, Duration::seconds(1));
assert_eq!(fetched_events[0].duration, Duration::try_seconds(1).unwrap());
assert_eq!(fetched_events[0].data, e1.data);
assert_eq!(fetched_events[0].id, e1.id);
let e2 = &fetched_events[0];
@@ -383,7 +383,7 @@ mod datastore_tests {
let e = Event {
id: None,
timestamp: Utc::now(),
duration: Duration::seconds(0),
duration: Duration::try_seconds(0).unwrap(),
data: json_map! {"key": json!("value")},
};
let mut e1 = e.clone();
@@ -451,7 +451,7 @@ mod datastore_tests {
let e1 = Event {
id: None,
timestamp: Utc::now(),
duration: Duration::seconds(0),
duration: Duration::try_seconds(0).unwrap(),
data: json_map! {"key": json!("value")},
};
{
19 changes: 19 additions & 0 deletions aw-firebase-sync/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[package]
name = "aw-firebase-sync"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
chrono = "0.4.35"
tokio = { version = "1", features = ["full"] }
aw-client-rust = { path = "../aw-client-rust" }
aw-models = { path = "../aw-models" }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.70"
serde_yaml = "0.8.18"
reqwest = { version = "0.12.4" , features = ["json"] }
dirs = "5.0.1"
tracing = "0.1.26"
tracing-subscriber = "0.3.18"
10 changes: 10 additions & 0 deletions aw-firebase-sync/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
aw-firebase-sync
================

Firebase sync for ActivityWatch [leaderboard](https://github.com/ActivityWatch/aw-firebase-leaderboard).

This exports screentime data from ActivityWatch and uploads it to Firebase.

## Status

Still in early development, not ready for use yet.
2 changes: 2 additions & 0 deletions aw-firebase-sync/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# You can get your API key from the ActivityWatch leaderboard
apikey: your-api-key
114 changes: 114 additions & 0 deletions aw-firebase-sync/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
use aw_client_rust::AwClient;
use chrono::Local;
use dirs::config_dir;
use reqwest;
use serde_json::{json, Value};
use serde_yaml;
use std::env;
use std::fs::{DirBuilder, File};
use std::io::prelude::*;
use tracing::info;
use tracing_subscriber;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    tracing_subscriber::fmt::init();

    // Minimal hand-rolled CLI: [--testing] [--port PORT] [--help].
    let args: Vec<String> = env::args().collect();
    let mut port: u16 = 5600;
    let mut testing = false;
    let mut idx = 1;
    while idx < args.len() {
        match args[idx].as_str() {
            "--port" => {
                // `.get` guards against `--port` being the last argument,
                // which would otherwise panic with an index out of bounds.
                port = args
                    .get(idx + 1)
                    .expect("--port requires a value")
                    .parse()
                    .expect("Invalid port number");
                idx += 1; // skip the consumed value
            }
            "--testing" => testing = true,
            "--help" => {
                println!("Usage: aw-firebase-sync [--testing] [--port PORT] [--help]");
                return Ok(());
            }
            _ => {}
        }
        idx += 1;
    }

    let aw_client = AwClient::new("localhost", port, "aw-firebase-sync").unwrap();

    // Config lives at <config_dir>/activitywatch/aw-firebase-sync/config.yaml.
    let path = config_dir()
        .map(|mut path| {
            path.push("activitywatch");
            path.push("aw-firebase-sync");
            path.push("config.yaml");
            path
        })
        .unwrap();

    // First run: write a template config and bail so the user can fill in the key.
    if !path.exists() {
        DirBuilder::new()
            .recursive(true)
            .create(path.as_path().parent().expect("Unable to get parent path"))
            .expect("Unable to create config directory");
        let mut file = File::create(&path).expect("Unable to create file");
        file.write_all(b"apikey: your-api-key\n")
            .expect("Unable to write to file");
        panic!("Please set your API key at {:?}", path.to_str().unwrap());
    }

    let mut file = File::open(path).expect("Unable to open file");
    let mut contents = String::new();
    file.read_to_string(&mut contents)
        .expect("Unable to read file");
    let yaml: Value =
        serde_yaml::from_str(&contents).expect("Failed parsing yaml from config file");
    let apikey = yaml["apikey"]
        .as_str()
        .expect("unable to get api key from config file");
    if apikey == "your-api-key" || apikey.is_empty() {
        panic!("Please set your API key in the config.yaml file");
    }

    // aw-query program: window events while not-afk, categorized as "Work".
    let query = "
    events = flood(query_bucket(find_bucket(\"aw-watcher-window_\")));
    not_afk = flood(query_bucket(find_bucket(\"aw-watcher-afk_\")));
    not_afk = filter_keyvals(not_afk, \"status\", [\"not-afk\"]);
    events = filter_period_intersect(events, not_afk);
    events = categorize(events, [[[\"Work\"], {\"type\": \"regex\", \"regex\": \"aw|ActivityWatch\", \"ignore_case\": true}]]);
    events = filter_keyvals(events, \"$category\", [[\"Work\"]]);
    RETURN = events;
    ";

    let firebase_url = if testing {
        "http://localhost:5001/aw-mockup/us-central1/uploadData"
    } else {
        "https://us-central1-aw-mockup.cloudfunctions.net/uploadData"
    };

    let firebase_client = reqwest::Client::new();

    // Sync loop: every 5 minutes, query the last 5 minutes and upload.
    loop {
        let start = Local::now().to_utc() - chrono::Duration::minutes(5);
        let end = Local::now().to_utc();
        let timeperiods = vec![(start, end)];

        let query_result = aw_client
            .query(query, timeperiods)
            .await
            .expect("Failed to query data");
        // One result element per timeperiod; we sent exactly one.
        let query_data = serde_json::to_string(
            query_result.first().expect("Query returned no result"),
        )
        .expect("Failed to serialize query data");
        let payload = json!({
            "apiKey": apikey,
            "data": query_data
        });
        let response = firebase_client
            .post(firebase_url)
            .json(&payload)
            .send()
            .await?
            .json::<Value>()
            .await?;
        info!("Response: {:?}", response);
        // Use the async sleep: std::thread::sleep would block a tokio
        // runtime worker thread for the whole 5 minutes.
        tokio::time::sleep(std::time::Duration::from_secs(300)).await;
    }
}
Empty file added aw-firebase-sync/test-sync.sh
Empty file.
6 changes: 3 additions & 3 deletions aw-models/src/event.rs
Original file line number Diff line number Diff line change
@@ -60,14 +60,14 @@ impl Default for Event {
Event {
id: None,
timestamp: Utc::now(),
duration: Duration::seconds(0),
duration: Duration::try_seconds(0).unwrap(),
data: serde_json::Map::new(),
}
}
}

fn default_duration() -> Duration {
Duration::seconds(0)
Duration::try_seconds(0).unwrap()
}

#[test]
@@ -77,7 +77,7 @@ fn test_event() {
let e = Event {
id: None,
timestamp: Utc::now(),
duration: Duration::seconds(1),
duration: Duration::try_seconds(0).unwrap(),
data: json_map! {"test": json!(1)},
};
debug!("event: {:?}", e);
4 changes: 2 additions & 2 deletions aw-query/benches/benchmark.rs
Original file line number Diff line number Diff line change
@@ -61,8 +61,8 @@ mod query_benchmarks {
for i in 0..num_events {
let e = Event {
id: None,
timestamp: chrono::Utc::now() + Duration::seconds(i),
duration: Duration::seconds(10),
timestamp: chrono::Utc::now() + Duration::try_seconds(i).unwrap(),
duration: Duration::try_seconds(10).unwrap(),
data: possible_data[i as usize % 20].clone(),
};
event_list.push(e);
2 changes: 1 addition & 1 deletion aw-query/src/functions.rs
Original file line number Diff line number Diff line change
@@ -269,7 +269,7 @@ mod qfunctions {
validate::args_length(&args, 1)?;
let events: Vec<Event> = (&args[0]).try_into()?;
// Run flood
let mut flooded_events = aw_transform::flood(events, chrono::Duration::seconds(5));
let mut flooded_events = aw_transform::flood(events, chrono::Duration::try_seconds(5).unwrap());
// Put events back into DataType::Event container
let mut tagged_flooded_events = Vec::new();
for event in flooded_events.drain(..) {
4 changes: 2 additions & 2 deletions aw-query/tests/query.rs
Original file line number Diff line number Diff line change
@@ -67,14 +67,14 @@ mod query_tests {
let e1 = Event {
id: None,
timestamp: chrono::Utc::now(),
duration: Duration::seconds(0),
duration: Duration::try_seconds(0).unwrap(),
data: json_map! {"key": json!("value")},
};
let mut e2 = e1.clone();
e2.timestamp = chrono::Utc::now();
let mut e_replace = e2.clone();
e_replace.data = json_map! {"key": json!("value2")};
e_replace.duration = Duration::seconds(2);
e_replace.duration = Duration::try_seconds(2).unwrap();

let mut event_list = Vec::new();
event_list.push(e1);
1 change: 1 addition & 0 deletions aw-sync/src/dirs.rs
Original file line number Diff line number Diff line change
@@ -14,6 +14,7 @@ pub fn get_config_dir() -> Result<PathBuf, Box<dyn Error>> {
Ok(dir)
}

#[allow(dead_code)]
pub fn get_server_config_path(testing: bool) -> Result<PathBuf, ()> {
let dir = aw_server::dirs::get_config_dir()?;
Ok(dir.join(if testing {
2 changes: 1 addition & 1 deletion aw-sync/src/sync.rs
Original file line number Diff line number Diff line change
@@ -64,7 +64,7 @@ pub fn sync_run(
let info = client.get_info()?;

// FIXME: Here it is assumed that the device_id for the local server is the one used by
// aw-server-rust, which is not necessarily true (aw-server-python has seperate device_id).
// aw-server-rust, which is not necessarily true (aw-server-python has separate device_id).
// Therefore, this may sometimes fail to pick up the correct local datastore.
let device_id = info.device_id.as_str();

2 changes: 1 addition & 1 deletion aw-sync/tests/sync.rs
Original file line number Diff line number Diff line change
@@ -241,7 +241,7 @@ mod sync_tests {
// Insert some testing events into the bucket
let events: Vec<Event> = (0..3)
.map(|i| {
let timestamp: DateTime<Utc> = Utc::now() + Duration::milliseconds(i * 10);
let timestamp: DateTime<Utc> = Utc::now() + Duration::try_milliseconds(i * 10).unwrap();
let event_jsonstr = format!(
r#"{{
"timestamp": "{}",
4 changes: 2 additions & 2 deletions aw-transform/benches/bench.rs
Original file line number Diff line number Diff line change
@@ -31,8 +31,8 @@ fn create_events(num_events: i64) -> Vec<Event> {
for i in 0..num_events {
let e = Event {
id: None,
timestamp: chrono::Utc::now() + Duration::seconds(i),
duration: Duration::seconds(10),
timestamp: chrono::Utc::now() + Duration::try_seconds(i).unwrap(),
duration: Duration::try_seconds(10).unwrap(),
data: possible_data[i as usize % 20].clone(),
};
event_list.push(e);
6 changes: 3 additions & 3 deletions aw-transform/src/chunk.rs
Original file line number Diff line number Diff line change
@@ -63,7 +63,7 @@ mod tests {
let e1 = Event {
id: None,
timestamp: DateTime::from_str("2000-01-01T00:00:01Z").unwrap(),
duration: Duration::seconds(1),
duration: Duration::try_seconds(1).unwrap(),
data: json_map! {"test": json!(1)},
};
let mut e2 = e1.clone();
@@ -74,7 +74,7 @@ mod tests {

let res = chunk_events_by_key(vec![e1, e2, e3, e4], "test");
assert_eq!(res.len(), 2);
assert_eq!(res[0].duration, Duration::seconds(2));
assert_eq!(res[1].duration, Duration::seconds(1));
assert_eq!(res[0].duration, Duration::try_seconds(2).unwrap());
assert_eq!(res[1].duration, Duration::try_seconds(1).unwrap());
}
}
8 changes: 4 additions & 4 deletions aw-transform/src/filter_keyvals.rs
Original file line number Diff line number Diff line change
@@ -93,7 +93,7 @@ mod tests {
let e1 = Event {
id: None,
timestamp: DateTime::from_str("2000-01-01T00:00:00Z").unwrap(),
duration: Duration::seconds(1),
duration: Duration::try_seconds(1).unwrap(),
data: json_map! {"test": json!(1)},
};
let mut e2 = e1.clone();
@@ -109,7 +109,7 @@ mod tests {
let e1 = Event {
id: None,
timestamp: DateTime::from_str("2000-01-01T00:00:00Z").unwrap(),
duration: Duration::seconds(1),
duration: Duration::try_seconds(1).unwrap(),
data: json_map! {"key1": json!("value1")},
};
let mut e2 = e1.clone();
@@ -139,7 +139,7 @@ mod tests {
let e1 = Event {
id: None,
timestamp: DateTime::from_str("2000-01-01T00:00:00Z").unwrap(),
duration: Duration::seconds(1),
duration: Duration::try_seconds(1).unwrap(),
data: json_map! {"key1": json!(100)},
};
let events = vec![e1.clone()];
@@ -153,7 +153,7 @@ mod tests {
let e1 = Event {
id: None,
timestamp: DateTime::from_str("2000-01-01T00:00:00Z").unwrap(),
duration: Duration::seconds(1),
duration: Duration::try_seconds(1).unwrap(),
data: json_map! {"test": json!(1)},
};
let mut e2 = e1.clone();
30 changes: 15 additions & 15 deletions aw-transform/src/filter_period.rs
Original file line number Diff line number Diff line change
@@ -34,7 +34,7 @@ pub fn filter_period_intersect(events: Vec<Event>, filter_events: Vec<Event>) ->
loop {
let event_endtime = cur_event.calculate_endtime();
let filter_endtime = cur_filter_event.calculate_endtime();
if cur_event.duration == Duration::seconds(0) || event_endtime <= cur_filter_event.timestamp
if cur_event.duration == Duration::try_seconds(0).unwrap() || event_endtime <= cur_filter_event.timestamp
{
match events_iter.next() {
Some(e) => {
@@ -86,7 +86,7 @@ mod tests {
let e1 = Event {
id: None,
timestamp: DateTime::from_str("2000-01-01T00:00:01Z").unwrap(),
duration: Duration::seconds(1),
duration: Duration::try_seconds(1).unwrap(),
data: json_map! {"test": json!(1)},
};
let mut e2 = e1.clone();
@@ -101,16 +101,16 @@ mod tests {
let filter_event = Event {
id: None,
timestamp: DateTime::from_str("2000-01-01T00:00:02.5Z").unwrap(),
duration: Duration::seconds(2),
duration: Duration::try_seconds(2).unwrap(),
data: json_map! {"test": json!(1)},
};

let filtered_events =
filter_period_intersect(vec![e1, e2, e3, e4, e5], vec![filter_event.clone()]);
assert_eq!(filtered_events.len(), 3);
assert_eq!(filtered_events[0].duration, Duration::milliseconds(500));
assert_eq!(filtered_events[1].duration, Duration::milliseconds(1000));
assert_eq!(filtered_events[2].duration, Duration::milliseconds(500));
assert_eq!(filtered_events[0].duration, Duration::try_milliseconds(500).unwrap());
assert_eq!(filtered_events[1].duration, Duration::try_milliseconds(1000).unwrap());
assert_eq!(filtered_events[2].duration, Duration::try_milliseconds(500).unwrap());

let dt: DateTime<Utc> = DateTime::from_str("2000-01-01T00:00:02.500Z").unwrap();
assert_eq!(filtered_events[0].timestamp, dt);
@@ -123,36 +123,36 @@ mod tests {
let e = Event {
id: None,
timestamp: timestamp_01s,
duration: Duration::seconds(1),
duration: Duration::try_seconds(1).unwrap(),
data: json_map! {"test": json!(1)},
};
let mut f2 = filter_event.clone();
f2.timestamp = DateTime::from_str("2000-01-01T00:00:00Z").unwrap();
f2.duration = Duration::milliseconds(1500);
f2.duration = Duration::try_milliseconds(1500).unwrap();
let res = filter_period_intersect(vec![e.clone()], vec![f2]);
assert_eq!(res[0].timestamp, timestamp_01s);
assert_eq!(res[0].duration, Duration::milliseconds(500));
assert_eq!(res[0].duration, Duration::try_milliseconds(500).unwrap());

let timestamp_01_5s = DateTime::from_str("2000-01-01T00:00:01.5Z").unwrap();
let mut f3 = filter_event.clone();
f3.timestamp = timestamp_01_5s;
f3.duration = Duration::milliseconds(1000);
f3.duration = Duration::try_milliseconds(1000).unwrap();
let res = filter_period_intersect(vec![e.clone()], vec![f3]);
assert_eq!(res[0].timestamp, timestamp_01_5s);
assert_eq!(res[0].duration, Duration::milliseconds(500));
assert_eq!(res[0].duration,Duration::try_milliseconds(500).unwrap() );

let mut f4 = filter_event.clone();
f4.timestamp = DateTime::from_str("2000-01-01T00:00:01.5Z").unwrap();
f4.duration = Duration::milliseconds(100);
f4.duration = Duration::try_milliseconds(100).unwrap();
let res = filter_period_intersect(vec![e.clone()], vec![f4]);
assert_eq!(res[0].timestamp, timestamp_01_5s);
assert_eq!(res[0].duration, Duration::milliseconds(100));
assert_eq!(res[0].duration, Duration::try_milliseconds(100).unwrap());

let mut f5 = filter_event.clone();
f5.timestamp = DateTime::from_str("2000-01-01T00:00:00Z").unwrap();
f5.duration = Duration::seconds(10);
f5.duration = Duration::try_seconds(10).unwrap();
let res = filter_period_intersect(vec![e.clone()], vec![f5]);
assert_eq!(res[0].timestamp, timestamp_01s);
assert_eq!(res[0].duration, Duration::milliseconds(1000));
assert_eq!(res[0].duration, Duration::try_milliseconds(1000).unwrap());
}
}
66 changes: 33 additions & 33 deletions aw-transform/src/flood.rs
Original file line number Diff line number Diff line change
@@ -42,7 +42,7 @@ pub fn flood(events: Vec<Event>, pulsetime: chrono::Duration) -> Vec<Event> {
let mut retry_e: Option<Event> = None;

// If negative gaps are smaller than this, prune them to become zero
let negative_gap_trim_thres = chrono::Duration::milliseconds(100);
let negative_gap_trim_thres = chrono::Duration::try_milliseconds(100).unwrap();

let mut warned_negative_gap_safe = false;
let mut warned_negative_gap_unsafe = false;
@@ -74,7 +74,7 @@ pub fn flood(events: Vec<Event>, pulsetime: chrono::Duration) -> Vec<Event> {

// We split the program flow into 2 parts: positive and negative gaps
// First we check negative gaps (if events overlap)
if gap < chrono::Duration::seconds(0) {
if gap < chrono::Duration::try_seconds(0).unwrap() {
// Python implementation:
//
// if gap < timedelta(0) and e1.data == e2.data:
@@ -129,7 +129,7 @@ pub fn flood(events: Vec<Event>, pulsetime: chrono::Duration) -> Vec<Event> {
// warned_about_negative_gap_unsafe = True

// Ensure that gap is actually non-negative here, at least in tests
debug_assert!(gap >= chrono::Duration::seconds(0));
debug_assert!(gap >= chrono::Duration::try_seconds(0).unwrap());

// If data is the same, we should merge them.
if e1.data == e2.data {
@@ -179,22 +179,22 @@ mod tests {
let e1 = Event {
id: None,
timestamp: DateTime::from_str("2000-01-01T00:00:00Z").unwrap(),
duration: Duration::seconds(1),
duration: Duration::try_seconds(1).unwrap(),
data: json_map! {"test": json!(1)},
};
let e2 = Event {
id: None,
timestamp: DateTime::from_str("2000-01-01T00:00:03Z").unwrap(),
duration: Duration::seconds(1),
duration: Duration::try_seconds(1).unwrap(),
data: json_map! {"test": json!(1)},
};
let e_expected = Event {
id: None,
timestamp: DateTime::from_str("2000-01-01T00:00:00Z").unwrap(),
duration: Duration::seconds(4),
duration: Duration::try_seconds(4).unwrap(),
data: json_map! {"test": json!(1)},
};
let res = flood(vec![e1, e2], Duration::seconds(5));
let res = flood(vec![e1, e2], Duration::try_seconds(5).unwrap());
assert_eq!(1, res.len());
assert_eq!(&res[0], &e_expected);
}
@@ -205,28 +205,28 @@ mod tests {
let e1 = Event {
id: None,
timestamp: DateTime::from_str("2000-01-01T00:00:00Z").unwrap(),
duration: Duration::seconds(1),
duration: Duration::try_seconds(1).unwrap(),
data: json_map! {"test": json!(1)},
};
let e2 = Event {
id: None,
timestamp: DateTime::from_str("2000-01-01T00:00:03Z").unwrap(),
duration: Duration::seconds(1),
duration: Duration::try_seconds(1).unwrap(),
data: json_map! {"test": json!(2)},
};
let e1_expected = Event {
id: None,
timestamp: DateTime::from_str("2000-01-01T00:00:00Z").unwrap(),
duration: Duration::seconds(2),
duration: Duration::try_seconds(2).unwrap(),
data: json_map! {"test": json!(1)},
};
let e2_expected = Event {
id: None,
timestamp: DateTime::from_str("2000-01-01T00:00:02Z").unwrap(),
duration: Duration::seconds(2),
duration: Duration::try_seconds(2).unwrap(),
data: json_map! {"test": json!(2)},
};
let res = flood(vec![e1, e2], Duration::seconds(5));
let res = flood(vec![e1, e2], Duration::try_seconds(5).unwrap());
assert_eq!(2, res.len());
assert_eq!(&res[0], &e1_expected);
assert_eq!(&res[1], &e2_expected);
@@ -238,22 +238,22 @@ mod tests {
let e1 = Event {
id: None,
timestamp: DateTime::from_str("2000-01-01T00:00:00Z").unwrap(),
duration: Duration::seconds(10),
duration: Duration::try_seconds(10).unwrap(),
data: json_map! {"type": "a"},
};
let e2 = Event {
id: None,
timestamp: DateTime::from_str("2000-01-01T00:00:05Z").unwrap(),
duration: Duration::seconds(10),
duration: Duration::try_seconds(10).unwrap(),
data: json_map! {"type": "a"},
};
let e1_expected = Event {
id: None,
timestamp: DateTime::from_str("2000-01-01T00:00:00Z").unwrap(),
duration: Duration::seconds(15),
duration: Duration::try_seconds(15).unwrap(),
data: json_map! {"type": "a"},
};
let res = flood(vec![e1, e2], Duration::seconds(5));
let res = flood(vec![e1, e2], Duration::try_seconds(5).unwrap());
assert_eq!(1, res.len());
assert_eq!(&res[0], &e1_expected);
}
@@ -264,16 +264,16 @@ mod tests {
let e1 = Event {
id: None,
timestamp: DateTime::from_str("2000-01-01T00:00:00Z").unwrap(),
duration: Duration::seconds(10),
duration: Duration::try_seconds(10).unwrap(),
data: json_map! {"type": "a"},
};
let e2 = Event {
id: None,
timestamp: DateTime::from_str("2000-01-01T00:00:01Z").unwrap(),
duration: Duration::seconds(5),
duration: Duration::try_seconds(5).unwrap(),
data: json_map! {"type": "a"},
};
let res = flood(vec![e1.clone(), e2], Duration::seconds(5));
let res = flood(vec![e1.clone(), e2], Duration::try_seconds(5).unwrap());
assert_eq!(1, res.len());
assert_eq!(&res[0], &e1);
}
@@ -285,16 +285,16 @@ mod tests {
let e1 = Event {
id: None,
timestamp: DateTime::from_str("2000-01-01T00:00:00Z").unwrap(),
duration: Duration::seconds(10),
duration: Duration::try_seconds(10).unwrap(),
data: json_map! {"type": "a"},
};
let e2 = Event {
id: None,
timestamp: DateTime::from_str("2000-01-01T00:00:01Z").unwrap(),
duration: Duration::seconds(5),
duration: Duration::try_seconds(5).unwrap(),
data: json_map! {"type": "b"},
};
let res = flood(vec![e1.clone(), e2.clone()], Duration::seconds(5));
let res = flood(vec![e1.clone(), e2.clone()], Duration::try_seconds(5).unwrap());
assert_eq!(2, res.len());
assert_eq!(&res[0], &e1);
assert_eq!(&res[1], &e2);
@@ -309,30 +309,30 @@ mod tests {
let e1 = Event {
id: None,
timestamp: DateTime::from_str("2000-01-01T00:00:00Z").unwrap(),
duration: Duration::seconds(1),
duration: Duration::try_seconds(1).unwrap(),
data: json_map! {"status": "afk"},
};
let e2 = Event {
id: None,
timestamp: DateTime::from_str("2000-01-01T00:00:01Z").unwrap(),
duration: Duration::seconds(5),
duration: Duration::try_seconds(5).unwrap(),
data: json_map! {"status": "not-afk"},
};
let e3 = Event {
id: None,
timestamp: DateTime::from_str("2000-01-01T00:00:01Z").unwrap(),
duration: Duration::seconds(1),
duration: Duration::try_seconds(1).unwrap(),
data: json_map! {"status": "not-afk"},
};
let e4 = Event {
id: None,
timestamp: DateTime::from_str("2000-01-01T00:00:06Z").unwrap(),
duration: Duration::seconds(1),
duration: Duration::try_seconds(1).unwrap(),
data: json_map! {"status": "afk"},
};
let res = flood(
vec![e1.clone(), e2.clone(), e3, e4.clone()],
Duration::seconds(5),
Duration::try_seconds(5).unwrap(),
);
assert_eq!(3, res.len());
assert_eq!(&res[0], &e1);
@@ -350,36 +350,36 @@ mod tests {
let e1 = Event {
id: None,
timestamp: DateTime::from_str("2000-01-01T00:00:00Z").unwrap(),
duration: Duration::seconds(1),
duration: Duration::try_seconds(1).unwrap(),
data: json_map! {"status": "afk"},
};
let e2 = Event {
id: None,
timestamp: DateTime::from_str("2000-01-01T00:00:01Z").unwrap(),
duration: Duration::seconds(5),
duration: Duration::try_seconds(5).unwrap(),
data: json_map! {"status": "not-afk"},
};
let e3 = Event {
id: None,
timestamp: DateTime::from_str("2000-01-01T00:00:01Z").unwrap(),
duration: Duration::seconds(1),
duration: Duration::try_seconds(1).unwrap(),
data: json_map! {"status": "not-afk"},
};
let e4 = Event {
id: None,
timestamp: DateTime::from_str("2000-01-01T00:00:01Z").unwrap(),
duration: Duration::seconds(10),
duration: Duration::try_seconds(10).unwrap(),
data: json_map! {"status": "not-afk"},
};
let e5 = Event {
id: None,
timestamp: DateTime::from_str("2000-01-01T00:00:11Z").unwrap(),
duration: Duration::seconds(1),
duration: Duration::try_seconds(1).unwrap(),
data: json_map! {"status": "afk"},
};
let res = flood(
vec![e1.clone(), e2, e3, e4.clone(), e5.clone()],
Duration::seconds(5),
Duration::try_seconds(5).unwrap(),
);
assert_eq!(3, res.len());
assert_eq!(&res[0], &e1);
30 changes: 15 additions & 15 deletions aw-transform/src/heartbeat.rs
Original file line number Diff line number Diff line change
@@ -74,20 +74,20 @@ mod tests {
let event1 = Event {
id: None,
timestamp: now,
duration: Duration::seconds(1),
duration: Duration::try_seconds(1).unwrap(),
data: json_map! {"test": json!(1)},
};
let heartbeat1 = Event {
id: None,
timestamp: now + Duration::seconds(2),
duration: Duration::seconds(1),
timestamp: now + Duration::try_seconds(2).unwrap(),
duration: Duration::try_seconds(1).unwrap(),
data: json_map! {"test": json!(1)},
};

// Merge result
let res_merge = heartbeat(&event1, &heartbeat1, 2.0).unwrap();
assert_eq!(res_merge.timestamp, now);
assert_eq!(res_merge.duration, Duration::seconds(3));
assert_eq!(res_merge.duration, Duration::try_seconds(3).unwrap());
assert_eq!(res_merge.data, event1.data);

// No merge result
@@ -103,22 +103,22 @@ mod tests {
let event = Event {
id: None,
timestamp: now,
duration: Duration::seconds(1),
duration: Duration::try_seconds(1).unwrap(),
data: json_map! {"test": json!(1)},
};
let long_pulse_event = Event {
id: None,
// note that no duration is sent, which is how aw-client works
duration: Duration::seconds(0),
timestamp: now + Duration::seconds(120),
duration: Duration::try_seconds(0).unwrap(),
timestamp: now + Duration::try_seconds(120).unwrap(),
data: json_map! {"test": json!(1)},
};

// Merge result
let res_merge = heartbeat(&event, &long_pulse_event, 120.0).unwrap();
assert_eq!(res_merge.timestamp, now);
assert_eq!(res_merge.data, event.data);
assert_eq!(res_merge.duration, Duration::seconds(120));
assert_eq!(res_merge.duration, Duration::try_seconds(120).unwrap());

// No merge result when pulsetime is less than the timestamp delta between heartbeats
let res_no_merge = heartbeat(&event, &long_pulse_event, 60.0);
@@ -131,13 +131,13 @@ mod tests {
let event = Event {
id: None,
timestamp: now,
duration: Duration::seconds(0),
duration: Duration::try_seconds(0).unwrap(),
data: json_map! {"test": json!(1)},
};
let heartbeat_same_data = Event {
id: None,
timestamp: now,
duration: Duration::seconds(1),
duration: Duration::try_seconds(1).unwrap(),
data: json_map! {"test": json!(1)},
};

@@ -148,7 +148,7 @@ mod tests {
let heartbeat_different_data = Event {
id: None,
timestamp: now,
duration: Duration::seconds(1),
duration: Duration::try_seconds(1).unwrap(),
data: json_map! {"test": json!(2)},
};
// Data is different, should not merge
@@ -162,22 +162,22 @@ mod tests {
let event = Event {
id: None,
timestamp: now,
duration: Duration::seconds(0),
duration: Duration::try_seconds(0).unwrap(),
data: json_map! {"test": json!(1)},
};
let heartbeat_same_data = Event {
id: None,
timestamp: now,
duration: Duration::seconds(1),
duration: Duration::try_seconds(1).unwrap(),
data: json_map! {"test": json!(1)},
};

// Should merge
let res_merge = heartbeat(&event, &heartbeat_same_data, 1.0).unwrap();
assert_eq!(Duration::seconds(1), res_merge.duration);
assert_eq!(Duration::try_seconds(1).unwrap(), res_merge.duration);

// Order shouldn't matter, should merge anyway
let res_merge = heartbeat(&heartbeat_same_data, &event, 1.0).unwrap();
assert_eq!(Duration::seconds(1), res_merge.duration);
assert_eq!(Duration::try_seconds(1).unwrap(), res_merge.duration);
}
}
12 changes: 6 additions & 6 deletions aw-transform/src/merge.rs
Original file line number Diff line number Diff line change
@@ -97,25 +97,25 @@ mod tests {
let e1 = Event {
id: None,
timestamp: DateTime::from_str("2000-01-01T00:00:00Z").unwrap(),
duration: Duration::seconds(1),
duration: Duration::try_seconds(1).unwrap(),
data: json_map! {"test": json!(1)},
};
let e2 = Event {
id: None,
timestamp: DateTime::from_str("2000-01-01T00:00:01Z").unwrap(),
duration: Duration::seconds(3),
duration: Duration::try_seconds(3).unwrap(),
data: json_map! {"test2": json!(3)},
};
let e3 = Event {
id: None,
timestamp: DateTime::from_str("2000-01-01T00:00:02Z").unwrap(),
duration: Duration::seconds(7),
duration: Duration::try_seconds(7).unwrap(),
data: json_map! {"test": json!(6)},
};
let e4 = Event {
id: None,
timestamp: DateTime::from_str("2000-01-01T00:00:03Z").unwrap(),
duration: Duration::seconds(9),
duration: Duration::try_seconds(9).unwrap(),
data: json_map! {"test": json!(1)},
};
let in_events = vec![e1, e2, e3, e4];
@@ -126,13 +126,13 @@ mod tests {
Event {
id: None,
timestamp: DateTime::from_str("2000-01-01T00:00:00Z").unwrap(),
duration: Duration::seconds(10),
duration: Duration::try_seconds(10).unwrap(),
data: json_map! {"test": json!(1)},
},
Event {
id: None,
timestamp: DateTime::from_str("2000-01-01T00:00:02Z").unwrap(),
duration: Duration::seconds(7),
duration: Duration::try_seconds(7).unwrap(),
data: json_map! {"test": json!(6)},
},
];
10 changes: 5 additions & 5 deletions aw-transform/src/period_union.rs
Original file line number Diff line number Diff line change
@@ -79,7 +79,7 @@ mod tests {
let e1 = Event {
id: None,
timestamp: DateTime::from_str("2000-01-01T00:00:01Z").unwrap(),
duration: Duration::seconds(1),
duration: Duration::try_seconds(1).unwrap(),
data: json_map! {"test": json!(1)},
};

@@ -91,7 +91,7 @@ mod tests {

let dt: DateTime<Utc> = DateTime::from_str("2000-01-01T00:00:01.000Z").unwrap();
assert_eq!(e_result[0].timestamp, dt);
assert_eq!(e_result[0].duration, Duration::milliseconds(2000));
assert_eq!(e_result[0].duration, Duration::try_milliseconds(2000).unwrap());
}

/// Make sure nothing gets done when nothing to union (gaps present)
@@ -100,7 +100,7 @@ mod tests {
let e1 = Event {
id: None,
timestamp: DateTime::from_str("2000-01-01T00:00:01Z").unwrap(),
duration: Duration::seconds(1),
duration: Duration::try_seconds(1).unwrap(),
data: json_map! {"test": json!(1)},
};

@@ -116,7 +116,7 @@ mod tests {
let e1 = Event {
id: None,
timestamp: DateTime::from_str("2000-01-01T00:00:01Z").unwrap(),
duration: Duration::seconds(1),
duration: Duration::try_seconds(1).unwrap(),
data: json_map! {"test": json!(1)},
};

@@ -129,7 +129,7 @@ mod tests {
let e1 = Event {
id: None,
timestamp: DateTime::from_str("2000-01-01T00:00:01Z").unwrap(),
duration: Duration::seconds(1),
duration: Duration::try_seconds(1).unwrap(),
data: json_map! {"test": json!(1)},
};

8 changes: 4 additions & 4 deletions aw-transform/src/sort.rs
Original file line number Diff line number Diff line change
@@ -29,13 +29,13 @@ mod tests {
let e1 = Event {
id: None,
timestamp: DateTime::from_str("2000-01-01T00:00:00Z").unwrap(),
duration: Duration::seconds(1),
duration: Duration::try_seconds(1).unwrap(),
data: json_map! {"test": json!(1)},
};
let e2 = Event {
id: None,
timestamp: DateTime::from_str("2000-01-01T00:00:03Z").unwrap(),
duration: Duration::seconds(1),
duration: Duration::try_seconds(1).unwrap(),
data: json_map! {"test": json!(1)},
};
let res = sort_by_timestamp(vec![e2.clone(), e1.clone()]);
@@ -47,13 +47,13 @@ mod tests {
let e1 = Event {
id: None,
timestamp: DateTime::from_str("2000-01-01T00:00:00Z").unwrap(),
duration: Duration::seconds(2),
duration: Duration::try_seconds(2).unwrap(),
data: json_map! {"test": json!(1)},
};
let e2 = Event {
id: None,
timestamp: DateTime::from_str("2000-01-01T00:00:03Z").unwrap(),
duration: Duration::seconds(1),
duration: Duration::try_seconds(1).unwrap(),
data: json_map! {"test": json!(1)},
};
let res = sort_by_duration(vec![e2.clone(), e1.clone()]);
2 changes: 1 addition & 1 deletion aw-transform/src/split_url.rs
Original file line number Diff line number Diff line change
@@ -81,7 +81,7 @@ mod tests {
let mut e1 = Event {
id: None,
timestamp: DateTime::from_str("2000-01-01T00:00:01Z").unwrap(),
duration: Duration::seconds(1),
duration: Duration::try_seconds(1).unwrap(),
data: json_map! {"url": "http://www.google.com/path?query=1"},
};
split_url_event(&mut e1);
14 changes: 7 additions & 7 deletions aw-transform/src/union_no_overlap.rs
Original file line number Diff line number Diff line change
@@ -83,11 +83,11 @@ mod tests {
#[test]
fn test_split_event() {
let now = Utc::now();
let td1h = Duration::hours(1);
let td1h = Duration::try_hours(1).unwrap();
let e = Event {
id: None,
timestamp: now,
duration: Duration::hours(2),
duration: Duration::try_hours(2).unwrap(),
data: serde_json::Map::new(),
};
let (e1, e2_opt) = split_event(&e, now + td1h);
@@ -101,15 +101,15 @@ mod tests {
// Now a test which does not lead to a split
let (e1, e2_opt) = split_event(&e, now);
assert_eq!(e1.timestamp, now);
assert_eq!(e1.duration, Duration::hours(2));
assert_eq!(e1.duration, Duration::try_hours(2).unwrap());
assert!(e2_opt.is_none());
}

#[test]
fn test_union_no_overlap() {
// A test without any actual overlap
let now = Utc::now();
let td1h = Duration::hours(1);
let td1h = Duration::try_hours(1).unwrap();
let e1 = Event::new(now, td1h, serde_json::Map::new());
let e2 = Event::new(now + td1h, td1h, serde_json::Map::new());
let events1 = vec![e1.clone()];
@@ -139,9 +139,9 @@ mod tests {
fn test_union_no_overlap_with_overlap() {
// A test where the events overlap
let now = Utc::now();
let td1h = Duration::hours(1);
let td1h = Duration::try_hours(1).unwrap();
let e1 = Event::new(now, td1h, serde_json::Map::new());
let e2 = Event::new(now, Duration::hours(2), serde_json::Map::new());
        let e2 = Event::new(now, Duration::try_hours(2).unwrap(), serde_json::Map::new());
let events1 = vec![e1];
let events2 = vec![e2];
let events_union = union_no_overlap(events1, events2);
@@ -154,7 +154,7 @@ mod tests {

// Now test the case where e2 starts before e1
let e1 = Event::new(now + td1h, td1h, serde_json::Map::new());
let e2 = Event::new(now, Duration::hours(2), serde_json::Map::new());
let e2 = Event::new(now, Duration::try_hours(2).unwrap(), serde_json::Map::new());
let events1 = vec![e1];
let events2 = vec![e2];
let events_union = union_no_overlap(events1, events2);