Skip to content

Commit 2d22dd2

Browse files
committedJun 13, 2024··
feat: optional dependent
1 parent 705c9b1 commit 2d22dd2

File tree

17 files changed

+433
-10
lines changed

17 files changed

+433
-10
lines changed
 

‎operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/BaseConfigurationService.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,8 @@ private static List<DependentResourceSpec> dependentResources(
224224
Utils.instantiate(dependent.reconcilePrecondition(), Condition.class, context),
225225
Utils.instantiate(dependent.deletePostcondition(), Condition.class, context),
226226
Utils.instantiate(dependent.activationCondition(), Condition.class, context),
227-
eventSourceName);
227+
eventSourceName, dependent.optional());
228+
228229
specsMap.put(dependentName, spec);
229230
}
230231
return specsMap.values().stream().toList();

‎operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/dependent/DependentResourceSpec.java

+14-1
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,12 @@ public class DependentResourceSpec<R, P extends HasMetadata> {
2626

2727
private final String useEventSourceWithName;
2828

29+
private final boolean optional;
30+
2931
public DependentResourceSpec(Class<? extends DependentResource<R, P>> dependentResourceClass,
3032
String name, Set<String> dependsOn, Condition<?, ?> readyCondition,
3133
Condition<?, ?> reconcileCondition, Condition<?, ?> deletePostCondition,
32-
Condition<?, ?> activationCondition, String useEventSourceWithName) {
34+
Condition<?, ?> activationCondition, String useEventSourceWithName, boolean optional) {
3335
this.dependentResourceClass = dependentResourceClass;
3436
this.name = name;
3537
this.dependsOn = dependsOn;
@@ -38,6 +40,13 @@ public DependentResourceSpec(Class<? extends DependentResource<R, P>> dependentR
3840
this.deletePostCondition = deletePostCondition;
3941
this.activationCondition = activationCondition;
4042
this.useEventSourceWithName = useEventSourceWithName;
43+
this.optional = optional;
44+
45+
if (this.optional && activationCondition != null) {
46+
throw new IllegalArgumentException(
47+
"Dependent resource cannot be both optional and contain activation condition. Dependent resource name: "
48+
+ name + " class: " + dependentResourceClass);
49+
}
4150
}
4251

4352
public Class<? extends DependentResource<R, P>> getDependentResourceClass() {
@@ -98,4 +107,8 @@ public Condition getActivationCondition() {
98107
public Optional<String> getUseEventSourceWithName() {
99108
return Optional.ofNullable(useEventSourceWithName);
100109
}
110+
111+
public boolean isOptional() {
112+
return optional;
113+
}
101114
}

‎operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/Dependent.java

+2
Original file line numberDiff line numberDiff line change
@@ -86,4 +86,6 @@
8686
* @return event source name (if any) provided by the dependent resource should be used.
8787
*/
8888
String useEventSourceWithName() default NO_VALUE_SET;
89+
90+
boolean optional() default false;
8991
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package io.javaoperatorsdk.operator.processing.dependent.workflow;
2+
3+
import java.util.Map;
4+
import java.util.concurrent.ConcurrentHashMap;
5+
6+
import org.slf4j.Logger;
7+
import org.slf4j.LoggerFactory;
8+
9+
import io.fabric8.kubernetes.api.model.HasMetadata;
10+
import io.fabric8.kubernetes.api.model.apiextensions.v1.CustomResourceDefinition;
11+
import io.javaoperatorsdk.operator.api.reconciler.Context;
12+
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource;
13+
import io.javaoperatorsdk.operator.processing.expiration.Expiration;
14+
import io.javaoperatorsdk.operator.processing.expiration.ExpirationExecution;
15+
import io.javaoperatorsdk.operator.processing.expiration.RetryExpiration;
16+
import io.javaoperatorsdk.operator.processing.retry.GenericRetry;
17+
import io.javaoperatorsdk.operator.processing.retry.Retry;
18+
19+
public class CRDPresentActivationCondition implements Condition<HasMetadata, HasMetadata> {
20+
21+
private static final Logger log = LoggerFactory.getLogger(CRDPresentActivationCondition.class);
22+
23+
public static final int DEFAULT_EXPIRATION_INITIAL_INTERVAL = 1000;
24+
public static final int DEFAULT_EXPIRATION_INTERVAL_MULTIPLIER = 4;
25+
public static final int DEFAULT_EXPIRATION_MAX_RETRY_ATTEMPTS = 10;
26+
27+
/**
28+
* The idea behind default expiration is that on cluster start there might be different phases
29+
* when CRDs and controllers are added. For a few times it will be checked if the target CRD is
30+
* not present, after it will just use the cached state.
31+
*/
32+
public static Retry DEFAULT_EXPIRATION_RETRY =
33+
new GenericRetry().setInitialInterval(DEFAULT_EXPIRATION_INITIAL_INTERVAL)
34+
.setIntervalMultiplier(DEFAULT_EXPIRATION_INTERVAL_MULTIPLIER)
35+
.setMaxAttempts(DEFAULT_EXPIRATION_MAX_RETRY_ATTEMPTS);
36+
37+
private final Map<String, CRDCheckState> crdPresenceCache = new ConcurrentHashMap<>();
38+
39+
private final Expiration expiration;
40+
41+
public CRDPresentActivationCondition() {
42+
this(new RetryExpiration(DEFAULT_EXPIRATION_RETRY));
43+
}
44+
45+
public CRDPresentActivationCondition(Expiration expiration) {
46+
this.expiration = expiration;
47+
}
48+
49+
@Override
50+
public boolean isMet(DependentResource<HasMetadata, HasMetadata> dependentResource,
51+
HasMetadata primary, Context<HasMetadata> context) {
52+
53+
var resourceClass = dependentResource.resourceType();
54+
final var crdName = HasMetadata.getFullResourceName(resourceClass);
55+
56+
var crdCheckState = crdPresenceCache.computeIfAbsent(crdName,
57+
g -> new CRDCheckState(expiration.initExecution()));
58+
// in case of parallel execution it is only refreshed once
59+
synchronized (crdCheckState) {
60+
if (crdCheckState.isExpired()) {
61+
log.debug("Refreshing cache for resource: {}", crdName);
62+
final var found = context.getClient().resources(CustomResourceDefinition.class)
63+
.withName(crdName).get() != null;
64+
crdCheckState.refresh(found);
65+
}
66+
}
67+
return crdPresenceCache.get(crdName).isCrdPresent();
68+
}
69+
70+
static class CRDCheckState {
71+
private final ExpirationExecution expirationExecution;
72+
private boolean crdPresent;
73+
74+
public CRDCheckState(ExpirationExecution expirationExecution) {
75+
this.expirationExecution = expirationExecution;
76+
}
77+
78+
void refresh(boolean found) {
79+
crdPresent = found;
80+
expirationExecution.refreshed();
81+
}
82+
83+
boolean isExpired() {
84+
return expirationExecution.isExpired();
85+
}
86+
87+
public boolean isCrdPresent() {
88+
return crdPresent;
89+
}
90+
}
91+
}

‎operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/DefaultManagedWorkflow.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -76,14 +76,19 @@ public boolean isEmpty() {
7676
public Workflow<P> resolve(KubernetesClient client,
7777
ControllerConfiguration<P> configuration) {
7878
final var alreadyResolved = new HashMap<String, DependentResourceNode>(orderedSpecs.size());
79+
80+
// sharing the activation condition so no parallel requests are done for the same CRD
81+
CRDPresentActivationCondition crdPresentActivationCondition =
82+
new CRDPresentActivationCondition();
83+
7984
for (DependentResourceSpec spec : orderedSpecs) {
8085
final var dependentResource = resolve(spec, client, configuration);
8186
final var node = new DependentResourceNode(
8287
spec.getReconcileCondition(),
8388
spec.getDeletePostCondition(),
8489
spec.getReadyCondition(),
85-
spec.getActivationCondition(),
86-
dependentResource);
90+
spec.isOptional() ? crdPresentActivationCondition : spec.getActivationCondition(),
91+
resolve(spec, client, configuration));
8792
alreadyResolved.put(dependentResource.name(), node);
8893
spec.getDependsOn()
8994
.forEach(depend -> node.addDependsOnRelation(alreadyResolved.get(depend)));
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package io.javaoperatorsdk.operator.processing.expiration;
2+
3+
4+
public interface Expiration {
5+
6+
ExpirationExecution initExecution();
7+
8+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package io.javaoperatorsdk.operator.processing.expiration;
2+
3+
public interface ExpirationExecution {
4+
5+
boolean isExpired();
6+
7+
void refreshed();
8+
9+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package io.javaoperatorsdk.operator.processing.expiration;
2+
3+
import io.javaoperatorsdk.operator.processing.retry.Retry;
4+
5+
public class RetryExpiration implements Expiration {
6+
7+
private final Retry retry;
8+
9+
public RetryExpiration(Retry retry) {
10+
this.retry = retry;
11+
}
12+
13+
@Override
14+
public ExpirationExecution initExecution() {
15+
return new RetryExpirationExecution(retry.initExecution());
16+
}
17+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package io.javaoperatorsdk.operator.processing.expiration;
2+
3+
import java.time.LocalDateTime;
4+
import java.time.temporal.ChronoUnit;
5+
import java.util.Objects;
6+
7+
import io.javaoperatorsdk.operator.processing.retry.RetryExecution;
8+
9+
public class RetryExpirationExecution implements ExpirationExecution {
10+
11+
public static final long NO_MORE_EXPIRATION = -1L;
12+
13+
private LocalDateTime lastRefreshTime;
14+
private long delayUntilExpiration;
15+
private final RetryExecution retryExecution;
16+
17+
public RetryExpirationExecution(RetryExecution retryExecution) {
18+
this.retryExecution = retryExecution;
19+
}
20+
21+
@Override
22+
public boolean isExpired() {
23+
if (lastRefreshTime == null) {
24+
return true;
25+
}
26+
if (Objects.equals(delayUntilExpiration, NO_MORE_EXPIRATION)) {
27+
return false;
28+
} else {
29+
return LocalDateTime.now()
30+
.isAfter(lastRefreshTime.plus(delayUntilExpiration, ChronoUnit.MILLIS));
31+
}
32+
}
33+
34+
@Override
35+
public void refreshed() {
36+
lastRefreshTime = LocalDateTime.now();
37+
delayUntilExpiration = retryExecution.nextDelay().orElse(NO_MORE_EXPIRATION);
38+
}
39+
}

‎operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/dependent/workflow/ManagedWorkflowTestUtils.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ public class ManagedWorkflowTestUtils {
2121
@SuppressWarnings("unchecked")
2222
public static DependentResourceSpec createDRS(String name, String... dependOns) {
2323
return new DependentResourceSpec(EmptyTestDependentResource.class, name, Set.of(dependOns),
24-
null, null, null, null, null);
24+
null, null, null, null, null, false);
2525
}
2626

2727
public static DependentResourceSpec createDRSWithTraits(String name,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package io.javaoperatorsdk.operator.processing.expiration;
2+
3+
import org.junit.jupiter.api.Test;
4+
5+
import io.javaoperatorsdk.operator.processing.retry.GenericRetry;
6+
7+
import static org.assertj.core.api.Assertions.assertThat;
8+
9+
public class RetryExpirationTestExecution {
10+
11+
public static final int INITIAL_INTERVAL = 25;
12+
public static final int INITIAL_INTERVAL_PLUS_SLACK = INITIAL_INTERVAL + 10;
13+
14+
RetryExpirationExecution expiration = new RetryExpirationExecution(new GenericRetry()
15+
.setInitialInterval(INITIAL_INTERVAL)
16+
.setMaxAttempts(2)
17+
.initExecution());
18+
19+
@Test
20+
public void byDefaultExpired() {
21+
assertThat(expiration.isExpired()).isTrue();
22+
}
23+
24+
@Test
25+
public void expiresAfterTime() throws InterruptedException {
26+
expiration.refreshed();
27+
assertThat(expiration.isExpired()).isFalse();
28+
29+
Thread.sleep(INITIAL_INTERVAL_PLUS_SLACK);
30+
assertThat(expiration.isExpired()).isTrue();
31+
}
32+
33+
@Test
34+
public void refreshResetsExpiration() throws InterruptedException {
35+
expiration.refreshed();
36+
Thread.sleep(INITIAL_INTERVAL_PLUS_SLACK);
37+
assertThat(expiration.isExpired()).isTrue();
38+
39+
expiration.refreshed();
40+
41+
assertThat(expiration.isExpired()).isFalse();
42+
}
43+
44+
@Test
45+
public void notExpiresAfterMaxAttempt() throws InterruptedException {
46+
expiration.refreshed();
47+
expiration.refreshed();
48+
expiration.refreshed();
49+
50+
Thread.sleep(INITIAL_INTERVAL_PLUS_SLACK);
51+
52+
assertThat(expiration.isExpired()).isFalse();
53+
}
54+
55+
}

‎operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/LocallyRunOperatorExtension.java

+31-5
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.javaoperatorsdk.operator.junit;
22

33
import java.io.ByteArrayInputStream;
4+
import java.io.IOException;
45
import java.io.InputStream;
56
import java.nio.charset.StandardCharsets;
67
import java.time.Duration;
@@ -19,9 +20,11 @@
1920

2021
import io.fabric8.kubernetes.api.model.HasMetadata;
2122
import io.fabric8.kubernetes.api.model.Namespaced;
23+
import io.fabric8.kubernetes.api.model.apiextensions.v1.CustomResourceDefinition;
2224
import io.fabric8.kubernetes.client.CustomResource;
2325
import io.fabric8.kubernetes.client.KubernetesClient;
2426
import io.fabric8.kubernetes.client.LocalPortForward;
27+
import io.fabric8.kubernetes.client.dsl.NonDeletingOperation;
2528
import io.javaoperatorsdk.operator.Operator;
2629
import io.javaoperatorsdk.operator.ReconcilerUtils;
2730
import io.javaoperatorsdk.operator.RegisteredController;
@@ -176,23 +179,46 @@ public static void applyCrd(Class<? extends HasMetadata> resourceClass, Kubernet
176179
applyCrd(ReconcilerUtils.getResourceTypeName(resourceClass), client);
177180
}
178181

179-
public static void applyCrd(String resourceTypeName, KubernetesClient client) {
182+
public static void deleteCrd(Class<? extends HasMetadata> resourceClass,
183+
KubernetesClient client) {
184+
try {
185+
var crd = loadCRD(ReconcilerUtils.getResourceTypeName(resourceClass), client);
186+
client.resource(crd).delete();
187+
188+
Thread.sleep(CRD_READY_WAIT);
189+
} catch (InterruptedException e) {
190+
throw new RuntimeException(e);
191+
}
192+
}
193+
194+
195+
private static CustomResourceDefinition loadCRD(String resourceTypeName,
196+
KubernetesClient client) {
180197
String path = "/META-INF/fabric8/" + resourceTypeName + "-v1.yml";
181198
try (InputStream is = LocallyRunOperatorExtension.class.getResourceAsStream(path)) {
182199
if (is == null) {
183200
throw new IllegalStateException("Cannot find CRD at " + path);
184201
}
185202
var crdString = new String(is.readAllBytes(), StandardCharsets.UTF_8);
186203
LOGGER.debug("Applying CRD: {}", crdString);
187-
final var crd = client.load(new ByteArrayInputStream(crdString.getBytes()));
188-
crd.createOrReplace();
204+
final var resources = client.load(new ByteArrayInputStream(crdString.getBytes()));
205+
return (CustomResourceDefinition) resources.items().get(0);
206+
} catch (IOException e) {
207+
throw new IllegalStateException(e);
208+
}
209+
}
210+
211+
public static void applyCrd(String resourceTypeName, KubernetesClient client) {
212+
try {
213+
var crd = loadCRD(resourceTypeName, client);
214+
client.resource(crd).createOr(NonDeletingOperation::update);
189215
Thread.sleep(CRD_READY_WAIT); // readiness is not applicable for CRD, just wait a little
190-
LOGGER.debug("Applied CRD with path: {}", path);
216+
LOGGER.debug("Applied CRD for type {}", resourceTypeName);
191217
} catch (InterruptedException ex) {
192218
LOGGER.error("Interrupted.", ex);
193219
Thread.currentThread().interrupt();
194220
} catch (Exception ex) {
195-
throw new IllegalStateException("Cannot apply CRD yaml: " + path, ex);
221+
throw new IllegalStateException("Cannot apply CRD for type: " + resourceTypeName, ex);
196222
}
197223
}
198224

0 commit comments

Comments
 (0)