Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public Orchestrator(
* and started (asynchronously) the transition to SYNC_VIA_API, if an API is available.
*/
public synchronized Completable start() {
return performSynchronized(() -> {
return performSynchronized("start", () -> {
switch (targetState.get()) {
case LOCAL_ONLY:
disposeNetworkChanges();
Expand All @@ -185,12 +185,11 @@ public synchronized Completable start() {
* @return A completable which emits success when orchestrator stops
*/
public synchronized Completable stop() {
return performSynchronized(this::transitionToStopped);
return performSynchronized("stop", this::transitionToStopped);
}

private Completable performSynchronized(Action action) {
boolean permitAvailable = startStopSemaphore.availablePermits() > 0;
LOG.debug("Attempting to acquire lock. Permits available = " + permitAvailable);
private Completable performSynchronized(String methodName, Action action) {
LOG.debug("[" + methodName + "] Attempting to acquire lock (permits: " + startStopSemaphore.availablePermits() + ")");
try {
if (!startStopSemaphore.tryAcquire(LOCAL_OP_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
return Completable.error(new DataStoreException("Timed out acquiring orchestrator lock.",
Expand All @@ -200,13 +199,13 @@ private Completable performSynchronized(Action action) {
return Completable.error(new DataStoreException("Interrupted while acquiring orchestrator lock.",
"Retry your request."));
}
LOG.info("Orchestrator lock acquired.");
LOG.info("[" + methodName + "] Orchestrator lock acquired (permits: " + startStopSemaphore.availablePermits() + ")");
return Completable.fromAction(action).doOnError((e) -> {
startStopSemaphore.release();
LOG.info("Orchestrator lock released.");
LOG.info("[" + methodName + "] Orchestrator lock released (permits: " + startStopSemaphore.availablePermits() + ")");
}).andThen(Completable.fromAction(() -> {
startStopSemaphore.release();
LOG.info("Orchestrator lock released.");
LOG.info("[" + methodName + "] Orchestrator lock released (permits: " + startStopSemaphore.availablePermits() + ")");
}));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import io.reactivex.rxjava3.subjects.PublishSubject
import io.reactivex.rxjava3.subjects.Subject
import java.util.Objects
import java.util.concurrent.Semaphore
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine
Expand Down Expand Up @@ -69,6 +70,7 @@ internal class PersistentMutationOutbox(private val storage: LocalStorageAdapter
@VisibleForTesting
fun getMutationForModelId(modelId: String, modelClass: String): PendingMutation<out Model>? {
val mutationResult = AtomicReference<PendingMutation<out Model>>()

Completable.create { emitter: CompletableEmitter ->
storage.query(
PersistentRecord::class.java,
Expand All @@ -93,8 +95,7 @@ internal class PersistentMutationOutbox(private val storage: LocalStorageAdapter
}
)
}
.doOnSubscribe { semaphore.acquire() }
.doOnTerminate { semaphore.release() }
.acquireSemaphore(semaphore, "getMutationForModelId")
.blockingAwait()
return mutationResult.get()
}
Expand All @@ -106,7 +107,10 @@ internal class PersistentMutationOutbox(private val storage: LocalStorageAdapter
): Set<String> {
// We chunk sql query to 950 items to prevent hitting 1k sqlite predicate limit
// Improvement would be to use IN, but not currently supported in our query builders
val methodName = "fetchPendingMutations"
LOG.debug { "[$methodName] Acquiring outbox semaphore (permits: ${semaphore.availablePermits()})" }
semaphore.acquire()
LOG.debug { "[$methodName] Acquired outbox semaphore (permits: ${semaphore.availablePermits()})" }
val pendingMutations: Set<String> = models.chunked(950).fold(mutableSetOf()) { acc, chunk ->
val queryOptions = Where.matches(
QueryPredicateGroup(
Expand Down Expand Up @@ -149,12 +153,14 @@ internal class PersistentMutationOutbox(private val storage: LocalStorageAdapter
}
acc
}
LOG.debug { "[$methodName] Releasing outbox semaphore (permits: ${semaphore.availablePermits()})" }
semaphore.release()
return pendingMutations
}

private fun getMutationById(mutationId: String): PendingMutation<out Model>? {
val mutationResult = AtomicReference<PendingMutation<out Model>>()

Completable.create { emitter: CompletableEmitter ->
storage.query(
PersistentRecord::class.java,
Expand All @@ -175,8 +181,7 @@ internal class PersistentMutationOutbox(private val storage: LocalStorageAdapter
}
)
}
.doOnSubscribe { semaphore.acquire() }
.doOnTerminate { semaphore.release() }
.acquireSemaphore(semaphore, "getMutationById")
.blockingAwait()
return mutationResult.get()
}
Expand All @@ -195,8 +200,7 @@ internal class PersistentMutationOutbox(private val storage: LocalStorageAdapter
return@defer resolveConflict<T>(existingMutation, incomingMutation)
}
}
.doOnSubscribe { semaphore.acquire() }
.doOnTerminate { semaphore.release() }
.acquireSemaphore(semaphore, "enqueue")
}

private fun <T : Model> resolveConflict(
Expand All @@ -221,7 +225,7 @@ internal class PersistentMutationOutbox(private val storage: LocalStorageAdapter
// to get identically the thing that was saved. But we know the save succeeded.
// So, let's skip the unwrapping, and use the thing that was enqueued,
// the pendingMutation, directly.
LOG.info("Successfully enqueued $pendingMutation")
LOG.info { "Successfully enqueued $pendingMutation" }
if (addingNewMutation) {
numMutationsInOutbox += 1
}
Expand All @@ -236,9 +240,8 @@ internal class PersistentMutationOutbox(private val storage: LocalStorageAdapter
}
}

override fun remove(pendingMutationId: TimeBasedUuid): Completable = removeNotLocking(pendingMutationId)
.doOnSubscribe { semaphore.acquire() }
.doOnTerminate { semaphore.release() }
override fun remove(pendingMutationId: TimeBasedUuid): Completable =
removeNotLocking(pendingMutationId).acquireSemaphore(semaphore, "remove")

private fun removeNotLocking(pendingMutationId: TimeBasedUuid): Completable {
Objects.requireNonNull(pendingMutationId)
Expand All @@ -256,7 +259,7 @@ internal class PersistentMutationOutbox(private val storage: LocalStorageAdapter
QueryPredicates.all(),
{
inFlightMutations.remove(pendingMutationId)
LOG.info("Successfully removed from mutations outbox$pendingMutation")
LOG.info { "Successfully removed from mutations outbox$pendingMutation" }
numMutationsInOutbox -= 1
val contentAvailable = numMutationsInOutbox > 0
if (contentAvailable) {
Expand Down Expand Up @@ -320,8 +323,7 @@ internal class PersistentMutationOutbox(private val storage: LocalStorageAdapter
}
)
}
.doOnSubscribe { semaphore.acquire() }
.doOnTerminate { semaphore.release() }
.acquireSemaphore(semaphore, "load")
}

override fun events(): Observable<OutboxEvent> = events
Expand Down Expand Up @@ -373,6 +375,31 @@ internal class PersistentMutationOutbox(private val storage: LocalStorageAdapter
)
}

internal fun Completable.acquireSemaphore(semaphore: Semaphore, methodName: String): Completable {
val semaphoreReleased = AtomicBoolean(false)
return this.doOnSubscribe {
LOG.debug { "[$methodName] Acquiring outbox semaphore (permits: ${semaphore.availablePermits()})" }
semaphore.acquire()
LOG.debug { "[$methodName] Acquired outbox semaphore (permits: ${semaphore.availablePermits()})" }
}
.doOnTerminate {
if (semaphoreReleased.compareAndSet(false, true)) {
LOG.debug {
"[$methodName] Releasing outbox semaphore in onTerminate (permits: ${semaphore.availablePermits()})"
}
semaphore.release()
}
}
.doFinally {
if (semaphoreReleased.compareAndSet(false, true)) {
LOG.debug {
"[$methodName] Releasing outbox semaphore in onFinally (permits: ${semaphore.availablePermits()})"
}
semaphore.release()
}
}
}

/**
* Encapsulate the logic to determine which actions to take based on incoming and existing
* mutations. Non-static so we can access instance methods of the outer class. Private because
Expand All @@ -390,11 +417,11 @@ internal class PersistentMutationOutbox(private val storage: LocalStorageAdapter
* @return A completable with the actions to resolve the conflict.
*/
fun resolve(): Completable {
LOG.debug(
LOG.debug {
"IncomingMutationConflict - " +
" existing " + existing.mutationType +
" incoming " + incoming.mutationType
)
}
return when (incoming.mutationType) {
PendingMutation.Type.CREATE -> handleIncomingCreate()
PendingMutation.Type.UPDATE -> handleIncomingUpdate()
Expand Down Expand Up @@ -564,7 +591,6 @@ internal class PersistentMutationOutbox(private val storage: LocalStorageAdapter
) as PendingMutation<T>
}
}

companion object {
private val LOG = Amplify.Logging.logger(CategoryType.DATASTORE, "amplify:aws-datastore")
}
Expand Down