diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/OIDCDiscoveryConfigListener.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/OIDCDiscoveryConfigListener.java index 017e6d35ee..63272a4e67 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/OIDCDiscoveryConfigListener.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/OIDCDiscoveryConfigListener.java @@ -20,12 +20,12 @@ import io.vertx.core.Vertx; import java.io.File; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,24 +36,27 @@ public class OIDCDiscoveryConfigListener implements AutoCloseable { private final Vertx vertx; private final FileWatcher configFeaturesWatcher; private final int timeoutSeconds; - private List> callbacks; - private OIDCDiscoveryConfig oidcDiscoveryConfig; + private final ConcurrentHashMap> callbacks; + private final AtomicReference oidcDiscoveryConfig; + private final AtomicInteger callbackIdGenerator; public OIDCDiscoveryConfigListener(String featuresConfigPath, Vertx vertx, int timeoutSeconds) throws IOException { this.featuresConfigPath = featuresConfigPath; this.vertx = vertx; this.timeoutSeconds = timeoutSeconds; + this.oidcDiscoveryConfig = new AtomicReference<>(); + this.callbacks = new ConcurrentHashMap<>(); + this.callbackIdGenerator = new AtomicInteger(0); this.buildFeaturesAndOIDCDiscoveryConfig(); this.configFeaturesWatcher = new FileWatcher(new File(featuresConfigPath + "/" + FeaturesConfig.KEY_AUTHENTICATION_OIDC), () -> { - if (this.oidcDiscoveryConfig == null) { + if (this.oidcDiscoveryConfig.get() == null) { this.buildFeaturesAndOIDCDiscoveryConfig(); - if (this.oidcDiscoveryConfig != null && this.callbacks != null) { - this.callbacks.stream() - .filter(Objects::nonNull) - .forEach(c -> c.accept(this.oidcDiscoveryConfig)); + OIDCDiscoveryConfig config = this.oidcDiscoveryConfig.get(); + if (config != null) { + this.callbacks.values().forEach(callback -> callback.accept(config)); } } }); @@ -62,27 +65,25 @@ public OIDCDiscoveryConfigListener(String featuresConfigPath, Vertx vertx, int t } public OIDCDiscoveryConfig getOidcDiscoveryConfig() { - return oidcDiscoveryConfig; + return oidcDiscoveryConfig.get(); } public int registerCallback(Consumer callback) { - if (this.callbacks == null) { - this.callbacks = new ArrayList<>(); - } - - this.callbacks.add(callback); - return this.callbacks.size() - 1; + int id = callbackIdGenerator.incrementAndGet(); + this.callbacks.put(id, callback); + return id; } public void deregisterCallback(int callbackId) { - this.callbacks.set(callbackId, null); + this.callbacks.remove(callbackId); } private void buildOIDCDiscoveryConfig() throws ExecutionException, InterruptedException, TimeoutException { - this.oidcDiscoveryConfig = OIDCDiscoveryConfig.build(this.vertx) + OIDCDiscoveryConfig config = OIDCDiscoveryConfig.build(this.vertx) .toCompletionStage() .toCompletableFuture() .get(this.timeoutSeconds, TimeUnit.SECONDS); + this.oidcDiscoveryConfig.set(config); } private void buildFeaturesAndOIDCDiscoveryConfig() {