@@ -53,6 +53,8 @@ pub enum Response {
53
53
Count ( i64 ) ,
54
54
KeyValue ( KeyValue ) ,
55
55
StringVec ( Vec < String > ) ,
56
+ // Used to indicate that no response should occur at all (not even an empty one)
57
+ NoResponse ( ) ,
56
58
}
57
59
58
60
#[ allow( clippy:: large_enum_variant) ]
@@ -78,6 +80,7 @@ pub enum Command {
78
80
GetKeyValue ( String ) ,
79
81
GetKeysStarting ( String ) ,
80
82
DeleteKeyValue ( String ) ,
83
+ Close ( ) ,
81
84
}
82
85
83
86
fn _unwrap_response (
@@ -118,7 +121,7 @@ impl DatastoreWorker {
118
121
119
122
fn work_loop ( & mut self , method : DatastoreMethod ) {
120
123
// Open SQLite connection
121
- let mut conn = match method {
124
+ let mut conn = match & method {
122
125
DatastoreMethod :: Memory ( ) => {
123
126
Connection :: open_in_memory ( ) . expect ( "Failed to create in-memory datastore" )
124
127
}
@@ -150,12 +153,20 @@ impl DatastoreWorker {
150
153
// Start handling and respond to requests
151
154
loop {
152
155
let last_commit_time: DateTime < Utc > = Utc :: now ( ) ;
153
- let mut transaction = conn
154
- . transaction_with_behavior ( TransactionBehavior :: Immediate )
155
- . unwrap ( ) ;
156
+ let mut tx: Transaction =
157
+ match conn. transaction_with_behavior ( TransactionBehavior :: Immediate ) {
158
+ Ok ( tx) => tx,
159
+ Err ( err) => {
160
+ error ! ( "Unable to start transaction! {:?}" , err) ;
161
+ // Wait 1s before retrying
162
+ std:: thread:: sleep ( std:: time:: Duration :: from_millis ( 1000 ) ) ;
163
+ continue ;
164
+ }
165
+ } ;
166
+ tx. set_drop_behavior ( DropBehavior :: Commit ) ;
167
+
156
168
self . uncommited_events = 0 ;
157
169
self . commit = false ;
158
- transaction. set_drop_behavior ( DropBehavior :: Commit ) ;
159
170
loop {
160
171
let ( request, response_sender) = match self . responder . poll ( ) {
161
172
Ok ( ( req, res_sender) ) => ( req, res_sender) ,
@@ -166,19 +177,28 @@ impl DatastoreWorker {
166
177
break ;
167
178
}
168
179
} ;
169
- let response = self . handle_request ( request, & mut ds, & transaction) ;
170
- response_sender. respond ( response) ;
180
+ let response = self . handle_request ( request, & mut ds, & tx) ;
181
+ match response {
182
+ // The NoResponse is used by commands like close(), which should
183
+ // not be responded to, as the requester might have disappeared.
184
+ Ok ( Response :: NoResponse ( ) ) => ( ) ,
185
+ _ => response_sender. respond ( response) ,
186
+ }
171
187
let now: DateTime < Utc > = Utc :: now ( ) ;
172
188
let commit_interval_passed: bool = ( now - last_commit_time) > Duration :: seconds ( 15 ) ;
173
- if self . commit || commit_interval_passed || self . uncommited_events > 100 {
189
+ if self . commit
190
+ || commit_interval_passed
191
+ || self . uncommited_events > 100
192
+ || self . quit
193
+ {
174
194
break ;
175
195
} ;
176
196
}
177
197
debug ! (
178
198
"Commiting DB! Force commit {}, {} uncommited events" ,
179
199
self . commit, self . uncommited_events
180
200
) ;
181
- match transaction . commit ( ) {
201
+ match tx . commit ( ) {
182
202
Ok ( _) => ( ) ,
183
203
Err ( err) => panic ! ( "Failed to commit datastore transaction! {}" , err) ,
184
204
}
@@ -193,32 +213,30 @@ impl DatastoreWorker {
193
213
& mut self ,
194
214
request : Command ,
195
215
ds : & mut DatastoreInstance ,
196
- transaction : & Transaction ,
216
+ tx : & Transaction ,
197
217
) -> Result < Response , DatastoreError > {
198
218
match request {
199
- Command :: CreateBucket ( bucket) => match ds. create_bucket ( & transaction , bucket) {
219
+ Command :: CreateBucket ( bucket) => match ds. create_bucket ( tx , bucket) {
200
220
Ok ( _) => {
201
221
self . commit = true ;
202
222
Ok ( Response :: Empty ( ) )
203
223
}
204
224
Err ( e) => Err ( e) ,
205
225
} ,
206
- Command :: DeleteBucket ( bucketname) => {
207
- match ds. delete_bucket ( & transaction, & bucketname) {
208
- Ok ( _) => {
209
- self . commit = true ;
210
- Ok ( Response :: Empty ( ) )
211
- }
212
- Err ( e) => Err ( e) ,
226
+ Command :: DeleteBucket ( bucketname) => match ds. delete_bucket ( tx, & bucketname) {
227
+ Ok ( _) => {
228
+ self . commit = true ;
229
+ Ok ( Response :: Empty ( ) )
213
230
}
214
- }
231
+ Err ( e) => Err ( e) ,
232
+ } ,
215
233
Command :: GetBucket ( bucketname) => match ds. get_bucket ( & bucketname) {
216
234
Ok ( b) => Ok ( Response :: Bucket ( b) ) ,
217
235
Err ( e) => Err ( e) ,
218
236
} ,
219
237
Command :: GetBuckets ( ) => Ok ( Response :: BucketMap ( ds. get_buckets ( ) ) ) ,
220
238
Command :: InsertEvents ( bucketname, events) => {
221
- match ds. insert_events ( & transaction , & bucketname, events) {
239
+ match ds. insert_events ( tx , & bucketname, events) {
222
240
Ok ( events) => {
223
241
self . uncommited_events += events. len ( ) ;
224
242
self . last_heartbeat . insert ( bucketname. to_string ( ) , None ) ; // invalidate last_heartbeat cache
@@ -228,13 +246,7 @@ impl DatastoreWorker {
228
246
}
229
247
}
230
248
Command :: Heartbeat ( bucketname, event, pulsetime) => {
231
- match ds. heartbeat (
232
- & transaction,
233
- & bucketname,
234
- event,
235
- pulsetime,
236
- & mut self . last_heartbeat ,
237
- ) {
249
+ match ds. heartbeat ( tx, & bucketname, event, pulsetime, & mut self . last_heartbeat ) {
238
250
Ok ( e) => {
239
251
self . uncommited_events += 1 ;
240
252
Ok ( Response :: Event ( e) )
@@ -243,31 +255,25 @@ impl DatastoreWorker {
243
255
}
244
256
}
245
257
Command :: GetEvent ( bucketname, event_id) => {
246
- match ds. get_event ( & transaction , & bucketname, event_id) {
258
+ match ds. get_event ( tx , & bucketname, event_id) {
247
259
Ok ( el) => Ok ( Response :: Event ( el) ) ,
248
260
Err ( e) => Err ( e) ,
249
261
}
250
262
}
251
263
Command :: GetEvents ( bucketname, starttime_opt, endtime_opt, limit_opt) => {
252
- match ds. get_events (
253
- & transaction,
254
- & bucketname,
255
- starttime_opt,
256
- endtime_opt,
257
- limit_opt,
258
- ) {
264
+ match ds. get_events ( tx, & bucketname, starttime_opt, endtime_opt, limit_opt) {
259
265
Ok ( el) => Ok ( Response :: EventList ( el) ) ,
260
266
Err ( e) => Err ( e) ,
261
267
}
262
268
}
263
269
Command :: GetEventCount ( bucketname, starttime_opt, endtime_opt) => {
264
- match ds. get_event_count ( & transaction , & bucketname, starttime_opt, endtime_opt) {
270
+ match ds. get_event_count ( tx , & bucketname, starttime_opt, endtime_opt) {
265
271
Ok ( n) => Ok ( Response :: Count ( n) ) ,
266
272
Err ( e) => Err ( e) ,
267
273
}
268
274
}
269
275
Command :: DeleteEventsById ( bucketname, event_ids) => {
270
- match ds. delete_events_by_id ( & transaction , & bucketname, event_ids) {
276
+ match ds. delete_events_by_id ( tx , & bucketname, event_ids) {
271
277
Ok ( ( ) ) => Ok ( Response :: Empty ( ) ) ,
272
278
Err ( e) => Err ( e) ,
273
279
}
@@ -276,26 +282,26 @@ impl DatastoreWorker {
276
282
self . commit = true ;
277
283
Ok ( Response :: Empty ( ) )
278
284
}
279
- Command :: InsertKeyValue ( key, data) => {
280
- match ds. insert_key_value ( & transaction, & key, & data) {
281
- Ok ( ( ) ) => Ok ( Response :: Empty ( ) ) ,
282
- Err ( e) => Err ( e) ,
283
- }
284
- }
285
- Command :: GetKeyValue ( key) => match ds. get_key_value ( & transaction, & key) {
285
+ Command :: InsertKeyValue ( key, data) => match ds. insert_key_value ( tx, & key, & data) {
286
+ Ok ( ( ) ) => Ok ( Response :: Empty ( ) ) ,
287
+ Err ( e) => Err ( e) ,
288
+ } ,
289
+ Command :: GetKeyValue ( key) => match ds. get_key_value ( tx, & key) {
286
290
Ok ( result) => Ok ( Response :: KeyValue ( result) ) ,
287
291
Err ( e) => Err ( e) ,
288
292
} ,
289
- Command :: GetKeysStarting ( pattern) => {
290
- match ds. get_keys_starting ( & transaction, & pattern) {
291
- Ok ( result) => Ok ( Response :: StringVec ( result) ) ,
292
- Err ( e) => Err ( e) ,
293
- }
294
- }
295
- Command :: DeleteKeyValue ( key) => match ds. delete_key_value ( & transaction, & key) {
293
+ Command :: GetKeysStarting ( pattern) => match ds. get_keys_starting ( tx, & pattern) {
294
+ Ok ( result) => Ok ( Response :: StringVec ( result) ) ,
295
+ Err ( e) => Err ( e) ,
296
+ } ,
297
+ Command :: DeleteKeyValue ( key) => match ds. delete_key_value ( tx, & key) {
296
298
Ok ( ( ) ) => Ok ( Response :: Empty ( ) ) ,
297
299
Err ( e) => Err ( e) ,
298
300
} ,
301
+ Command :: Close ( ) => {
302
+ self . quit = true ;
303
+ Ok ( Response :: NoResponse ( ) )
304
+ }
299
305
}
300
306
}
301
307
}
@@ -516,4 +522,10 @@ impl Datastore {
516
522
Err ( e) => Err ( e) ,
517
523
}
518
524
}
525
+
526
+ // TODO: Should this block until worker has stopped?
527
+ pub fn close ( & self ) {
528
+ info ! ( "Sending close request to database" ) ;
529
+ self . requester . request ( Command :: Close ( ) ) . unwrap ( ) ;
530
+ }
519
531
}
0 commit comments