@@ -145,33 +145,42 @@ public function deleteElement($listUuid, $elementUuid)
145145 $ numberOfChunks = $ this ->getNumberOfChunks ($ listUuid );
146146 $ chunkSize = $ this ->getChunkSize ($ listUuid );
147147
148- for ($ i = 1 ; $ i <= $ numberOfChunks ; ++$ i ) {
149- $ chunkNumber = $ listUuid .self ::SEPARATOR .self ::CHUNK .'- ' .$ i ;
150- $ chunk = $ this ->client ->hgetall ($ chunkNumber );
151-
152- if (array_key_exists ($ elementUuid , $ chunk )) {
153- // delete elements from chunk
154- $ this ->client ->hdel ($ chunkNumber , $ elementUuid );
155-
156- // update list index
157- $ prevIndex = $ this ->getIndex ($ listUuid );
158- $ this ->addOrUpdateListToIndex (
159- $ listUuid ,
160- ($ prevIndex ['size ' ] - 1 ),
161- $ numberOfChunks ,
162- $ chunkSize ,
163- $ prevIndex ['ttl ' ]
164- );
148+ $ options = [
149+ 'cas ' => true ,
150+ 'watch ' => $ this ->getArrayChunksKeys ($ listUuid , $ numberOfChunks ),
151+ 'retry ' => 3 ,
152+ ];
165153
166- // delete headers if counter = 0
167- $ headersKey = $ listUuid .self ::SEPARATOR .self ::HEADERS ;
168- if ($ this ->getCounter ($ listUuid ) === 0 ) {
169- $ this ->client ->del ($ headersKey );
170- }
154+ // delete in a transaction
155+ $ this ->client ->transaction ($ options , function ($ tx ) use ($ listUuid , $ numberOfChunks , $ chunkSize , $ elementUuid ) {
156+ for ($ i = 1 ; $ i <= $ numberOfChunks ; ++$ i ) {
157+ $ chunkNumber = $ listUuid .self ::SEPARATOR .self ::CHUNK .'- ' .$ i ;
158+ $ chunk = $ this ->client ->hgetall ($ chunkNumber );
159+
160+ if (array_key_exists ($ elementUuid , $ chunk )) {
161+ // delete elements from chunk
162+ $ tx ->hdel ($ chunkNumber , $ elementUuid );
163+
164+ // update list index
165+ $ prevIndex = $ this ->getIndex ($ listUuid );
166+ $ this ->addOrUpdateListToIndex (
167+ $ listUuid ,
168+ ($ prevIndex ['size ' ] - 1 ),
169+ $ numberOfChunks ,
170+ $ chunkSize ,
171+ $ prevIndex ['ttl ' ]
172+ );
173+
174+ // delete headers if counter = 0
175+ $ headersKey = $ listUuid .self ::SEPARATOR .self ::HEADERS ;
176+ if ($ this ->getCounter ($ listUuid ) === 0 ) {
177+ $ tx ->del ($ headersKey );
178+ }
171179
172- break ;
180+ break ;
181+ }
173182 }
174- }
183+ });
175184 }
176185
177186 /**
@@ -292,31 +301,40 @@ public function pushElement($listUuid, ListElement $listElement)
292301 throw new ListElementNotConsistentException ('Element ' .(string ) $ listElement ->getUuid ().' is not consistent with list data. ' );
293302 }
294303
295- $ number = $ this ->getNumberOfChunks ($ listUuid );
304+ $ numberOfChunks = $ this ->getNumberOfChunks ($ listUuid );
296305 $ chunkSize = $ this ->getChunkSize ($ listUuid );
297306
298- $ chunkNumber = $ listUuid .self ::SEPARATOR .self ::CHUNK .'- ' .$ number ;
307+ $ chunkNumber = $ listUuid .self ::SEPARATOR .self ::CHUNK .'- ' .$ numberOfChunks ;
299308
300- if (($ chunkSize - count ($ this ->client ->hgetall ($ chunkNumber ))) === 0 ) {
301- ++$ number ;
302- $ chunkNumber = $ listUuid .self ::SEPARATOR .self ::CHUNK .'- ' .$ number ;
303- }
309+ $ options = [
310+ 'cas ' => true ,
311+ 'watch ' => $ this ->getArrayChunksKeys ($ listUuid , $ numberOfChunks ),
312+ 'retry ' => 3 ,
313+ ];
304314
305- $ this ->client ->hset (
306- (string ) $ chunkNumber ,
307- (string ) $ elementUuid ,
308- (string ) $ body
309- );
315+ // persist in a transaction
316+ $ this ->client ->transaction ($ options , function ($ tx ) use ($ chunkNumber , $ numberOfChunks , $ chunkSize , $ listUuid , $ elementUuid , $ body ) {
317+ if (($ chunkSize - count ($ tx ->hgetall ($ chunkNumber ))) === 0 ) {
318+ ++$ numberOfChunks ;
319+ $ chunkNumber = $ listUuid .self ::SEPARATOR .self ::CHUNK .'- ' .$ numberOfChunks ;
320+ }
310321
311- // update list index
312- $ prevIndex = $ this ->getIndex ($ listUuid );
313- $ this ->addOrUpdateListToIndex (
314- $ listUuid ,
315- ($ prevIndex ['size ' ] + 1 ),
316- $ number ,
317- $ chunkSize ,
318- $ this ->getTtl ($ listUuid )
319- );
322+ $ tx ->hset (
323+ (string ) $ chunkNumber ,
324+ (string ) $ elementUuid ,
325+ (string ) $ body
326+ );
327+
328+ // update list index
329+ $ prevIndex = $ this ->getIndex ($ listUuid );
330+ $ this ->addOrUpdateListToIndex (
331+ $ listUuid ,
332+ ($ prevIndex ['size ' ] + 1 ),
333+ $ numberOfChunks ,
334+ $ chunkSize ,
335+ $ this ->getTtl ($ listUuid )
336+ );
337+ });
320338 }
321339
322340 /**
@@ -402,25 +420,35 @@ public function updateTtl($listUuid, $ttl)
402420
403421 // update ttl of all chunks
404422 $ numberOfChunks = $ this ->getNumberOfChunks ($ listUuid );
405- for ($ i = 1 ; $ i <= $ numberOfChunks ; ++$ i ) {
406- $ this ->client ->expire (
407- (string ) $ listUuid .self ::SEPARATOR .self ::CHUNK .'- ' .$ i ,
408- (int ) $ ttl
409- );
410- }
411423
412- // update ttl of headers array (if present)
413- if ($ this ->getHeaders ($ listUuid )) {
414- $ this ->client ->expire ($ listUuid .self ::SEPARATOR .self ::HEADERS , $ ttl );
415- }
424+ $ options = [
425+ 'cas ' => true ,
426+ 'watch ' => $ this ->getArrayChunksKeys ($ listUuid , $ numberOfChunks ),
427+ 'retry ' => 3 ,
428+ ];
416429
417- // update index
418- $ this ->addOrUpdateListToIndex (
419- $ listUuid ,
420- $ this ->getCounter ($ listUuid ),
421- $ this ->getNumberOfChunks ($ listUuid ),
422- $ this ->getChunkSize ($ listUuid ),
423- $ ttl
424- );
430+ // persist in a transaction
431+ $ this ->client ->transaction ($ options , function ($ tx ) use ($ numberOfChunks , $ listUuid , $ ttl ) {
432+ for ($ i = 1 ; $ i <= $ numberOfChunks ; ++$ i ) {
433+ $ tx ->expire (
434+ (string ) $ listUuid .self ::SEPARATOR .self ::CHUNK .'- ' .$ i ,
435+ (int ) $ ttl
436+ );
437+ }
438+
439+ // update ttl of headers array (if present)
440+ if ($ this ->getHeaders ($ listUuid )) {
441+ $ tx ->expire ($ listUuid .self ::SEPARATOR .self ::HEADERS , $ ttl );
442+ }
443+
444+ // update index
445+ $ this ->addOrUpdateListToIndex (
446+ $ listUuid ,
447+ $ this ->getCounter ($ listUuid ),
448+ $ this ->getNumberOfChunks ($ listUuid ),
449+ $ this ->getChunkSize ($ listUuid ),
450+ $ ttl
451+ );
452+ });
425453 }
426454}
0 commit comments