@@ -42,11 +42,11 @@ import io.reactivex.rxjava3.subjects.PublishSubject
4242import io.reactivex.rxjava3.subjects.Subject
4343import java.util.Objects
4444import java.util.concurrent.Semaphore
45+ import java.util.concurrent.atomic.AtomicBoolean
4546import java.util.concurrent.atomic.AtomicReference
4647import kotlin.coroutines.resume
4748import kotlin.coroutines.suspendCoroutine
4849import kotlinx.coroutines.runBlocking
49- import java.util.concurrent.atomic.AtomicBoolean
5050
5151/* *
5252 * The [MutationOutbox] is a persistently-backed in-order staging ground
@@ -69,9 +69,7 @@ internal class PersistentMutationOutbox(private val storage: LocalStorageAdapter
6969
7070 @VisibleForTesting
7171 fun getMutationForModelId (modelId : String , modelClass : String ): PendingMutation <out Model >? {
72- val methodName = " getMutationForModelId"
7372 val mutationResult = AtomicReference <PendingMutation <out Model >>()
74- val semaphoreReleased = AtomicBoolean (false )
7573
7674 Completable .create { emitter: CompletableEmitter ->
7775 storage.query(
@@ -97,23 +95,7 @@ internal class PersistentMutationOutbox(private val storage: LocalStorageAdapter
9795 }
9896 )
9997 }
100- .doOnSubscribe {
101- LOG .debug { " [$methodName ] Acquiring outbox semaphore (permits: ${semaphore.availablePermits()} )" }
102- semaphore.acquire()
103- LOG .debug { " [$methodName ] Acquired outbox semaphore (permits: ${semaphore.availablePermits()} )" }
104- }
105- .doOnTerminate {
106- if (semaphoreReleased.compareAndSet(false , true )) {
107- LOG .debug { " [$methodName ] Releasing outbox semaphore in onTerminate (permits: ${semaphore.availablePermits()} )" }
108- semaphore.release()
109- }
110- }
111- .doFinally {
112- if (semaphoreReleased.compareAndSet(false , true )) {
113- LOG .debug { " [$methodName ] Releasing outbox semaphore in doFinally (permits: ${semaphore.availablePermits()} )" }
114- semaphore.release()
115- }
116- }
98+ .acquireSemaphore(semaphore, " getMutationForModelId" )
11799 .blockingAwait()
118100 return mutationResult.get()
119101 }
@@ -177,9 +159,7 @@ internal class PersistentMutationOutbox(private val storage: LocalStorageAdapter
177159 }
178160
179161 private fun getMutationById (mutationId : String ): PendingMutation <out Model >? {
180- val methodName = " getMutationById"
181162 val mutationResult = AtomicReference <PendingMutation <out Model >>()
182- val semaphoreReleased = AtomicBoolean (false )
183163
184164 Completable .create { emitter: CompletableEmitter ->
185165 storage.query(
@@ -201,31 +181,12 @@ internal class PersistentMutationOutbox(private val storage: LocalStorageAdapter
201181 }
202182 )
203183 }
204- .doOnSubscribe {
205- LOG .debug { " [$methodName ] Acquiring outbox semaphore (permits: ${semaphore.availablePermits()} )" }
206- semaphore.acquire()
207- LOG .debug { " [$methodName ] Acquired outbox semaphore (permits: ${semaphore.availablePermits()} )" }
208- }
209- .doOnTerminate {
210- if (semaphoreReleased.compareAndSet(false , true )) {
211- LOG .debug { " [$methodName ] Releasing outbox semaphore in onTerminate (permits: ${semaphore.availablePermits()} )" }
212- semaphore.release()
213- }
214- }
215- .doFinally {
216- if (semaphoreReleased.compareAndSet(false , true )) {
217- LOG .debug { " [$methodName ] Releasing outbox semaphore in doFinally (permits: ${semaphore.availablePermits()} )" }
218- semaphore.release()
219- }
220- }
184+ .acquireSemaphore(semaphore, " getMutationById" )
221185 .blockingAwait()
222186 return mutationResult.get()
223187 }
224188
225189 override fun <T : Model > enqueue (incomingMutation : PendingMutation <T >): Completable {
226- val methodName = " enqueue"
227- val semaphoreReleased = AtomicBoolean (false )
228-
229190 return Completable .defer {
230191 // If there is no existing mutation for the model, then just apply the incoming
231192 // mutation, and be done with this.
@@ -239,23 +200,7 @@ internal class PersistentMutationOutbox(private val storage: LocalStorageAdapter
239200 return @defer resolveConflict<T >(existingMutation, incomingMutation)
240201 }
241202 }
242- .doOnSubscribe {
243- LOG .debug { " [$methodName ] Acquiring outbox semaphore (permits: ${semaphore.availablePermits()} )" }
244- semaphore.acquire()
245- LOG .debug { " [$methodName ] Acquired outbox semaphore (permits: ${semaphore.availablePermits()} )" }
246- }
247- .doOnTerminate {
248- if (semaphoreReleased.compareAndSet(false , true )) {
249- LOG .debug { " [$methodName ] Releasing outbox semaphore in onTerminate (permits: ${semaphore.availablePermits()} )" }
250- semaphore.release()
251- }
252- }
253- .doFinally {
254- if (semaphoreReleased.compareAndSet(false , true )) {
255- LOG .debug { " [$methodName ] Releasing outbox semaphore in doFinally (permits: ${semaphore.availablePermits()} )" }
256- semaphore.release()
257- }
258- }
203+ .acquireSemaphore(semaphore, " enqueue" )
259204 }
260205
261206 private fun <T : Model > resolveConflict (
@@ -295,29 +240,8 @@ internal class PersistentMutationOutbox(private val storage: LocalStorageAdapter
295240 }
296241 }
297242
298- override fun remove (pendingMutationId : TimeBasedUuid ): Completable {
299- val methodName = " remove"
300- val semaphoreReleased = AtomicBoolean (false )
301-
302- return removeNotLocking(pendingMutationId)
303- .doOnSubscribe {
304- LOG .debug { " [$methodName ] Acquiring outbox semaphore (permits: ${semaphore.availablePermits()} )" }
305- semaphore.acquire()
306- LOG .debug { " [$methodName ] Acquired outbox semaphore (permits: ${semaphore.availablePermits()} )" }
307- }
308- .doOnTerminate {
309- if (semaphoreReleased.compareAndSet(false , true )) {
310- LOG .debug { " [$methodName ] Releasing outbox semaphore in onTerminate (permits: ${semaphore.availablePermits()} )" }
311- semaphore.release()
312- }
313- }
314- .doFinally {
315- if (semaphoreReleased.compareAndSet(false , true )) {
316- LOG .debug { " [$methodName ] Releasing outbox semaphore in doFinally (permits: ${semaphore.availablePermits()} )" }
317- semaphore.release()
318- }
319- }
320- }
243+ override fun remove (pendingMutationId : TimeBasedUuid ): Completable =
244+ removeNotLocking(pendingMutationId).acquireSemaphore(semaphore, " remove" )
321245
322246 private fun removeNotLocking (pendingMutationId : TimeBasedUuid ): Completable {
323247 Objects .requireNonNull(pendingMutationId)
@@ -354,9 +278,6 @@ internal class PersistentMutationOutbox(private val storage: LocalStorageAdapter
354278 }
355279
356280 override fun load (): Completable {
357- val methodName = " load"
358- val semaphoreReleased = AtomicBoolean (false )
359-
360281 return Completable .create { emitter: CompletableEmitter ->
361282 inFlightMutations.clear()
362283 var queryOptions = Where .matchesAll()
@@ -402,23 +323,7 @@ internal class PersistentMutationOutbox(private val storage: LocalStorageAdapter
402323 }
403324 )
404325 }
405- .doOnSubscribe {
406- LOG .debug { " [$methodName ] Acquiring outbox semaphore (permits: ${semaphore.availablePermits()} )" }
407- semaphore.acquire()
408- LOG .debug { " [$methodName ] Acquired outbox semaphore (permits: ${semaphore.availablePermits()} )" }
409- }
410- .doOnTerminate {
411- if (semaphoreReleased.compareAndSet(false , true )) {
412- LOG .debug { " [$methodName ] Releasing outbox semaphore in onTerminate (permits: ${semaphore.availablePermits()} )" }
413- semaphore.release()
414- }
415- }
416- .doFinally {
417- if (semaphoreReleased.compareAndSet(false , true )) {
418- LOG .debug { " [$methodName ] Releasing outbox semaphore in doFinally (permits: ${semaphore.availablePermits()} )" }
419- semaphore.release()
420- }
421- }
326+ .acquireSemaphore(semaphore, " load" )
422327 }
423328
424329 override fun events (): Observable <OutboxEvent > = events
@@ -470,6 +375,31 @@ internal class PersistentMutationOutbox(private val storage: LocalStorageAdapter
470375 )
471376 }
472377
378+ internal fun Completable.acquireSemaphore (semaphore : Semaphore , methodName : String ): Completable {
379+ val semaphoreReleased = AtomicBoolean (false )
380+ return this .doOnSubscribe {
381+ LOG .debug { " [$methodName ] Acquiring outbox semaphore (permits: ${semaphore.availablePermits()} )" }
382+ semaphore.acquire()
383+ LOG .debug { " [$methodName ] Acquired outbox semaphore (permits: ${semaphore.availablePermits()} )" }
384+ }
385+ .doOnTerminate {
386+ if (semaphoreReleased.compareAndSet(false , true )) {
387+ LOG .debug {
388+ " [$methodName ] Releasing outbox semaphore in onTerminate (permits: ${semaphore.availablePermits()} )"
389+ }
390+ semaphore.release()
391+ }
392+ }
393+ .doFinally {
394+ if (semaphoreReleased.compareAndSet(false , true )) {
395+ LOG .debug {
396+ " [$methodName ] Releasing outbox semaphore in onFinally (permits: ${semaphore.availablePermits()} )"
397+ }
398+ semaphore.release()
399+ }
400+ }
401+ }
402+
473403 /* *
474404 * Encapsulate the logic to determine which actions to take based on incoming and existing
475405 * mutations. Non-static so we can access instance methods of the outer class. Private because
@@ -489,8 +419,8 @@ internal class PersistentMutationOutbox(private val storage: LocalStorageAdapter
489419 fun resolve (): Completable {
490420 LOG .debug {
491421 " IncomingMutationConflict - " +
492- " existing " + existing.mutationType +
493- " incoming " + incoming.mutationType
422+ " existing " + existing.mutationType +
423+ " incoming " + incoming.mutationType
494424 }
495425 return when (incoming.mutationType) {
496426 PendingMutation .Type .CREATE -> handleIncomingCreate()
@@ -661,7 +591,6 @@ internal class PersistentMutationOutbox(private val storage: LocalStorageAdapter
661591 ) as PendingMutation <T >
662592 }
663593 }
664-
665594 companion object {
666595 private val LOG = Amplify .Logging .logger(CategoryType .DATASTORE , " amplify:aws-datastore" )
667596 }
0 commit comments