Skip to content

Commit

Permalink
Issue #34 - Experimental field duration in Condition
Browse files Browse the repository at this point in the history
  • Loading branch information
David Rajchenbach-Teller committed Mar 18, 2016
1 parent d14ae42 commit dfe76dc
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 29 deletions.
8 changes: 7 additions & 1 deletion examples/simulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ impl Drop for TestAdapterWatchGuard {


/// The test environment.
#[derive(Clone)]
struct TestEnv {
/// The manager in charge of all adapters.
manager: AdapterManager,
Expand All @@ -319,6 +320,11 @@ impl ExecutableDevEnv for TestEnv {
fn api(&self) -> Self::API {
self.manager.clone()
}

type TimerGuard = (); // FIXME: Implement
fn start_timer(&self, duration: Duration, timer: Box<ExtSender<()>>) -> Self::TimerGuard {
unimplemented!()
}
}
impl TestEnv {
fn new(on_event: Box<ExtSender<SimulatorEvent>>) -> Self {
Expand Down Expand Up @@ -473,7 +479,7 @@ fn main () {

let mut runner = Execution::<TestEnv>::new();
let (tx, rx) = channel();
runner.start(env.api(), script, tx).unwrap();
runner.start(env.clone(), script, tx).unwrap();
match rx.recv().unwrap() {
Starting { result: Ok(()) } => println!("ready."),
err => panic!("Could not launch script {:?}", err)
Expand Down
14 changes: 11 additions & 3 deletions src/compile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@
//! - Transform each `Statement` to make sure that the kind of the
//! `destination` matches the `kind`, even if devices change.

use std::marker::PhantomData;

use ast::{Script, Rule, Statement, Match, Context, UncheckedCtx};
use ast::{ Script, Rule, Statement, Match, Context, UncheckedCtx };
use util::*;

use foxbox_taxonomy::api::API;
use foxbox_taxonomy::util::Phantom;
use foxbox_taxonomy::values::Duration;

use transformable_channels::mpsc::*;

use std::marker::PhantomData;

use serde::ser::{Serialize, Serializer};
use serde::de::{Deserialize, Deserializer};
Expand All @@ -35,7 +38,12 @@ use serde::de::{Deserialize, Deserializer};
pub trait ExecutableDevEnv: Serialize + Deserialize + Send {
type WatchGuard;
type API: API<WatchGuard = Self::WatchGuard>;

/// Return a handle to the API.
fn api(&self) -> Self::API;

type TimerGuard;
fn start_timer(&self, duration: Duration, timer: Box<ExtSender<()>>) -> Self::TimerGuard;
}


Expand Down
135 changes: 110 additions & 25 deletions src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ use foxbox_taxonomy::api;
use foxbox_taxonomy::api::{ API, Error as APIError, WatchEvent };
use foxbox_taxonomy::services::{ Getter, Setter };
use foxbox_taxonomy::util::{ Exactly, Id };
use foxbox_taxonomy::values::Duration;

use transformable_channels::mpsc::*;

use std::collections::HashMap;
use std::marker::PhantomData;
use std::thread;
use std::collections::HashMap;
use std::sync::Mutex;

/// Running and controlling a single script.
pub struct Execution<Env> where Env: ExecutableDevEnv + 'static {
Expand Down Expand Up @@ -43,7 +45,7 @@ impl<Env> Execution<Env> where Env: ExecutableDevEnv + 'static {
/// are:
/// - `RunningError:AlreadyRunning` if the script is already running;
/// - a compilation error if the script was incorrect.
pub fn start<S>(&mut self, api: Env::API, script: Script<UncheckedCtx>, on_event: S) ->
pub fn start<S>(&mut self, env: Env, script: Script<UncheckedCtx>, on_event: S) ->
Result<(), Error>
where S: ExtSender<ExecutionEvent>
{
Expand Down Expand Up @@ -72,7 +74,7 @@ impl<Env> Execution<Env> where Env: ExecutableDevEnv + 'static {
result: Ok(())
});
let _ = tx_init.send(Ok(()));
task.run(api, on_event);
task.run(env, on_event);
}
}
});
Expand All @@ -97,7 +99,7 @@ impl<Env> Execution<Env> where Env: ExecutableDevEnv + 'static {
},
Some(ref tx) => {
// Shutdown the application, asynchronously.
let _ignored = tx.send(ExecutionOp::Stop(Box::new(on_result)));
let _ignored = tx.send(ExecutionOp::Stop(Mutex::new(Box::new(on_result))));
}
};
self.command_sender = None;
Expand Down Expand Up @@ -145,18 +147,51 @@ pub enum ExecutionEvent {
}

enum ExecutionOp {
Update { event: WatchEvent, rule_index: usize, condition_index: usize },
/// We have received an update from the AdapterManager.
Update {
/// The individual event.
event: WatchEvent,

/// The rule to which this event applies.
rule_index: usize,

/// The index to which this event applies.
condition_index: usize
},

/// A channel state has enter/left its target range and we
/// have waited long enough to trigger the consequences.
UpdateCondition {
/// The channel that has changed state.
id: Id<Getter>,

/// `true` if the condition is now met, `false` otherwise.
is_met: bool,

/// The rule to which this event applies.
rule_index: usize,

/// The index to which this event applies.
condition_index: usize,
},

/// Time to stop executing the script.
Stop(Box<Fn(Result<(), Error>) + Send>)
Stop(Mutex<Box<Fn(Result<(), Error>) + Send>>)
}

