@@ -264,70 +264,83 @@ impl PushReliability {
264264 for some unexpected reason), however for now, we just report a soft `error!()`.
265265
266266 */
267+ // Create the initial state key.
268+ // Do not place this inside of the transaction. We monitor the state key and the transaction will
269+ // fail because the value will change before the transaction completes. Yes, really.
270+ if new == ReliabilityState :: Received {
271+ trace ! (
272+ "🔍 Creating new record {state_key} ex {:?}" ,
273+ expr. unwrap_or( NO_EXPIRATION )
274+ ) ;
275+ // we can't perform this in a transaction because we can only increment if the set succeeds,
276+ // and values aren't returned when creating values in transactions. In order to do this
277+ // from inside the transaction, you would need to create a function, and that feels a bit
278+ // too heavy for this.
279+ // Create the new `state.{id}` key if it does not exist, and set the expiration.
280+ let options = redis:: SetOptions :: default ( )
281+ . with_expiration ( redis:: SetExpiry :: EX ( expr. unwrap_or ( NO_EXPIRATION ) ) )
282+ . conditional_set ( redis:: ExistenceCheck :: NX ) ;
283+ trace ! ( "🔍 ⭕ SET {state_key} NX EX {:?}" , new) ;
284+ let result = conn
285+ . set_options :: < _ , _ , Value > ( & state_key, new. to_string ( ) , options)
286+ . await
287+ . map_err ( |e| {
288+ //dbg!(&e);
289+ warn ! ( "🔍⚠️ Could not create state key: {:?}" , e) ;
290+ ApcErrorKind :: GeneralError ( "Could not create the state key" . to_owned ( ) )
291+ } ) ?;
292+ if result != redis:: Value :: Okay {
293+ error ! (
294+ "🔍⚠️ Tried to recreate state_key {state_key}: {:?}" ,
295+ & result
296+ ) ;
297+ return Err (
298+ ApcErrorKind :: GeneralError ( "Tried to recreate state_key" . to_string ( ) ) . into ( ) ,
299+ ) ;
300+ }
301+ } else {
302+ trace ! ( "🔍 Checking {:?}" , & old) ;
303+ // safety check (yes, there's still a slight chance of a race, but it's small)
304+ if let Some ( old) = old {
305+ let check_state: String = conn. get ( & state_key) . await ?;
306+ trace ! ( "🔍 Checking state for {}: {:?}" , id, & check_state) ;
307+ if check_state != old. to_string ( ) {
308+ trace ! (
309+ "🔍 Attempting to update state for {} from {} to {}, but current state is different: {:?}" ,
310+ id, old, new, check_state
311+ ) ;
312+ return Err ( ApcErrorKind :: GeneralError (
313+ "State mismatch during reliability record update" . to_owned ( ) ,
314+ )
315+ . into ( ) ) ;
316+ }
317+ } ;
318+ }
319+
267320 crate :: redis_util:: transaction (
268321 conn,
269322 & [ & state_key, & EXPIRY . to_owned ( ) ] ,
270323 self . retries ,
271324 || ApcErrorKind :: GeneralError ( "Exceeded reliability record retry attempts" . to_owned ( ) ) ,
272325 async |conn, pipe : & mut Pipeline | {
273-
274326 // The first state is special, since we need to create the `state_key`.
275- if new == ReliabilityState :: Received {
276- trace ! ( "🔍 Creating new record" ) ;
277- // we can't perform this in a transaction because we can only increment if the set succeeds,
278- // and values aren't returned when creating values in transactions. In order to do this
279- // from inside the transaction, you would need to create a function, and that feels a bit
280- // too heavy for this.
281- // Create the new `state.{id}` key if it does not exist, and set the expiration.
282- let options = redis:: SetOptions :: default ( )
283- . with_expiration ( redis:: SetExpiry :: EX ( expr. unwrap_or ( NO_EXPIRATION ) ) )
284- . conditional_set ( redis:: ExistenceCheck :: NX ) ;
285- let result = conn
286- . set_options :: < _ , _ , Value > ( & state_key, new. to_string ( ) , options)
287- . await
288- . map_err ( |e| {
289- // dbg!(&e);
290- warn ! ( "🔍⚠️ Could not create state key: {:?}" , e) ;
291- ApcErrorKind :: GeneralError ( "Could not create the state key" . to_owned ( ) )
292- } ) ?;
293- if result == redis:: Value :: Nil {
294- error ! ( "🔍⚠️ Tried to recreate state_key {state_key}" ) ;
295- return Err (
296- ApcErrorKind :: GeneralError ( "Tried to recreate state_key" . to_string ( ) ) ,
297- ) ;
298- }
299- } else {
300- trace ! ( "🔍 Checking {:?}" , & old) ;
301- // safety check (yes, there's still a slight chance of a race, but it's small)
302- if let Some ( old) = old {
303- let check_state: String = conn. get ( & state_key) . await ?;
304- trace ! ( "🔍 Checking state for {}: {:?}" , id, & check_state) ;
305- if check_state != old. to_string ( ) {
306- trace ! (
307- "🔍 Attempting to update state for {} from {} to {}, but current state is different: {:?}" ,
308- id, old, new, check_state
309- ) ;
310- return Err ( ApcErrorKind :: GeneralError (
311- "State mismatch during reliability record update" . to_owned ( ) ,
312- ) ) ;
313- }
314- } ;
315- }
316-
317327 // remove the old state from the expiry set, if it exists.
318328 // There should only be one message at a given state in the `expiry` table.
319329 // Since we only use that table to track messages that may expire. (We
320330 // decrement "expired" messages in the `gc` function, so having messages
321331 // in multiple states may decrement counts incorrectly.))
322332 if let Some ( old) = old {
333+ trace ! ( "🔍 ➖ {old} - {id}" ) ;
334+ trace ! ( "🔍 🪈⭕ HINCRBY {COUNTS} 1" ) ;
323335 pipe. hincr ( COUNTS , old. to_string ( ) , -1 ) ;
324336 let key = ExpiryKey {
325337 id : id. to_string ( ) ,
326338 state : old. to_owned ( ) ,
327339 }
328340 . to_string ( ) ;
329- pipe. zrem ( EXPIRY , & key) ;
330341 trace ! ( "🔍 internal remove old state: {:?}" , key) ;
342+ trace ! ( "🔍 🪈⭕ ZREM {EXPIRY} {key}" ) ;
343+ pipe. zrem ( EXPIRY , & key) ;
331344 }
332345 if !new. is_terminal ( ) {
333346 // Write the expiration only if the state is non-terminal. Otherwise we run the risk of
@@ -337,42 +350,51 @@ impl PushReliability {
337350 state : new. to_owned ( ) ,
338351 }
339352 . to_string ( ) ;
353+ trace ! ( "🔍 🪈⭕ ZADD {EXPIRY} {:?} {key}" , expr. unwrap_or_default( ) ) ;
340354 pipe. zadd ( EXPIRY , & key, expr. unwrap_or_default ( ) ) ;
341355 trace ! ( "🔍 internal record result: {:?}" , key) ;
342356 }
343- trace ! ( "🔍 upping {:?}" , & new ) ;
357+ trace ! ( "🔍 ➕ {new} - {id}" ) ;
344358 // Bump up the new state count, and set the state key's state if it still exists.
359+ trace ! ( "🔍 🪈⭕ HINCRBY {COUNTS} {new} 1" ) ;
345360 pipe. hincr ( COUNTS , new. to_string ( ) , 1 ) ;
346- let options = redis:: SetOptions :: default ( )
347- . with_expiration ( redis:: SetExpiry :: KEEPTTL )
348- . conditional_set ( redis:: ExistenceCheck :: XX )
349- . get ( true ) ;
350- pipe. set_options ( & state_key, new. to_string ( ) , options) ;
361+ if new != ReliabilityState :: Received {
362+ let options = redis:: SetOptions :: default ( )
363+ . with_expiration ( redis:: SetExpiry :: KEEPTTL )
364+ . conditional_set ( redis:: ExistenceCheck :: XX )
365+ . get ( true ) ;
366+ trace ! ( "🔍 🪈⭕ SET {state_key} {new} XX KEEPTTL" ) ;
367+ pipe. set_options ( & state_key, new. to_string ( ) , options) ;
368+ }
351369 // `exec_query` returns `RedisResult<()>`.
352370 // `query_async` returns `RedisResult<Option<redis::Value>, RedisError>`.
353371 // We really don't care about the returned result here, but transaction
354372 // retries if we return Ok(None), so we run the exec and return
355373 // a nonce `Some` value.
356374 // The turbo-fish is a fallback for edition 2024
357375 let result = pipe. query_async :: < redis:: Value > ( conn) . await ?;
376+ trace ! ( "🔍 🪈 {id} - {:?}" , & result) ;
358377 // The last element returned from the command is the result of the pipeline.
359378 // If Redis encounters an error, it will return a `nil` as well. We handle both
360379 // the same (retry), so we can normalize errors as `nil`.
361- // The last of which should be the result of the `SET` command, which has `GET`
362- // set. This should either return the prior value or `Ok` if things worked, else
363- // it should return `nil`, in which case we record a soft error.
380+ // The last of which should be the result of the piped command set.
381+ // This should return `nil` if there is any error, in which case we record
382+ // a soft error. (On success, it will return the result of the last command
383+ // in the pipe, which may vary due to the current state).
364384 // This could also be strung together as a cascade of functions, but it's broken
365385 // out to discrete steps for readability.
386+ if result == redis:: Value :: Nil {
387+ warn ! ( "🔍⚠🪈 {id} - Pipe failed, retry." ) ;
388+ return Ok ( None ) ;
389+ }
366390 if let Some ( operations) = result. as_sequence ( ) {
367391 // We have responses, the first items report the state of the commands,
368392 // the final line is a list of command results.
369393 if let Some ( result_values) = operations. last ( ) {
370394 if let Some ( results) = result_values. as_sequence ( ) {
371395 // The last command should contain the prior state. If it returned `Nil`
372396 // for some, unexpected reason, note the error.
373- if new != ReliabilityState :: Received
374- && Some ( & redis:: Value :: Nil ) == results. last ( )
375- {
397+ if Some ( & redis:: Value :: Nil ) == results. last ( ) {
376398 error ! ( "🔍🚨 WARNING: Lock Issue for {id}" )
377399 // There is some debate about whether or not to rerun
378400 // the transaction if this state is reached.
@@ -392,7 +414,7 @@ impl PushReliability {
392414 )
393415 . await
394416 . inspect_err ( |e| {
395- // dbg!(&e);
417+ //dbg!(&e);
396418 warn ! ( "🔍⚠️Error occurred during transaction: {:?}" , e) ;
397419 } ) ?;
398420 Ok ( ( ) )
@@ -447,6 +469,7 @@ impl PushReliability {
447469 return Err ( ApcErrorKind :: GeneralError ( err. to_owned ( ) ) ) ;
448470 } ;
449471 // Adjust the COUNTS and then remove the record from the list of expired rows.
472+ trace ! ( "🔍 ➖🪦 {} - {key}" , expiry_key. state. to_string( ) ) ;
450473 pipe. hincr ( COUNTS , expiry_key. state . to_string ( ) , -1 ) ;
451474 pipe. hincr ( COUNTS , ReliabilityState :: Expired . to_string ( ) , 1 ) ;
452475 pipe. zrem ( EXPIRY , key) ;
@@ -652,21 +675,10 @@ mod tests {
652675 . arg ( COUNTS )
653676 . arg ( new. to_string ( ) )
654677 . arg ( 1 )
655- // Create/update the state.holder value.
656- . cmd ( "SET" )
657- . arg ( & state_id)
658- . arg ( new. to_string ( ) )
659- . arg ( "XX" )
660- . arg ( "GET" )
661- . arg ( "KEEPTTL" )
662678 // Run the transaction
663679 . cmd ( "EXEC" ) ;
664680
665681 let mut conn = MockRedisConnection :: new ( vec ! [
666- MockCmd :: new(
667- redis:: cmd( "WATCH" ) . arg( & state_id) . arg( EXPIRY ) ,
668- Ok ( redis:: Value :: Okay ) ,
669- ) ,
670682 MockCmd :: new(
671683 redis:: cmd( "SET" )
672684 . arg( & state_id)
@@ -676,6 +688,10 @@ mod tests {
676688 . arg( expr) ,
677689 Ok ( redis:: Value :: Okay ) ,
678690 ) ,
691+ MockCmd :: new(
692+ redis:: cmd( "WATCH" ) . arg( & state_id) . arg( EXPIRY ) ,
693+ Ok ( redis:: Value :: Okay ) ,
694+ ) ,
679695 MockCmd :: new(
680696 pipeline,
681697 Ok ( redis:: Value :: Array ( vec![
0 commit comments