Skip to content

Commit d7d6604

Browse files
committed
Remediate some issues to successfully pass bazel tests
1 parent 1d5eafa commit d7d6604

File tree

5 files changed

+32
-97
lines changed

5 files changed

+32
-97
lines changed

src/main/java/build/buildfarm/common/redis/RedisClient.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@
2525
import java.util.concurrent.atomic.AtomicReference;
2626
import java.util.function.Consumer;
2727
import redis.clients.jedis.UnifiedJedis;
28+
29+
import java.util.function.Supplier;
30+
import java.util.logging.Level;
31+
import lombok.extern.java.Log;
2832
import redis.clients.jedis.exceptions.JedisClusterOperationException;
2933
import redis.clients.jedis.exceptions.JedisConnectionException;
3034
import redis.clients.jedis.exceptions.JedisDataException;
@@ -76,14 +80,31 @@ public JedisMisconfigurationException(final String message, final Throwable caus
7680
}
7781
}
7882

79-
private final UnifiedJedis jedis;
83+
// We store the factory in case we want to re-create the jedis client.
84+
private Supplier<UnifiedJedis> unifiedJedisFactory;
85+
86+
private UnifiedJedis jedis;
8087

8188
private boolean closed = false;
8289

8390
public RedisClient(UnifiedJedis jedis) {
8491
this.jedis = jedis;
8592
}
8693

