Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run health checks on the I/O pool. #1140

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public class JvbDoctor
/**
* Health check tasks map.
*/
private final Map<Bridge, ScheduledFuture<?>> tasks = new ConcurrentHashMap<>();
private final Map<Bridge, PeriodicHealthCheckTask> tasks = new ConcurrentHashMap<>();

private final HealthCheckListener listener;

Expand Down Expand Up @@ -95,7 +95,7 @@ synchronized public void shutdown()
@Override
public void bridgeRemoved(Bridge bridge)
{
ScheduledFuture<?> healthTask = tasks.remove(bridge);
PeriodicHealthCheckTask healthTask = tasks.remove(bridge);
if (healthTask == null)
{
logger.warn("Trying to remove a bridge that does not exist anymore: " + bridge);
Expand All @@ -104,7 +104,7 @@ public void bridgeRemoved(Bridge bridge)

logger.info("Stopping health-check task for: " + bridge);

healthTask.cancel(true);
healthTask.cancel();
}

@Override
Expand All @@ -120,14 +120,10 @@ public void bridgeAdded(Bridge bridge)
? new HealthCheckPresenceTask(bridge)
: new HealthCheckTask(bridge);

ScheduledFuture<?> healthTask
= TaskPools.getScheduledPool().scheduleAtFixedRate(
task,
healthCheckInterval,
healthCheckInterval,
TimeUnit.MILLISECONDS);
PeriodicHealthCheckTask periodicTask
= new PeriodicHealthCheckTask(task, healthCheckInterval);

tasks.put(bridge, healthTask);
tasks.put(bridge, periodicTask);

logger.info("Scheduled health-check task for: " + bridge);
}
Expand All @@ -137,6 +133,41 @@ public void bridgeIsShuttingDown(@NotNull Bridge bridge)
{
}

private static class PeriodicHealthCheckTask
{
private Runnable innerTask;

private final ScheduledFuture<?> future;
private Future<?> innerFuture;
private final Object lock = new Object();

private PeriodicHealthCheckTask(Runnable task, long healthCheckInterval)
{
innerTask = task;
future = TaskPools.getScheduledPool().scheduleAtFixedRate(
() -> innerFuture = TaskPools.getIoPool().submit(runInner),
healthCheckInterval,
healthCheckInterval,
TimeUnit.MILLISECONDS);
}

private final Runnable runInner = () -> {
synchronized (lock)
{
innerTask.run();
}
};

private void cancel()
{
future.cancel(true);
if (innerFuture != null)
{
innerFuture.cancel(true);
}
}
}

private class HealthCheckTask extends AbstractHealthCheckTask
{
private HealthCheckTask(Bridge bridge)
Expand Down
Loading