Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: delete CloseableMonitor #1353

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ If you are using Maven, add this to your pom.xml file:
If you are using Gradle without BOM, add this to your dependencies:

```Groovy
implementation 'com.google.cloud:google-cloud-pubsublite:1.11.0'
implementation 'com.google.cloud:google-cloud-pubsublite:1.11.1'
```

If you are using SBT, add this to your dependencies:

```Scala
libraryDependencies += "com.google.cloud" % "google-cloud-pubsublite" % "1.11.0"
libraryDependencies += "com.google.cloud" % "google-cloud-pubsublite" % "1.11.1"
```

## Authentication
Expand Down
5 changes: 5 additions & 0 deletions google-cloud-pubsublite/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,9 @@
<className>com/google/cloud/pubsublite/internal/**</className>
<method>*</method>
</difference>
<difference>
<differenceType>8001</differenceType>
<className>com/google/cloud/pubsublite/internal/**</className>
<method>*</method>
</difference>
</differences>

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,16 @@

package com.google.cloud.pubsublite.internal;

import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
import java.util.List;
import java.util.concurrent.Future;

public final class MoreApiFutures {
private MoreApiFutures() {}
Expand All @@ -42,4 +47,18 @@ public void onSuccess(T t) {
},
SystemExecutors.getFuturesExecutor());
}

public static ApiFuture<Void> whenFirstDone(List<ApiFuture<?>> futures) {
SettableApiFuture<Void> anyDone = SettableApiFuture.create();
futures.forEach(f -> f.addListener(() -> anyDone.set(null), directExecutor()));
return anyDone;
}

public static <T> T get(Future<T> f) {
try {
return f.get();
} catch (Throwable t) {
throw toCanonical(t).underlying;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.pubsublite.internal;

import com.google.common.util.concurrent.Monitor;
import com.google.common.util.concurrent.Monitor.Guard;
import java.util.ArrayDeque;
import java.util.Queue;
Expand All @@ -26,22 +27,22 @@
public final class SerialExecutor implements AutoCloseable, Executor {
private final Executor executor;

private final CloseableMonitor monitor = new CloseableMonitor();
private final Monitor monitor = new Monitor();
private final Guard isInactive =
new Guard(monitor.monitor) {
new Guard(monitor) {
@Override
public boolean isSatisfied() {
return !isTaskActive;
}
};

@GuardedBy("monitor.monitor")
@GuardedBy("monitor")
private final Queue<Runnable> tasks;

@GuardedBy("monitor.monitor")
@GuardedBy("monitor")
private boolean isTaskActive;

@GuardedBy("monitor.monitor")
@GuardedBy("monitor")
private boolean isShutdown;

public SerialExecutor(Executor executor) {
Expand All @@ -53,7 +54,7 @@ public SerialExecutor(Executor executor) {

/** Waits until there are no active tasks. */
public void waitUntilInactive() {
try (CloseableMonitor.Hold h = monitor.enterWhenUninterruptibly(isInactive)) {}
monitor.enterWhenUninterruptibly(isInactive);
}

/**
Expand All @@ -62,14 +63,18 @@ public void waitUntilInactive() {
*/
@Override
public void close() {
try (CloseableMonitor.Hold h = monitor.enter()) {
monitor.enter();
try {
isShutdown = true;
} finally {
monitor.leave();
}
}

@Override
public void execute(Runnable r) {
try (CloseableMonitor.Hold h = monitor.enter()) {
monitor.enter();
try {
if (isShutdown) {
return;
}
Expand All @@ -86,21 +91,29 @@ public void execute(Runnable r) {
if (!isTaskActive) {
scheduleNextTask();
}
} finally {
monitor.leave();
}
}

private boolean shouldExecuteTask() {
try (CloseableMonitor.Hold h = monitor.enter()) {
monitor.enter();
try {
return !isShutdown;
} finally {
monitor.leave();
}
}

private void scheduleNextTask() {
try (CloseableMonitor.Hold h = monitor.enter()) {
monitor.enter();
try {
isTaskActive = !tasks.isEmpty() && !isShutdown;
if (isTaskActive) {
executor.execute(tasks.poll());
}
} finally {
monitor.leave();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.proto.InitialPartitionAssignmentRequest;
import com.google.cloud.pubsublite.proto.PartitionAssignment;
Expand All @@ -41,12 +40,10 @@ public class AssignerImpl extends ProxyService
private final PartitionAssignmentRequest initialRequest;
private final String uuidHex;

private final CloseableMonitor monitor = new CloseableMonitor();

@GuardedBy("monitor.monitor")
@GuardedBy("this")
private final RetryingConnection<PartitionAssignmentRequest, ConnectedAssigner> connection;

@GuardedBy("monitor.monitor")
@GuardedBy("this")
private final PartitionAssignmentReceiver receiver;

@VisibleForTesting
Expand Down Expand Up @@ -80,10 +77,8 @@ public AssignerImpl(
}

@Override
public void triggerReinitialize(CheckedApiException streamError) {
try (CloseableMonitor.Hold h = monitor.enter()) {
connection.reinitialize(initialRequest);
}
public synchronized void triggerReinitialize(CheckedApiException streamError) {
connection.reinitialize(initialRequest);
}

private static Set<Partition> toSet(PartitionAssignment assignment) throws ApiException {
Expand All @@ -95,13 +90,11 @@ private static Set<Partition> toSet(PartitionAssignment assignment) throws ApiEx
}

@Override
public void onClientResponse(PartitionAssignment value) throws CheckedApiException {
try (CloseableMonitor.Hold h = monitor.enter()) {
Set<Partition> partitions = toSet(value);
logger.atFine().log("Subscriber with uuid %s received assignment: %s", uuidHex, partitions);
receiver.handleAssignment(partitions);
logger.atInfo().log("Subscriber with uuid %s handled assignment: %s", uuidHex, partitions);
connection.modifyConnection(connectionOr -> connectionOr.ifPresent(ConnectedAssigner::ack));
}
public synchronized void onClientResponse(PartitionAssignment value) throws CheckedApiException {
Set<Partition> partitions = toSet(value);
logger.atFine().log("Subscriber with uuid %s received assignment: %s", uuidHex, partitions);
receiver.handleAssignment(partitions);
logger.atInfo().log("Subscriber with uuid %s handled assignment: %s", uuidHex, partitions);
connection.modifyConnection(connectionOr -> connectionOr.ifPresent(ConnectedAssigner::ack));
}
}
Loading