Skip to content

Commit 4ed4d5d

Browse files
ozangunalpcescoffier
authored andcommitted
Add health-enabled attribute to AMQP connector
Fixes #2990
1 parent f78f654 commit 4ed4d5d

File tree

3 files changed

+33
-12
lines changed

3 files changed

+33
-12
lines changed

smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/AmqpConnector.java

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
@ConnectorAttribute(name = "client-options-name", direction = INCOMING_AND_OUTGOING, description = "The name of the AMQP Client Option bean used to customize the AMQP client configuration", type = "string", alias = "amqp-client-options-name")
5757
@ConnectorAttribute(name = "client-ssl-context-name", direction = INCOMING_AND_OUTGOING, description = "The name of an SSLContext bean to use for connecting to AMQP when SSL is used", type = "string", alias = "amqp-client-ssl-context-name", hiddenFromDocumentation = true)
5858
@ConnectorAttribute(name = "tracing-enabled", direction = INCOMING_AND_OUTGOING, description = "Whether tracing is enabled (default) or disabled", type = "boolean", defaultValue = "true")
59+
@ConnectorAttribute(name = "health-enabled", type = "boolean", direction = INCOMING_AND_OUTGOING, description = "Whether health reporting is enabled (default) or disabled", defaultValue = "true")
5960
@ConnectorAttribute(name = "health-timeout", direction = INCOMING_AND_OUTGOING, description = "The max number of seconds to wait to determine if the connection with the broker is still established for the readiness check. After that threshold, the check is considered as failed.", type = "int", defaultValue = "3")
6061
@ConnectorAttribute(name = "cloud-events", type = "boolean", direction = INCOMING_AND_OUTGOING, description = "Enables (default) or disables the Cloud Event support. If enabled on an _incoming_ channel, the connector analyzes the incoming records and try to create Cloud Event metadata. If enabled on an _outgoing_, the connector sends the outgoing messages as Cloud Event if the message includes Cloud Event Metadata.", defaultValue = "true")
6162
@ConnectorAttribute(name = "capabilities", type = "string", direction = INCOMING_AND_OUTGOING, description = " A comma-separated list of capabilities proposed by the sender or receiver client.")
@@ -182,20 +183,24 @@ public List<AmqpClient> getClients() {
182183
public HealthReport getReadiness() {
183184
HealthReport.HealthReportBuilder builder = HealthReport.builder();
184185
for (Map.Entry<String, IncomingAmqpChannel> holder : incomingChannels.entrySet()) {
185-
try {
186-
builder.add(holder.getKey(), holder.getValue().isConnected().await()
187-
.atMost(Duration.ofSeconds(holder.getValue().getHealthTimeout())));
188-
} catch (Exception e) {
189-
builder.add(holder.getKey(), false, e.getMessage());
186+
if (holder.getValue().isHealthEnabled()) {
187+
try {
188+
builder.add(holder.getKey(), holder.getValue().isConnected().await()
189+
.atMost(Duration.ofSeconds(holder.getValue().getHealthTimeout())));
190+
} catch (Exception e) {
191+
builder.add(holder.getKey(), false, e.getMessage());
192+
}
190193
}
191194
}
192195

193196
for (Map.Entry<String, OutgoingAmqpChannel> sender : outgoingChannels.entrySet()) {
194-
try {
195-
builder.add(sender.getKey(), sender.getValue().isConnected().await()
196-
.atMost(Duration.ofSeconds(sender.getValue().getHealthTimeout())));
197-
} catch (Exception e) {
198-
builder.add(sender.getKey(), false, e.getMessage());
197+
if (sender.getValue().isHealthEnabled()) {
198+
try {
199+
builder.add(sender.getKey(), sender.getValue().isConnected().await()
200+
.atMost(Duration.ofSeconds(sender.getValue().getHealthTimeout())));
201+
} catch (Exception e) {
202+
builder.add(sender.getKey(), false, e.getMessage());
203+
}
199204
}
200205
}
201206

@@ -212,10 +217,14 @@ public HealthReport getReadiness() {
212217
public HealthReport getLiveness() {
213218
HealthReport.HealthReportBuilder builder = HealthReport.builder();
214219
for (Map.Entry<String, IncomingAmqpChannel> entry : incomingChannels.entrySet()) {
215-
builder.add(entry.getKey(), entry.getValue().isOpen());
220+
if (entry.getValue().isHealthEnabled()) {
221+
builder.add(entry.getKey(), entry.getValue().isOpen());
222+
}
216223
}
217224
for (Map.Entry<String, OutgoingAmqpChannel> entry : outgoingChannels.entrySet()) {
218-
builder.add(entry.getKey(), entry.getValue().isOpen());
225+
if (entry.getValue().isHealthEnabled()) {
226+
builder.add(entry.getKey(), entry.getValue().isOpen());
227+
}
219228
}
220229
return builder.build();
221230
}

smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/IncomingAmqpChannel.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,13 @@ public class IncomingAmqpChannel {
3939
private final AtomicBoolean opened;
4040
private final BiConsumer<String, Throwable> reportFailure;
4141
private final ConnectionHolder holder;
42+
private final boolean healthEnabled;
4243

4344
public IncomingAmqpChannel(AmqpConnectorIncomingConfiguration ic, AmqpClient client, Vertx vertx,
4445
Instance<OpenTelemetry> openTelemetryInstance, BiConsumer<String, Throwable> reportFailure) {
4546
this.reportFailure = reportFailure;
4647
this.opened = new AtomicBoolean(false);
48+
this.healthEnabled = ic.getHealthEnabled();
4749

4850
String channel = ic.getChannel();
4951
String address = ic.getAddress().orElse(channel);
@@ -160,6 +162,10 @@ public long getHealthTimeout() {
160162
return holder.getHealthTimeout();
161163
}
162164

165+
public boolean isHealthEnabled() {
166+
return healthEnabled;
167+
}
168+
163169
public void close() {
164170
opened.set(false);
165171
}

smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/OutgoingAmqpChannel.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,14 @@ public class OutgoingAmqpChannel {
2626
private final AtomicBoolean opened;
2727
private final Flow.Subscriber<Message<?>> subscriber;
2828
private final AmqpCreditBasedSender processor;
29+
private final boolean healthEnabled;
2930

3031
public OutgoingAmqpChannel(AmqpConnectorOutgoingConfiguration oc, AmqpClient client, Vertx vertx,
3132
Instance<OpenTelemetry> openTelemetryInstance, BiConsumer<String, Throwable> reportFailure) {
3233
String configuredAddress = oc.getAddress().orElseGet(oc::getChannel);
3334

3435
opened = new AtomicBoolean(false);
36+
healthEnabled = oc.getHealthEnabled();
3537

3638
AtomicReference<AmqpSender> sender = new AtomicReference<>();
3739
String link = oc.getLinkName().orElseGet(oc::getChannel);
@@ -131,6 +133,10 @@ public long getHealthTimeout() {
131133
return processor.getHealthTimeout();
132134
}
133135

136+
public boolean isHealthEnabled() {
137+
return healthEnabled;
138+
}
139+
134140
public void close() {
135141
processor.cancel();
136142
opened.set(false);

0 commit comments

Comments
 (0)