Skip to content

Commit a4f401d

Browse files
committed
setup
1 parent 74a0d30 commit a4f401d

File tree

8 files changed

+90
-7
lines changed

8 files changed

+90
-7
lines changed

iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/ErrorPayload.java

+4
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@
2828

2929
public class ErrorPayload implements SubscriptionPollPayload {
3030

31+
private static final String OUTDATED_ERROR_MSG = "outdated subscription event";
32+
public static final ErrorPayload OUTDATED_ERROR_PAYLOAD =
33+
new ErrorPayload(OUTDATED_ERROR_MSG, false);
34+
3135
/** The error message describing the issue. */
3236
private transient String errorMessage;
3337

iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java

+4
Original file line numberDiff line numberDiff line change
@@ -1022,6 +1022,10 @@ private Optional<SubscriptionMessage> pollTabletsInternal(
10221022

10231023
final String errorMessage = ((ErrorPayload) payload).getErrorMessage();
10241024
final boolean critical = ((ErrorPayload) payload).isCritical();
1025+
if (Objects.equals(payload, ErrorPayload.OUTDATED_ERROR_PAYLOAD)) {
1026+
// suppress warn log when poll outdated subscription event
1027+
return Optional.empty();
1028+
}
10251029
LOGGER.warn(
10261030
"Error occurred when SubscriptionConsumer {} polling tablets with commit context {}: {}, critical: {}",
10271031
this,

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java

+9
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,15 @@ public List<SubscriptionCommitContext> commit(
116116
return broker.commit(consumerId, commitContexts, nack);
117117
}
118118

119+
public boolean isCommitContextOutdated(final SubscriptionCommitContext commitContext) {
120+
final String consumerGroupId = commitContext.getConsumerGroupId();
121+
final SubscriptionBroker broker = consumerGroupIdToSubscriptionBroker.get(consumerGroupId);
122+
if (Objects.isNull(broker)) {
123+
return true;
124+
}
125+
return broker.isCommitContextOutdated(commitContext);
126+
}
127+
119128
/////////////////////////////// broker ///////////////////////////////
120129

121130
public boolean isBrokerExist(final String consumerGroupId) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java

+10
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,16 @@ public List<SubscriptionCommitContext> commit(
329329
return successfulCommitContexts;
330330
}
331331

332+
public boolean isCommitContextOutdated(final SubscriptionCommitContext commitContext) {
333+
final String topicName = commitContext.getTopicName();
334+
final SubscriptionPrefetchingQueue prefetchingQueue =
335+
topicNameToPrefetchingQueue.get(topicName);
336+
if (Objects.isNull(prefetchingQueue)) {
337+
return true;
338+
}
339+
return prefetchingQueue.isCommitContextOutdated(commitContext);
340+
}
341+
332342
/////////////////////////////// prefetching queue ///////////////////////////////
333343

334344
public void bindPrefetchingQueue(

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java

+21
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ public abstract class SubscriptionPrefetchingQueue {
6868
private final SubscriptionBlockingPendingQueue inputPendingQueue;
6969

7070
private final AtomicLong commitIdGenerator;
71+
// record initial commit for outdated event detection
72+
private final long initialCommitId;
7173

7274
/** A queue containing a series of prefetched pollable {@link SubscriptionEvent}. */
7375
protected final PriorityBlockingQueue<SubscriptionEvent> prefetchingQueue;
@@ -113,6 +115,7 @@ public SubscriptionPrefetchingQueue(
113115
this.topicName = topicName;
114116
this.inputPendingQueue = inputPendingQueue;
115117
this.commitIdGenerator = commitIdGenerator;
118+
this.initialCommitId = commitIdGenerator.get();
116119

117120
this.prefetchingQueue = new PriorityBlockingQueue<>();
118121
this.inFlightEvents = new ConcurrentHashMap<>();
@@ -582,6 +585,24 @@ protected SubscriptionEvent generateSubscriptionPollErrorResponse(final String e
582585
INVALID_COMMIT_ID));
583586
}
584587

588+
protected SubscriptionEvent generateSubscriptionPollOutdatedErrorResponse() {
589+
// consider non-critical by default, meaning the client can retry
590+
return new SubscriptionEvent(
591+
SubscriptionPollResponseType.ERROR.getType(),
592+
ErrorPayload.OUTDATED_ERROR_PAYLOAD,
593+
new SubscriptionCommitContext(
594+
IoTDBDescriptor.getInstance().getConfig().getDataNodeId(),
595+
PipeDataNodeAgent.runtime().getRebootTimes(),
596+
topicName,
597+
brokerId,
598+
INVALID_COMMIT_ID));
599+
}
600+
601+
public boolean isCommitContextOutdated(final SubscriptionCommitContext commitContext) {
602+
return PipeDataNodeAgent.runtime().getRebootTimes() > commitContext.getRebootTimes()
603+
|| initialCommitId > commitContext.getCommitId();
604+
}
605+
585606
//////////////////////////// APIs provided for metric framework ////////////////////////////
586607

587608
public String getPrefetchingQueueId() {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java

+10
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,16 @@ public SubscriptionEvent pollTablets(
7878
(key, ev) -> {
7979
// 1. Extract current event and check it
8080
if (Objects.isNull(ev)) {
81+
if (isCommitContextOutdated(commitContext)) {
82+
LOGGER.warn(
83+
"SubscriptionPrefetchingTabletQueue {} detected outdated poll request, consumer {}, commit context {}, offset {}",
84+
this,
85+
consumerId,
86+
commitContext,
87+
offset);
88+
eventRef.set(generateSubscriptionPollOutdatedErrorResponse());
89+
return null;
90+
}
8191
final String errorMessage =
8292
String.format(
8393
"SubscriptionPrefetchingTabletQueue %s is currently not transferring any tablet to consumer %s, commit context: %s, offset: %s",

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java

+10
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,16 @@ public SubscriptionEvent pollTsFile(
8484
(key, ev) -> {
8585
// 1. Extract current event and check it
8686
if (Objects.isNull(ev)) {
87+
if (isCommitContextOutdated(commitContext)) {
88+
LOGGER.warn(
89+
"SubscriptionPrefetchingTsFileQueue {} detected outdated poll request, consumer {}, commit context {}, writing offset {}",
90+
this,
91+
consumerId,
92+
commitContext,
93+
writingOffset);
94+
eventRef.set(generateSubscriptionPollOutdatedErrorResponse());
95+
return null;
96+
}
8797
final String errorMessage =
8898
String.format(
8999
"SubscriptionPrefetchingTsFileQueue %s is currently not transferring any file to consumer %s, commit context: %s, writing offset: %s",

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java

+22-7
Original file line numberDiff line numberDiff line change
@@ -425,14 +425,20 @@ private TPipeSubscribeResp handlePipeSubscribePollInternal(final PipeSubscribePo
425425
final SubscriptionCommitContext commitContext = event.getCommitContext();
426426
final SubscriptionPollResponse response = event.getCurrentResponse();
427427
if (Objects.isNull(response)) {
428+
final boolean isOutdated =
429+
SubscriptionAgent.broker()
430+
.isCommitContextOutdated(event.getCommitContext());
428431
LOGGER.warn(
429-
"Subscription: consumer {} poll null response for event {} with request: {}",
432+
"Subscription: consumer {} poll null response for event {} (outdated: {}) with request: {}",
430433
consumerConfig,
431434
event,
435+
isOutdated,
432436
req.getRequest());
433437
// nack
434-
SubscriptionAgent.broker()
435-
.commit(consumerConfig, Collections.singletonList(commitContext), true);
438+
if (!isOutdated) {
439+
SubscriptionAgent.broker()
440+
.commit(consumerConfig, Collections.singletonList(commitContext), true);
441+
}
436442
return null;
437443
}
438444

@@ -462,24 +468,33 @@ private TPipeSubscribeResp handlePipeSubscribePollInternal(final PipeSubscribePo
462468
req.getRequest());
463469
return byteBuffer;
464470
} catch (final Exception e) {
471+
final boolean isOutdated =
472+
SubscriptionAgent.broker()
473+
.isCommitContextOutdated(event.getCommitContext());
465474
if (e instanceof SubscriptionPayloadExceedException) {
466475
LOGGER.error(
467-
"Subscription: consumer {} poll excessive payload {} with request: {}, something unexpected happened with parameter configuration or payload control...",
476+
"Subscription: consumer {} poll excessive payload {} for event {} (outdated: {}) with request: {}, something unexpected happened with parameter configuration or payload control...",
468477
consumerConfig,
469478
response,
479+
event,
480+
isOutdated,
470481
req.getRequest(),
471482
e);
472483
} else {
473484
LOGGER.warn(
474-
"Subscription: consumer {} poll {} failed with request: {}",
485+
"Subscription: consumer {} poll {} for event {} (outdated: {}) failed with request: {}",
475486
consumerConfig,
476487
response,
488+
event,
489+
isOutdated,
477490
req.getRequest(),
478491
e);
479492
}
480493
// nack
481-
SubscriptionAgent.broker()
482-
.commit(consumerConfig, Collections.singletonList(commitContext), true);
494+
if (!isOutdated) {
495+
SubscriptionAgent.broker()
496+
.commit(consumerConfig, Collections.singletonList(commitContext), true);
497+
}
483498
return null;
484499
}
485500
})

0 commit comments

Comments
 (0)