@@ -126,7 +126,7 @@ impl PushReliability {
126
126
} )
127
127
}
128
128
129
- // Record the record state change to storage.
129
+ // Record the message state change to storage.
130
130
pub async fn record (
131
131
& self ,
132
132
reliability_id : & Option < String > ,
@@ -146,46 +146,81 @@ impl PushReliability {
146
146
new
147
147
) ;
148
148
match pool. get ( ) . await {
149
- Ok ( mut conn) => self . internal_record ( & mut conn, old, new, expr, id) . await ,
150
- Err ( e) => warn ! ( "🔍⚠️ Unable to record reliability state, {:?}" , e) ,
151
- }
149
+ Ok ( mut conn) => {
150
+ let _ = self
151
+ . internal_record ( & mut conn, old, new, expr, id)
152
+ . await
153
+ . inspect_err ( |e| {
154
+ warn ! ( "🔍⚠️ Unable to record reliability state: {:?}" , e) ;
155
+ } ) ;
156
+ }
157
+ Err ( e) => warn ! ( "🔍⚠️ Unable to get reliability state pool, {:?}" , e) ,
158
+ } ;
152
159
} ;
153
160
// Errors are not fatal, and should not impact message flow, but
154
161
// we should record them somewhere.
155
162
let _ = self . db . log_report ( id, new) . await . inspect_err ( |e| {
156
- warn ! ( "🔍⚠️ Unable to record reliability state: {:?}" , e) ;
163
+ warn ! ( "🔍⚠️ Unable to record reliability state log : {:?}" , e) ;
157
164
} ) ;
158
165
Some ( new)
159
166
}
160
167
161
- pub ( crate ) async fn internal_record < C : ConnectionLike > (
168
+ pub ( crate ) async fn internal_record < C : ConnectionLike + AsyncCommands > (
162
169
& self ,
163
170
conn : & mut C ,
164
171
old : & Option < ReliabilityState > ,
165
172
new : ReliabilityState ,
166
173
expr : Option < u64 > ,
167
174
id : & str ,
168
- ) {
169
- let mut pipeline = redis:: pipe ( ) ;
170
- pipeline. hincr ( COUNTS , new. to_string ( ) , 1 ) ;
171
-
172
- if let Some ( old) = old {
173
- pipeline. hincr ( COUNTS , old. to_string ( ) , -1 ) ;
174
- } ;
175
- // Errors are not fatal, and should not impact message flow, but
176
- // we should record them somewhere.
177
- if !new. is_terminal ( ) {
178
- // Write the expiration only if the state is non-terminal. Otherwise we run the risk of
179
- // messages reporting a false "expired" state even if they were "successful".
180
- let cc = pipeline
181
- . zadd ( EXPIRY , format ! ( "{}#{}" , new, id) , expr. unwrap_or_default ( ) )
182
- . exec_async ( conn)
183
- . await
184
- . inspect_err ( |e| {
185
- warn ! ( "🔍 Failed to write to storage: {:?}" , e) ;
186
- } ) ;
187
- trace ! ( "🔍 internal record result: {:?}" , cc) ;
188
- }
175
+ ) -> Result < ( ) > {
176
+ trace ! (
177
+ "🔍 internal record: {} from {} to {}" ,
178
+ id,
179
+ old. map( |v| v. to_string( ) )
180
+ . unwrap_or_else( || "None" . to_owned( ) ) ,
181
+ new
182
+ ) ;
183
+ crate :: redis_util:: transaction (
184
+ conn,
185
+ & [ COUNTS , EXPIRY ] ,
186
+ self . retries ,
187
+ || ApcErrorKind :: GeneralError ( "Exceeded reliability record retry attempts" . to_owned ( ) ) ,
188
+ async |conn, pipe| {
189
+ // First, increment the new state.
190
+ pipe. hincr ( COUNTS , new. to_string ( ) , 1 ) ;
191
+ // If there is an old state, decrement it.
192
+ if let Some ( old_state) = old {
193
+ pipe. hincr ( COUNTS , old_state. to_string ( ) , -1 ) ;
194
+ // remove the old state from the expiry set, if it exists.
195
+ // There should only be one message at a given state in the `expiry` table.
196
+ // Since we only use that table to track messages that may expire. (We
197
+ // decrement "expired" messages in the `gc` function, so having messages
198
+ // in multiple states may decrement counts incorrectly.))
199
+ let key = format ! ( "{}#{}" , id, old_state) ;
200
+ pipe. zrem ( EXPIRY , & key) ;
201
+ trace ! ( "🔍 internal remove old state: {:?}" , key) ;
202
+ }
203
+ if !new. is_terminal ( ) {
204
+ // Write the expiration only if the state is non-terminal. Otherwise we run the risk of
205
+ // messages reporting a false "expired" state even if they were "successful".
206
+ let key = format ! ( "{}#{}" , id, new) ;
207
+ let _ = pipe. zadd ( EXPIRY , & key, expr. unwrap_or_default ( ) ) ;
208
+ trace ! ( "🔍 internal record result: {:?}" , key) ;
209
+ }
210
+ // `exec_query` returns `RedisResult<()>`.
211
+ // `query_async` returns `RedisResult<Option<redis::Value>, RedisError>`.
212
+ // We really don't care about the returned result here, but transaction
213
+ // retries if we return Ok(None), so we run the exec and return
214
+ // a nonce `Some` value.
215
+ // The turbo-fish is a fallback for edition 2024
216
+ pipe. query_async :: < ( ) > ( conn) . await . inspect_err ( |e| {
217
+ warn ! ( "🔍 Redis internal storage error: {:?}" , e) ;
218
+ } ) ?;
219
+ Ok ( Some ( redis:: Value :: Okay ) )
220
+ } ,
221
+ )
222
+ . await ?;
223
+ Ok ( ( ) )
189
224
}
190
225
191
226
/// Perform a garbage collection cycle on a reliability object.
@@ -443,12 +478,96 @@ mod tests {
443
478
. await ;
444
479
445
480
// and mock the redis call.
446
- pr. internal_record ( & mut conn, & old, new, Some ( expr) , & test_id)
481
+ let _ = pr
482
+ . internal_record ( & mut conn, & old, new, Some ( expr) , & test_id)
447
483
. await ;
448
484
449
485
Ok ( ( ) )
450
486
}
451
487
488
+ //*
489
+ #[ actix_rt:: test]
490
+ async fn test_push_reliabilty_record ( ) -> Result < ( ) > {
491
+ let db = crate :: db:: mock:: MockDbClient :: new ( ) ;
492
+ let test_id = format ! ( "TEST_VALUE_{}" , Uuid :: new_v4( ) ) ;
493
+ let new = ReliabilityState :: Stored ;
494
+ let old = ReliabilityState :: Received ;
495
+ let expr = 1 ;
496
+
497
+ let metrics = Arc :: new ( StatsdClient :: builder ( "" , cadence:: NopMetricSink ) . build ( ) ) ;
498
+ let new_key = format ! ( "{}#{}" , & test_id, & new) ;
499
+ let old_key = format ! ( "{}#{}" , & test_id, & old) ;
500
+ let mut mock_pipe = redis:: Pipeline :: new ( ) ;
501
+ mock_pipe
502
+ . cmd ( "MULTI" )
503
+ . ignore ( )
504
+ . cmd ( "HINCRBY" )
505
+ . arg ( COUNTS )
506
+ . arg ( new. to_string ( ) )
507
+ . arg ( 1 )
508
+ . ignore ( )
509
+ . cmd ( "HINCRBY" )
510
+ . arg ( COUNTS )
511
+ . arg ( old. to_string ( ) )
512
+ . arg ( -1 )
513
+ . ignore ( )
514
+ . cmd ( "ZREM" )
515
+ . arg ( EXPIRY )
516
+ . arg ( old_key)
517
+ . ignore ( )
518
+ . cmd ( "ZADD" )
519
+ . arg ( EXPIRY )
520
+ . arg ( new_key)
521
+ . ignore ( )
522
+ . cmd ( "EXEC" )
523
+ . ignore ( ) ;
524
+
525
+ let mut conn = MockRedisConnection :: new ( vec ! [
526
+ MockCmd :: new(
527
+ redis:: cmd( "WATCH" ) . arg( COUNTS ) . arg( EXPIRY ) ,
528
+ Ok ( redis:: Value :: Okay ) ,
529
+ ) ,
530
+ // NOTE: Technically, since we `.ignore()` these, we could just have a
531
+ // Vec containing just `Okay`. I'm being a bit pedantic here because I know
532
+ // that this will come back to haunt me if I'm not, and because figuring out
533
+ // the proper response for this was annoying.
534
+ MockCmd :: new(
535
+ mock_pipe,
536
+ Ok ( redis:: Value :: Array ( vec![
537
+ redis:: Value :: Okay ,
538
+ // Match the number of commands that are being held for processing
539
+ redis:: Value :: SimpleString ( "QUEUED" . to_owned( ) ) ,
540
+ redis:: Value :: SimpleString ( "QUEUED" . to_owned( ) ) ,
541
+ redis:: Value :: SimpleString ( "QUEUED" . to_owned( ) ) ,
542
+ redis:: Value :: SimpleString ( "QUEUED" . to_owned( ) ) ,
543
+ // the exec has been called, return an array containing the results.
544
+ redis:: Value :: Array ( vec![
545
+ redis:: Value :: Okay ,
546
+ redis:: Value :: Okay ,
547
+ redis:: Value :: Okay ,
548
+ redis:: Value :: Okay ,
549
+ ] ) ,
550
+ ] ) ) ,
551
+ ) ,
552
+ // If the transaction fails, this should return a redis::Value::Nil
553
+ MockCmd :: new( redis:: cmd( "UNWATCH" ) , Ok ( redis:: Value :: Okay ) ) ,
554
+ ] ) ;
555
+
556
+ // test the main report function (note, this does not test redis)
557
+ let pr = PushReliability :: new (
558
+ & None ,
559
+ Box :: new ( Arc :: new ( db) ) ,
560
+ & metrics,
561
+ crate :: redis_util:: MAX_TRANSACTION_LOOP ,
562
+ )
563
+ . unwrap ( ) ;
564
+ let _ = pr
565
+ . internal_record ( & mut conn, & Some ( old) , new, Some ( expr) , & test_id)
566
+ . await ;
567
+
568
+ Ok ( ( ) )
569
+ }
570
+ // */
452
571
#[ actix_rt:: test]
453
572
async fn test_push_reliability_gc ( ) -> Result < ( ) > {
454
573
let db = crate :: db:: mock:: MockDbClient :: new ( ) ;
0 commit comments