Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import io.reactivex.rxjava3.subjects.PublishSubject
import io.reactivex.rxjava3.subjects.Subject
import java.util.Objects
import java.util.concurrent.Semaphore
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine
Expand Down Expand Up @@ -93,8 +94,7 @@ internal class PersistentMutationOutbox(private val storage: LocalStorageAdapter
}
)
}
.doOnSubscribe { semaphore.acquire() }
.doOnTerminate { semaphore.release() }
.acquireSemaphore(semaphore, "getMutationForModelId")
.blockingAwait()
return mutationResult.get()
}
Expand All @@ -106,7 +106,10 @@ internal class PersistentMutationOutbox(private val storage: LocalStorageAdapter
): Set<String> {
// We chunk sql query to 950 items to prevent hitting 1k sqlite predicate limit
// Improvement would be to use IN, but not currently supported in our query builders
val methodName = "fetchPendingMutations"
LOG.debug { "[$methodName] Acquiring outbox semaphore (permits: ${semaphore.availablePermits()})" }
semaphore.acquire()
LOG.debug { "[$methodName] Acquired outbox semaphore (permits: ${semaphore.availablePermits()})" }
val pendingMutations: Set<String> = models.chunked(950).fold(mutableSetOf()) { acc, chunk ->
val queryOptions = Where.matches(
QueryPredicateGroup(
Expand Down Expand Up @@ -149,12 +152,14 @@ internal class PersistentMutationOutbox(private val storage: LocalStorageAdapter
}
acc
}
LOG.debug { "[$methodName] Releasing outbox semaphore (permits: ${semaphore.availablePermits()})" }
semaphore.release()
return pendingMutations
}

private fun getMutationById(mutationId: String): PendingMutation<out Model>? {
val mutationResult = AtomicReference<PendingMutation<out Model>>()

Completable.create { emitter: CompletableEmitter ->
storage.query(
PersistentRecord::class.java,
Expand All @@ -175,8 +180,7 @@ internal class PersistentMutationOutbox(private val storage: LocalStorageAdapter
}
)
}
.doOnSubscribe { semaphore.acquire() }
.doOnTerminate { semaphore.release() }
.acquireSemaphore(semaphore, "getMutationById")
.blockingAwait()
return mutationResult.get()
}
Expand All @@ -195,8 +199,7 @@ internal class PersistentMutationOutbox(private val storage: LocalStorageAdapter
return@defer resolveConflict<T>(existingMutation, incomingMutation)
}
}
.doOnSubscribe { semaphore.acquire() }
.doOnTerminate { semaphore.release() }
.acquireSemaphore(semaphore, "enqueue")
}

private fun <T : Model> resolveConflict(
Expand All @@ -221,7 +224,7 @@ internal class PersistentMutationOutbox(private val storage: LocalStorageAdapter
// to get identically the thing that was saved. But we know the save succeeded.
// So, let's skip the unwrapping, and use the thing that was enqueued,
// the pendingMutation, directly.
LOG.info("Successfully enqueued $pendingMutation")
LOG.info { "Successfully enqueued $pendingMutation" }
if (addingNewMutation) {
numMutationsInOutbox += 1
}
Expand All @@ -236,9 +239,8 @@ internal class PersistentMutationOutbox(private val storage: LocalStorageAdapter
}
}

override fun remove(pendingMutationId: TimeBasedUuid): Completable = removeNotLocking(pendingMutationId)
.doOnSubscribe { semaphore.acquire() }
.doOnTerminate { semaphore.release() }
override fun remove(pendingMutationId: TimeBasedUuid): Completable =
removeNotLocking(pendingMutationId).acquireSemaphore(semaphore, "remove")

private fun removeNotLocking(pendingMutationId: TimeBasedUuid): Completable {
Objects.requireNonNull(pendingMutationId)
Expand All @@ -256,7 +258,7 @@ internal class PersistentMutationOutbox(private val storage: LocalStorageAdapter
QueryPredicates.all(),
{
inFlightMutations.remove(pendingMutationId)
LOG.info("Successfully removed from mutations outbox$pendingMutation")
LOG.info { "Successfully removed from mutations outbox$pendingMutation" }
numMutationsInOutbox -= 1
val contentAvailable = numMutationsInOutbox > 0
if (contentAvailable) {
Expand Down Expand Up @@ -320,8 +322,7 @@ internal class PersistentMutationOutbox(private val storage: LocalStorageAdapter
}
)
}
.doOnSubscribe { semaphore.acquire() }
.doOnTerminate { semaphore.release() }
.acquireSemaphore(semaphore, "load")
}

override fun events(): Observable<OutboxEvent> = events
Expand Down Expand Up @@ -373,6 +374,31 @@ internal class PersistentMutationOutbox(private val storage: LocalStorageAdapter
)
}

internal fun Completable.acquireSemaphore(semaphore: Semaphore, methodName: String): Completable {
val semaphoreReleased = AtomicBoolean(false)
return this.doOnSubscribe {
LOG.debug { "[$methodName] Acquiring outbox semaphore (permits: ${semaphore.availablePermits()})" }
semaphore.acquire()
LOG.debug { "[$methodName] Acquired outbox semaphore (permits: ${semaphore.availablePermits()})" }
}
.doOnTerminate {
if (semaphoreReleased.compareAndSet(false, true)) {
LOG.debug {
"[$methodName] Releasing outbox semaphore in onTerminate (permits: ${semaphore.availablePermits()})"
}
semaphore.release()
}
}
.doFinally {
if (semaphoreReleased.compareAndSet(false, true)) {
LOG.debug {
"[$methodName] Releasing outbox semaphore in onFinally (permits: ${semaphore.availablePermits()})"
}
semaphore.release()
}
}
}

/**
* Encapsulate the logic to determine which actions to take based on incoming and existing
* mutations. Non-static so we can access instance methods of the outer class. Private because
Expand All @@ -390,11 +416,11 @@ internal class PersistentMutationOutbox(private val storage: LocalStorageAdapter
* @return A completable with the actions to resolve the conflict.
*/
fun resolve(): Completable {
LOG.debug(
LOG.debug {
"IncomingMutationConflict - " +
" existing " + existing.mutationType +
" incoming " + incoming.mutationType
)
}
return when (incoming.mutationType) {
PendingMutation.Type.CREATE -> handleIncomingCreate()
PendingMutation.Type.UPDATE -> handleIncomingUpdate()
Expand Down Expand Up @@ -564,7 +590,6 @@ internal class PersistentMutationOutbox(private val storage: LocalStorageAdapter
) as PendingMutation<T>
}
}

companion object {
private val LOG = Amplify.Logging.logger(CategoryType.DATASTORE, "amplify:aws-datastore")
}
Expand Down