struct ConditionState {
match_is_met: bool,
per_getter: HashMap<Id<Getter>, bool>,

/// If `None`, a duration is attached to this condition and we need to make sure that the
/// condition remains true for at least `duration` before we decide whether to proceed with
/// statements.
duration: Option<Duration>
}
struct RuleState {
struct RuleState<Env> where Env: ExecutableDevEnv {
rule_is_met: bool,
per_condition: Vec<ConditionState>,
ongoing_timer: Option<Env::TimerGuard>, // FIXME: It's actually a guard.
}

impl<Env> ExecutionTask<Env> where Env: ExecutableDevEnv {
Expand All @@ -179,13 +214,14 @@ impl<Env> ExecutionTask<Env> where Env: ExecutableDevEnv {

/// Execute the monitoring task.
/// This currently expects to be executed in its own thread.
fn run<S>(&mut self, api: Env::API, on_event: S) where S: ExtSender<ExecutionEvent> {
fn run<S>(&mut self, env: Env, on_event: S) where S: ExtSender<ExecutionEvent> {
let mut witnesses = Vec::new();
let api = env.api();

// Generate the state of rules, conditions, getters and start
// listening to changes in the getters.

// FIXME: We could optimize requests by grouping per `TargetMap<GetterSelector, Exactly<Range>>`
// FIXME: We could optimize requests by detecting if several share a `TargetMap<GetterSelector, Exactly<Range>>`
let mut per_rule : Vec<_> = self.script.rules.iter().zip(0 as usize..).map(|(rule, rule_index)| {
let per_condition = rule.conditions.iter().zip(0 as usize..).map(|(condition, condition_index)| {
// We will often end up watching several times the
Expand All @@ -210,12 +246,14 @@ impl<Env> ExecutionTask<Env> where Env: ExecutableDevEnv {
ConditionState {
match_is_met: false,
per_getter: HashMap::new(),
duration: condition.duration.clone(),
}
}).collect();

RuleState {
rule_is_met: false,
per_condition: per_condition
per_condition: per_condition,
ongoing_timer: None,
}
}).collect();

Expand All @@ -224,14 +262,14 @@ impl<Env> ExecutionTask<Env> where Env: ExecutableDevEnv {
ExecutionOp::Stop(cb) => {
// Leave the loop. Watching will stop once
// `witnesses` is dropped.
cb(Ok(()));
cb.lock().unwrap()(Ok(()));
return;
},
ExecutionOp::Update {
event,
rule_index,
condition_index,
} => match event {
ExecutionOp::UpdateCondition { id, is_met, rule_index, condition_index } => {
self.update_conditions(id, is_met, &mut per_rule,
rule_index, condition_index, &api, &on_event)
}
ExecutionOp::Update { event, rule_index, condition_index } => match event {
WatchEvent::InitializationError {
channel,
error
Expand All @@ -257,20 +295,61 @@ impl<Env> ExecutionTask<Env> where Env: ExecutableDevEnv {
.per_getter
.insert(id, false);
}
WatchEvent::EnterRange { from: id, .. } =>
self.update_conditions(id, true, &mut per_rule,
rule_index, condition_index, &api, &on_event),
WatchEvent::ExitRange { from: id, .. } =>
self.update_conditions(id, false, &mut per_rule,
rule_index, condition_index, &api, &on_event)
WatchEvent::EnterRange { from: id, .. } => {
// We have entered a range. If there is a
// timer, start it, otherwise update conditions.
let msg = move || {
ExecutionOp::UpdateCondition {
id: id.clone(),
is_met: true,
rule_index: rule_index,
condition_index: condition_index
}
};
let duration = match per_rule[rule_index].
per_condition[condition_index].
duration {
None => {
let _ = self.tx.send(msg());
continue
}
Some(ref duration) => {
duration.clone()
}
};
let tx = self.tx.map(move |()| {
msg()
});
per_rule[rule_index].ongoing_timer =
Some(env.start_timer(duration.clone(), Box::new(tx)))
}
WatchEvent::ExitRange { from: id, .. } => {
if per_rule[rule_index].ongoing_timer.is_some() {
// Cancel the timer. No need to update conditions.
per_rule[rule_index].ongoing_timer.take();
} else {
// No timer, either because it has already fired or because we don't
// have a duration. In either case, update the condition.
let msg = ExecutionOp::UpdateCondition {
id: id,
is_met: false,
rule_index: rule_index,
condition_index: condition_index
};
let _ = self.tx.send(msg);
}
}
}
}
};
}

/// A getter just entered/left a range. Update the conditions to determine whether
/// we now need to fire the statements.
fn update_conditions<S>(&self, id: Id<Getter>, getter_is_met: bool,
per_rule: &mut Vec<RuleState>, rule_index: usize, condition_index: usize,
api: &Env::API, on_event: &S) where S: ExtSender<ExecutionEvent>
per_rule: &mut Vec<RuleState<Env>>, rule_index: usize, condition_index: usize,
api: &Env::API, on_event: &S)
where S: ExtSender<ExecutionEvent>
{
use std::mem::replace;

Expand All @@ -279,10 +358,16 @@ impl<Env> ExecutionTask<Env> where Env: ExecutableDevEnv {
// empty, in case we received messages in
// the wrong order.

per_rule[rule_index]
let was_met = per_rule[rule_index]
.per_condition[condition_index]
.per_getter
.insert(id, getter_is_met); // FIXME: Could be used to optimize
if let Some(ref was_met) = was_met {
if *was_met == getter_is_met {
// Nothing has changed, no need to update any further.
return;
}
}

// 1. Is the match met?
//
Expand Down

0 comments on commit dfe76dc

Please sign in to comment.