94+
public RedisClient(
95+
Supplier<UnifiedJedis> unifiedJedisFactory,
96+
int reconnectClientAttempts,
97+
int reconnectClientWaitDurationMs) {
98+
try {
99+
this.jedis = unifiedJedisFactory.get();
100+
} catch (Exception e) {
101+
log.log(Level.SEVERE, "Unable to establish redis client: " + e.toString());
102+
}
103+
this.unifiedJedisFactory = unifiedJedisFactory;
104+
this.reconnectClientAttempts = reconnectClientAttempts;
105+
this.reconnectClientWaitDurationMs = reconnectClientWaitDurationMs;
106+
}
107+
87108
@Override
88109
public synchronized void close() {
89110
closed = true;
@@ -176,7 +197,7 @@ private <T> T callImpl(JedisContext<T> withJedis) throws IOException {
176197
private void rebuildJedisCluser() {
177198
try {
178199
log.log(Level.SEVERE, "Rebuilding redis client");
179-
jedis = jedisClusterFactory.get();
200+
jedis = unifiedJedisFactory.get();
180201
} catch (Exception e) {
181202
redisClientRebuildErrorCounter.inc();
182203
log.log(Level.SEVERE, "Failed to rebuild redis client");

src/main/java/build/buildfarm/instance/shard/ServerInstance.java

Lines changed: 4 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ public class ServerInstance extends NodeInstance {
247247
private Cache<RequestMetadata, Boolean> recentCacheServedExecutions;
248248

249249
private final Random rand = new Random();
250-
private final Writes writes;
250+
private final Writes writes = new Writes(this::writeInstanceSupplier);
251251
private final int maxCpu;
252252
private final int maxRequeueAttempts;
253253

@@ -382,7 +382,6 @@ public ServerInstance(
382382
this.actionCacheFetchService = actionCacheFetchService;
383383
backplane.setOnUnsubscribe(this::stop);
384384

385-
this.writes = new Writes(writeInstanceCacheLoader());
386385
initializeCaches();
387386

388387
remoteInputStreamFactory =
@@ -1248,35 +1247,9 @@ public void onSuccess(List<String> workersList) {
12481247
protected abstract void onQueue(Deque<String> workers);
12491248
}
12501249

1251-
private CacheLoader<BlobWriteKey, Instance> writeInstanceCacheLoader() {
1252-
return new CacheLoader<BlobWriteKey, Instance>() {
1253-
@SuppressWarnings("NullableProblems")
1254-
@Override
1255-
public Instance load(BlobWriteKey key) {
1256-
String instance = null;
1257-
// Per the REAPI, the identifier should be a UUID that is unique to each
1258-
// client; the digest size in bytes is appended to further mitigate
1259-
// collisions without storing the entire BlobWriteKey.
1260-
String blobKey = key.getIdentifier() + "." + key.getDigest().getSizeBytes();
1261-
try {
1262-
instance = backplane.getWriteInstance(blobKey);
1263-
if (instance != null) {
1264-
return workerStub(instance);
1265-
}
1266-
} catch (IOException e) {
1267-
log.log(Level.WARNING, "error getting write instance for " + instance, e);
1268-
}
1269-
1270-
instance = getRandomWorker();
1271-
try {
1272-
backplane.setWriteInstance(blobKey, instance);
1273-
log.log(Level.INFO, "set write-instance: " + blobKey + " -> " + instance); // TODO: [jmarino]: remove
1274-
} catch (IOException e) {
1275-
log.log(Level.WARNING, "error getting write instance for " + instance, e);
1276-
}
1277-
return workerStub(instance);
1278-
}
1279-
};
1250+
private Instance writeInstanceSupplier() {
1251+
String worker = getRandomWorker();
1252+
return workerStub(worker);
12801253
}
12811254

12821255
String getRandomWorker() {

src/main/java/build/buildfarm/worker/Pipeline.java

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414

1515
package build.buildfarm.worker;
1616

17-
import com.google.common.util.concurrent.SettableFuture;
1817
import java.util.ArrayList;
1918
import java.util.HashMap;
2019
import java.util.List;
@@ -25,7 +24,6 @@
2524
@Log
2625
public class Pipeline {
2726
private final Map<PipelineStage, Thread> stageThreads;
28-
private final PipelineStageThreadGroup stageThreadGroup;
2927
private final Map<PipelineStage, Integer> stageClosePriorities;
3028
private Thread joiningThread = null;
3129
private boolean closing = false;
@@ -35,25 +33,17 @@ public class Pipeline {
3533
public Pipeline() {
3634
stageThreads = new HashMap<>();
3735
stageClosePriorities = new HashMap<>();
38-
stageThreadGroup = new PipelineStageThreadGroup();
3936
}
4037

4138
public void add(PipelineStage stage, int closePriority) {
42-
stageThreads.put(stage, new Thread(stageThreadGroup, stage, stage.name()));
39+
stageThreads.put(stage, new Thread(stage));
4340
if (closePriority < 0) {
4441
throw new IllegalArgumentException("closePriority cannot be negative");
4542
}
4643
stageClosePriorities.put(stage, closePriority);
4744
}
4845

49-
/**
50-
* Start the pipeline.
51-
*
52-
* <p>You can provide a callback that is invoked when any stage has an uncaught exception, for
53-
* instance to shut down the worker gracefully
54-
*/
55-
public void start(SettableFuture<Void> uncaughtExceptionFuture) {
56-
stageThreadGroup.setUncaughtExceptionFuture(uncaughtExceptionFuture);
46+
public void start() {
5747
for (Thread stageThread : stageThreads.values()) {
5848
stageThread.start();
5949
}

src/main/java/build/buildfarm/worker/shard/Worker.java

Lines changed: 1 addition & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -97,11 +97,6 @@
9797
import javax.annotation.Nullable;
9898
import javax.naming.ConfigurationException;
9999
import lombok.extern.java.Log;
100-
import org.springframework.beans.factory.annotation.Autowired;
101-
import org.springframework.boot.SpringApplication;
102-
import org.springframework.boot.autoconfigure.SpringBootApplication;
103-
import org.springframework.context.ApplicationContext;
104-
import org.springframework.context.annotation.ComponentScan;
105100

106101
@Log
107102
public final class Worker extends LoggingMain {
@@ -146,7 +141,6 @@ public final class Worker extends LoggingMain {
146141
private LoadingCache<String, Instance> workerStubs;
147142
private AtomicBoolean released = new AtomicBoolean(true);
148143

149-
@Autowired private ApplicationContext springContext;
150144
/**
151145
* The method will prepare the worker for graceful shutdown when the worker is ready. Note on
152146
* using stderr here instead of log. By the time this is called in PreDestroy, the log is no
@@ -198,43 +192,6 @@ private Worker() {
198192
super("BuildFarmShardWorker");
199193
}
200194

201-
private void exitPostPipelineFailure() {
202-
// Shut down the worker if a pipeline fails. By means of the Spring lifecycle
203-
// hooks - e.g. the `PreDestroy` hook here - it will attempt to gracefully
204-
// spin down the pipeline
205-
206-
// By calling these Spring shutdown facilities, we're open to the risk that
207-
// a subsystem may be hanging a critical thread indefinitely. Deadline the
208-
// shutdown workflow to ensure we don't leave a zombie worker in this
209-
// situation
210-
ScheduledExecutorService shutdownDeadlineExecutor = newSingleThreadScheduledExecutor();
211-
212-
// This may be shorter than the action timeout; assume we have interrupted
213-
// actions in a fatal uncaught exception.
214-
int forceShutdownDeadline = 60;
215-
ScheduledFuture<?> termFuture =
216-
shutdownDeadlineExecutor.schedule(
217-
new Runnable() {
218-
public void run() {
219-
log.log(
220-
Level.SEVERE,
221-
String.format(
222-
"Force terminating due to shutdown deadline exceeded (%d seconds)",
223-
forceShutdownDeadline));
224-
System.exit(1);
225-
}
226-
},
227-
forceShutdownDeadline,
228-
SECONDS);
229-
230-
// Consider defining exit codes to better afford out of band instance
231-
// recovery
232-
int code = SpringApplication.exit(springContext, () -> 1);
233-
termFuture.cancel(false);
234-
shutdownDeadlineExecutor.shutdown();
235-
System.exit(code);
236-
}
237-
238195
private Operation stripOperation(Operation operation) {
239196
return instance.stripOperation(operation);
240197
}
@@ -678,13 +635,7 @@ public void start() throws ConfigurationException, InterruptedException, IOExcep
678635
PrometheusPublisher.startHttpServer(configs.getPrometheusPort());
679636
startFailsafeRegistration();
680637

681-
// Listen for pipeline unhandled exceptions
682-
ExecutorService pipelineExceptionExecutor = newSingleThreadExecutor();
683-
SettableFuture<Void> pipelineExceptionFuture = SettableFuture.create();
684-
pipelineExceptionFuture.addListener(this::exitPostPipelineFailure, pipelineExceptionExecutor);
685-
686-
pipeline.start(pipelineExceptionFuture);
687-
638+
pipeline.start();
688639
healthCheckMetric.labels("start").inc();
689640
executionSlotsTotal.set(configs.getWorker().getExecuteStageWidth());
690641
inputFetchSlotsTotal.set(configs.getWorker().getInputFetchStageWidth());

src/test/java/build/buildfarm/worker/PipelineTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public void stageThreadReturnCompletesJoin() throws InterruptedException {
5858
public void run() {}
5959
},
6060
1);
61-
pipeline.start(null);
61+
pipeline.start();
6262
pipeline.join();
6363
}
6464

@@ -73,7 +73,7 @@ public void run() {
7373
}
7474
},
7575
1);
76-
pipeline.start(null);
76+
pipeline.start();
7777
pipeline.join();
7878
}
7979

0 commit comments

Comments
 (0)