Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public Orchestrator(
* and started (asynchronously) the transition to SYNC_VIA_API, if an API is available.
*/
public synchronized Completable start() {
return performSynchronized(() -> {
return performSynchronized("start", () -> {
switch (targetState.get()) {
case LOCAL_ONLY:
disposeNetworkChanges();
Expand All @@ -185,12 +185,11 @@ public synchronized Completable start() {
* @return A completable which emits success when orchestrator stops
*/
public synchronized Completable stop() {
return performSynchronized(this::transitionToStopped);
return performSynchronized("stop", this::transitionToStopped);
}

private Completable performSynchronized(Action action) {
boolean permitAvailable = startStopSemaphore.availablePermits() > 0;
LOG.debug("Attempting to acquire lock. Permits available = " + permitAvailable);
private Completable performSynchronized(String methodName, Action action) {
LOG.debug("[" + methodName + "] Attempting to acquire lock (permits: " + startStopSemaphore.availablePermits() + ")");
try {
if (!startStopSemaphore.tryAcquire(LOCAL_OP_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
return Completable.error(new DataStoreException("Timed out acquiring orchestrator lock.",
Expand All @@ -200,13 +199,13 @@ private Completable performSynchronized(Action action) {
return Completable.error(new DataStoreException("Interrupted while acquiring orchestrator lock.",
"Retry your request."));
}
LOG.info("Orchestrator lock acquired.");
LOG.info("[" + methodName + "] Orchestrator lock acquired (permits: " + startStopSemaphore.availablePermits() + ")");
return Completable.fromAction(action).doOnError((e) -> {
startStopSemaphore.release();
LOG.info("Orchestrator lock released.");
LOG.info("[" + methodName + "] Orchestrator lock released (permits: " + startStopSemaphore.availablePermits() + ")");
}).andThen(Completable.fromAction(() -> {
startStopSemaphore.release();
LOG.info("Orchestrator lock released.");
LOG.info("[" + methodName + "] Orchestrator lock released (permits: " + startStopSemaphore.availablePermits() + ")");
}));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import java.util.concurrent.atomic.AtomicReference
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicBoolean

/**
* The [MutationOutbox] is a persistently-backed in-order staging ground
Expand All @@ -68,7 +69,10 @@ internal class PersistentMutationOutbox(private val storage: LocalStorageAdapter

@VisibleForTesting
fun getMutationForModelId(modelId: String, modelClass: String): PendingMutation<out Model>? {
val methodName = "getMutationForModelId"
val mutationResult = AtomicReference<PendingMutation<out Model>>()
val semaphoreReleased = AtomicBoolean(false)

Completable.create { emitter: CompletableEmitter ->
storage.query(
PersistentRecord::class.java,
Expand All @@ -93,8 +97,23 @@ internal class PersistentMutationOutbox(private val storage: LocalStorageAdapter
}
)
}
.doOnSubscribe { semaphore.acquire() }
.doOnTerminate { semaphore.release() }
.doOnSubscribe {
LOG.debug { "[$methodName] Acquiring outbox semaphore (permits: ${semaphore.availablePermits()})" }
semaphore.acquire()
LOG.debug { "[$methodName] Acquired outbox semaphore (permits: ${semaphore.availablePermits()})" }
}
.doOnTerminate {
if (semaphoreReleased.compareAndSet(false, true)) {
LOG.debug { "[$methodName] Releasing outbox semaphore in onTerminate (permits: ${semaphore.availablePermits()})" }
semaphore.release()
}
}
.doFinally {
if (semaphoreReleased.compareAndSet(false, true)) {
LOG.debug { "[$methodName] Releasing outbox semaphore in doFinally (permits: ${semaphore.availablePermits()})" }
semaphore.release()
}
}
.blockingAwait()
return mutationResult.get()
}
Expand All @@ -106,7 +125,10 @@ internal class PersistentMutationOutbox(private val storage: LocalStorageAdapter
): Set<String> {
// We chunk sql query to 950 items to prevent hitting 1k sqlite predicate limit
// Improvement would be to use IN, but not currently supported in our query builders
val methodName = "fetchPendingMutations"
LOG.debug { "[$methodName] Acquiring outbox semaphore (permits: ${semaphore.availablePermits()})" }
semaphore.acquire()
LOG.debug { "[$methodName] Acquired outbox semaphore (permits: ${semaphore.availablePermits()})" }
val pendingMutations: Set<String> = models.chunked(950).fold(mutableSetOf()) { acc, chunk ->
val queryOptions = Where.matches(
QueryPredicateGroup(
Expand Down Expand Up @@ -149,12 +171,16 @@ internal class PersistentMutationOutbox(private val storage: LocalStorageAdapter
}
acc
}
LOG.debug { "[$methodName] Releasing outbox semaphore (permits: ${semaphore.availablePermits()})" }
semaphore.release()
return pendingMutations
}

private fun getMutationById(mutationId: String): PendingMutation<out Model>? {
val methodName = "getMutationById"
val mutationResult = AtomicReference<PendingMutation<out Model>>()
val semaphoreReleased = AtomicBoolean(false)

Completable.create { emitter: CompletableEmitter ->
storage.query(
PersistentRecord::class.java,
Expand All @@ -175,13 +201,31 @@ internal class PersistentMutationOutbox(private val storage: LocalStorageAdapter
}
)
}
.doOnSubscribe { semaphore.acquire() }
.doOnTerminate { semaphore.release() }
.doOnSubscribe {
LOG.debug { "[$methodName] Acquiring outbox semaphore (permits: ${semaphore.availablePermits()})" }
semaphore.acquire()
LOG.debug { "[$methodName] Acquired outbox semaphore (permits: ${semaphore.availablePermits()})" }
}
.doOnTerminate {
if (semaphoreReleased.compareAndSet(false, true)) {
LOG.debug { "[$methodName] Releasing outbox semaphore in onTerminate (permits: ${semaphore.availablePermits()})" }
semaphore.release()
}
}
.doFinally {
if (semaphoreReleased.compareAndSet(false, true)) {
LOG.debug { "[$methodName] Releasing outbox semaphore in doFinally (permits: ${semaphore.availablePermits()})" }
semaphore.release()
}
}
.blockingAwait()
return mutationResult.get()
}

override fun <T : Model> enqueue(incomingMutation: PendingMutation<T>): Completable {
val methodName = "enqueue"
val semaphoreReleased = AtomicBoolean(false)

return Completable.defer {
// If there is no existing mutation for the model, then just apply the incoming
// mutation, and be done with this.
Expand All @@ -195,8 +239,23 @@ internal class PersistentMutationOutbox(private val storage: LocalStorageAdapter
return@defer resolveConflict<T>(existingMutation, incomingMutation)
}
}
.doOnSubscribe { semaphore.acquire() }
.doOnTerminate { semaphore.release() }
.doOnSubscribe {
LOG.debug { "[$methodName] Acquiring outbox semaphore (permits: ${semaphore.availablePermits()})" }
semaphore.acquire()
LOG.debug { "[$methodName] Acquired outbox semaphore (permits: ${semaphore.availablePermits()})" }
}
.doOnTerminate {
if (semaphoreReleased.compareAndSet(false, true)) {
LOG.debug { "[$methodName] Releasing outbox semaphore in onTerminate (permits: ${semaphore.availablePermits()})" }
semaphore.release()
}
}
.doFinally {
if (semaphoreReleased.compareAndSet(false, true)) {
LOG.debug { "[$methodName] Releasing outbox semaphore in doFinally (permits: ${semaphore.availablePermits()})" }
semaphore.release()
}
}
}

private fun <T : Model> resolveConflict(
Expand All @@ -221,7 +280,7 @@ internal class PersistentMutationOutbox(private val storage: LocalStorageAdapter
// to get identically the thing that was saved. But we know the save succeeded.
// So, let's skip the unwrapping, and use the thing that was enqueued,
// the pendingMutation, directly.
LOG.info("Successfully enqueued $pendingMutation")
LOG.info { "Successfully enqueued $pendingMutation" }
if (addingNewMutation) {
numMutationsInOutbox += 1
}
Expand All @@ -236,9 +295,29 @@ internal class PersistentMutationOutbox(private val storage: LocalStorageAdapter
}
}

override fun remove(pendingMutationId: TimeBasedUuid): Completable = removeNotLocking(pendingMutationId)
.doOnSubscribe { semaphore.acquire() }
.doOnTerminate { semaphore.release() }
override fun remove(pendingMutationId: TimeBasedUuid): Completable {
val methodName = "remove"
val semaphoreReleased = AtomicBoolean(false)

return removeNotLocking(pendingMutationId)
.doOnSubscribe {
LOG.debug { "[$methodName] Acquiring outbox semaphore (permits: ${semaphore.availablePermits()})" }
semaphore.acquire()
LOG.debug { "[$methodName] Acquired outbox semaphore (permits: ${semaphore.availablePermits()})" }
}
.doOnTerminate {
if (semaphoreReleased.compareAndSet(false, true)) {
LOG.debug { "[$methodName] Releasing outbox semaphore in onTerminate (permits: ${semaphore.availablePermits()})" }
semaphore.release()
}
}
.doFinally {
if (semaphoreReleased.compareAndSet(false, true)) {
LOG.debug { "[$methodName] Releasing outbox semaphore in doFinally (permits: ${semaphore.availablePermits()})" }
semaphore.release()
}
}
}

private fun removeNotLocking(pendingMutationId: TimeBasedUuid): Completable {
Objects.requireNonNull(pendingMutationId)
Expand All @@ -256,7 +335,7 @@ internal class PersistentMutationOutbox(private val storage: LocalStorageAdapter
QueryPredicates.all(),
{
inFlightMutations.remove(pendingMutationId)
LOG.info("Successfully removed from mutations outbox$pendingMutation")
LOG.info { "Successfully removed from mutations outbox$pendingMutation" }
numMutationsInOutbox -= 1
val contentAvailable = numMutationsInOutbox > 0
if (contentAvailable) {
Expand All @@ -275,6 +354,9 @@ internal class PersistentMutationOutbox(private val storage: LocalStorageAdapter
}

override fun load(): Completable {
val methodName = "load"
val semaphoreReleased = AtomicBoolean(false)

return Completable.create { emitter: CompletableEmitter ->
inFlightMutations.clear()
var queryOptions = Where.matchesAll()
Expand Down Expand Up @@ -320,8 +402,23 @@ internal class PersistentMutationOutbox(private val storage: LocalStorageAdapter
}
)
}
.doOnSubscribe { semaphore.acquire() }
.doOnTerminate { semaphore.release() }
.doOnSubscribe {
LOG.debug { "[$methodName] Acquiring outbox semaphore (permits: ${semaphore.availablePermits()})" }
semaphore.acquire()
LOG.debug { "[$methodName] Acquired outbox semaphore (permits: ${semaphore.availablePermits()})" }
}
.doOnTerminate {
if (semaphoreReleased.compareAndSet(false, true)) {
LOG.debug { "[$methodName] Releasing outbox semaphore in onTerminate (permits: ${semaphore.availablePermits()})" }
semaphore.release()
}
}
.doFinally {
if (semaphoreReleased.compareAndSet(false, true)) {
LOG.debug { "[$methodName] Releasing outbox semaphore in doFinally (permits: ${semaphore.availablePermits()})" }
semaphore.release()
}
}
}

override fun events(): Observable<OutboxEvent> = events
Expand Down Expand Up @@ -390,11 +487,11 @@ internal class PersistentMutationOutbox(private val storage: LocalStorageAdapter
* @return A completable with the actions to resolve the conflict.
*/
fun resolve(): Completable {
LOG.debug(
LOG.debug {
"IncomingMutationConflict - " +
" existing " + existing.mutationType +
" incoming " + incoming.mutationType
)
" existing " + existing.mutationType +
" incoming " + incoming.mutationType
}
return when (incoming.mutationType) {
PendingMutation.Type.CREATE -> handleIncomingCreate()
PendingMutation.Type.UPDATE -> handleIncomingUpdate()
Expand Down
Loading