|
38 | 38 | import java.util.concurrent.atomic.AtomicInteger; |
39 | 39 | import java.util.function.Function; |
40 | 40 |
|
| 41 | +import static java.util.concurrent.TimeUnit.HOURS; |
41 | 42 | import static java.util.concurrent.TimeUnit.MINUTES; |
42 | 43 | import static org.slf4j.LoggerFactory.getLogger; |
43 | 44 |
|
@@ -125,6 +126,22 @@ private void topicMessageListener(final InputQueue inputQueue, final String mess |
125 | 126 | } |
126 | 127 | } |
127 | 128 |
|
| 129 | + @Scheduled(initialDelay = 1, fixedRate = 1, timeUnit = HOURS) |
| 130 | + private void refreshTopicListeners() { |
| 131 | + for (var entry : inputTopics.entrySet()) { |
| 132 | + try { |
| 133 | + var topicListener = entry.getValue(); |
| 134 | + topicListener.topic().removeListener(topicListener.id()); |
| 135 | + String id = topicListener.topic().addListener(String.class, (channel, message) -> topicMessageListener(entry.getKey(), message)); |
| 136 | + inputTopics.remove(entry.getKey()); |
| 137 | + inputTopics.put(entry.getKey(), new TopicListener(topicListener.topic(), id)); |
| 138 | + LOGGER.info("Refreshed {} topic listener {}", entry.getKey().topicName(), id); |
| 139 | + } catch (Exception e) { |
| 140 | + LOGGER.error("Error refreshing topic listener for {}", entry.getKey().topicName(), e); |
| 141 | + } |
| 142 | + } |
| 143 | + } |
| 144 | + |
128 | 145 | private String feedName() { |
129 | 146 | return getClass().getSimpleName(); |
130 | 147 | } |
|
0 commit comments