1- use dipstick:: { lazy_static, metrics, InputScope , Proxy } ;
21use failure:: Error ;
32use rumqtt:: { MqttClient , MqttOptions , Notification , QoS , ReconnectOptions , SecurityOptions } ;
43use serde:: Deserialize ;
@@ -24,7 +23,7 @@ struct SwitchConfig {
2423struct Config {
2524 source : MQTTConnectionConfig ,
2625 target : MQTTConnectionConfig ,
27- graphite_host : String ,
26+ statsd_host : String ,
2827 source_topic_prefix : String ,
2928 target_topic : String ,
3029 switches : Vec < SwitchConfig > ,
@@ -71,51 +70,39 @@ fn main() -> Result<(), Error> {
7170 }
7271}
7372
74- fn init_metrics ( config : & Config ) -> Result < Box < dyn dipstick:: InputScope > , Error > {
75- use dipstick:: { Input , Prefixed } ;
76- Ok ( Box :: new (
77- dipstick:: Graphite :: send_to ( & config. graphite_host ) ?
78- . metrics ( )
79- . named ( "gbridge_bridge" ) ,
80- ) )
73+ fn init_metrics ( config : & Config ) -> Result < statsd:: Client , Error > {
74+ statsd:: Client :: new ( & config. statsd_host , "gbridge_bridge" ) . map_err ( |e| e. into ( ) )
8175}
8276
83- fn run ( config : Config , metrics : Box < dyn dipstick:: InputScope > ) -> Result < ( ) , Error > {
84- use dipstick:: { time, * } ;
85-
86- let metrics = dipstick:: Stream :: write_to_stdout ( ) . metrics ( ) ;
87-
88- let target_connection_timer = metrics. timer ( "target_connect" ) ;
89- let ( target_mqtt_client, _target_notifications) = time ! ( target_connection_timer, {
77+ fn run ( config : Config , metrics : statsd:: Client ) -> Result < ( ) , Error > {
78+ let ( target_mqtt_client, _target_notifications) = metrics. time ( "target_connect" , || {
9079 let target_options = MqttOptions :: new ( "target" , & config. target . host , 8883 )
9180 . set_ca ( CA_CHAIN . to_vec ( ) )
9281 . set_security_opts ( SecurityOptions :: UsernamePassword (
93- config. target. user,
94- config. target. password,
82+ config. target . user . clone ( ) ,
83+ config. target . password . clone ( ) ,
9584 ) )
9685 // Reconnection appears to be broken in rumqtt. Subsequent notifications aren't handled.
9786 . set_reconnect_opts ( ReconnectOptions :: Never ) ;
9887 log:: info!( "Connecting to target {}:{}" , & config. target. host, 8883 ) ;
99- MqttClient :: start( target_options) ?
100- } ) ;
88+ MqttClient :: start ( target_options)
89+ } ) ? ;
10190
102- let source_connection_timer = metrics. timer ( "source_connect" ) ;
103- let ( mut source_mqtt_client, source_notifications) = time ! ( source_connection_timer, {
91+ let ( mut source_mqtt_client, source_notifications) = metrics. time ( "source_connect" , || {
10492 let source_options = MqttOptions :: new ( "source" , & config. source . host , 8883 )
10593 . set_ca ( CA_CHAIN . to_vec ( ) )
10694 . set_security_opts ( SecurityOptions :: UsernamePassword (
107- config. source. user,
108- config. source. password,
95+ config. source . user . clone ( ) ,
96+ config. source . password . clone ( ) ,
10997 ) )
11098 . set_reconnect_opts ( ReconnectOptions :: Never ) ;
11199 log:: info!( "Connecting to source {}:{}" , & config. source. host, 8883 ) ;
112- MqttClient :: start( source_options) ?
113- } ) ;
100+ MqttClient :: start ( source_options)
101+ } ) ? ;
114102
115103 source_mqtt_client. subscribe ( format ! ( "{}#" , config. source_topic_prefix) , QoS :: AtLeastOnce ) ?;
116104
117105 let switch_configs = prepare_switch_configs ( config. switches ) ;
118- let publish_marker = metrics. marker ( "publish_count" ) ;
119106 for notification in source_notifications {
120107 let mut client = target_mqtt_client. clone ( ) ;
121108 let target_topic = config. target_topic . to_string ( ) ;
@@ -124,7 +111,8 @@ fn run(config: Config, metrics: Box<dyn dipstick::InputScope>) -> Result<(), Err
124111 let tristate = zap_tristate ( & p. topic_name , payload, & switch_configs) ;
125112 eprintln ! ( "Received {:#?}, sending tristate {:#?}." , payload, tristate) ;
126113 if let Some ( t) = tristate {
127- publish_marker. mark ( ) ;
114+ eprintln ! ( "Marking ..." ) ;
115+ metrics. incr ( "publish" ) ;
128116 client. publish ( target_topic, QoS :: AtLeastOnce , false , t) ?
129117 }
130118 }
0 commit comments