Skip to content

Commit 03d4b34

Browse files
authored
Merge pull request #18 from taosdata/dev/zyyang
feat(subscribe): add timeout parameter#TD-33849
2 parents 24a98b8 + 399c5aa commit 03d4b34

File tree

2 files changed

+14
-0
lines changed

2 files changed

+14
-0
lines changed

src/main/java/com/taosdata/demo/service/Subscriber.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ public class Subscriber implements CommandLineRunner {
3232
private String consumerConfigFile;
3333
@Value("${subscriber.topic-names}")
3434
private String[] topicNames;
35+
@Value("${subscriber.timeout}")
36+
private String timeout;
3537
@Value("${subscriber.poll-timeout}")
3638
private int pollTimeout;
3739
@Value("${subscriber.print-data-in-log: false}")
@@ -102,10 +104,20 @@ private void consume(Properties properties) throws Exception {
102104
}
103105

104106
int count = 0;
107+
long start = System.currentTimeMillis();
108+
long timeout_in_milli = Long.MAX_VALUE;
109+
try {
110+
timeout_in_milli = Long.parseLong(timeout);
111+
} catch (Exception e) {
112+
log.warn("failed to parse timeout: {}, use default: {}", timeout, timeout_in_milli);
113+
}
105114
while (true) {
106115
printOffsets(consumer, "before poll(" + count + ")");
107116
try {
108117
ConsumerRecords<Map<String, Object>> records = consumer.poll(Duration.ofMillis(pollTimeout));
118+
if (records.isEmpty() && System.currentTimeMillis() - start > timeout_in_milli) {
119+
break;
120+
}
109121
for (ConsumerRecord<Map<String, Object>> record : records) {
110122
String line = formatter.format(record);
111123
if (printDataInLog) {

src/main/resources/application.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ subscriber:
33
concurrency: 1
44
# 要订阅哪些topic,以逗号分隔
55
topic-names: tp1,tp2,tp3
6+
# 订阅超时,超过这个时间没有订阅到数据就会退出
7+
timeout: 5000
68
# 每次 pull 的超时时间
79
poll-timeout: 1000
810
# pull 以后,写到一个文件里

0 commit comments

Comments
 (0)