Skip to content

Commit a87e31a

Browse files
committed
Going full reactive with Kafka; ditching PostgreSQL+TimescaleDB
1 parent d0b653e commit a87e31a

File tree

7 files changed

+44
-123
lines changed

7 files changed

+44
-123
lines changed

docker-compose.yml

-13
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,6 @@ services:
1616
- ./prometheus.yml:/etc/prometheus/prometheus.yml
1717
- prometheus-data:/prometheus
1818

19-
timescaledb:
20-
image: timescale/timescaledb:latest-pg15
21-
ports:
22-
- "5432:5432"
23-
environment:
24-
POSTGRES_USER: nhp
25-
POSTGRES_PASSWORD: nhppass
26-
POSTGRES_DB: nhpdb
27-
volumes:
28-
- timescale-data:/var/lib/postgresql/data
29-
30-
3119
grafana:
3220
image: grafana/grafana
3321
ports:
@@ -42,6 +30,5 @@ services:
4230

4331

4432
volumes:
45-
timescale-data:
4633
prometheus-data:
4734
grafana-data:

src/main/java/com/nhp/config/StreamProperties.java

-13
This file was deleted.

src/main/java/com/nhp/controller/MonitoredPrefixController.java

+19-29
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,6 @@
1212

1313
import com.nhp.dto.MultipleMonitoredPrefixesRequest;
1414
import com.nhp.dto.SingleMonitoredPrefixRequest;
15-
import com.nhp.model.MonitoredPrefix;
16-
import com.nhp.repos.MonitoredPrefixesRepo;
1715
import com.nhp.stream.RipeStreamClient;
1816

1917
import java.time.Instant;
@@ -23,51 +21,43 @@
2321
@RequestMapping("/api/prefixes")
2422
public class MonitoredPrefixController {
2523

26-
@Autowired
27-
private final MonitoredPrefixesRepo monitoredPrefixesRepo;
28-
2924
@Autowired
3025
private final RipeStreamClient ripeStreamClient;
3126

32-
public MonitoredPrefixController(MonitoredPrefixesRepo repo, RipeStreamClient client) {
33-
monitoredPrefixesRepo = repo;
27+
public MonitoredPrefixController(RipeStreamClient client) {
3428
ripeStreamClient = client;
3529
}
3630

3731
@PostMapping("/add-one")
3832
public ResponseEntity<Void> addPrefix(@RequestBody SingleMonitoredPrefixRequest request) {
39-
monitoredPrefixesRepo.save(
40-
MonitoredPrefix.builder()
41-
.prefix(request.getPrefix())
42-
.createdAt(Instant.now())
43-
.build());
44-
// restart the live streamer so we can track this new prefix
45-
ripeStreamClient.restartStreamWithPrefixes();
33+
// TODO save the prefix to the new kafka topic
34+
// We shouldn't have to pass list of prefixes here, find a fix
35+
ripeStreamClient.restartStreamWithPrefixes(List.of("8.8.8.0/24"));
4636
return ResponseEntity.ok().build();
4737
}
4838

4939
@PostMapping("/add")
5040
public ResponseEntity<Void> addPrefixes(@RequestBody MultipleMonitoredPrefixesRequest request) {
51-
List<MonitoredPrefix> prefixes = request.getPrefixes()
52-
.stream()
53-
.map(prefix -> MonitoredPrefix.builder().prefix(prefix).createdAt(Instant.now()).build())
54-
.toList();
41+
5542
// save in batch
56-
monitoredPrefixesRepo.saveAll(prefixes);
43+
// TODO save all these prefixes to the kafka topic
44+
// TODO do something about restartStream needing the list of prefixes
5745

58-
ripeStreamClient.restartStreamWithPrefixes();
46+
ripeStreamClient.restartStreamWithPrefixes(List.of("8.8.8.0/24"));
5947

6048
return ResponseEntity.ok().build();
6149
}
6250

63-
@GetMapping
64-
public List<MonitoredPrefix> getPrefixes() {
65-
return monitoredPrefixesRepo.findAll();
66-
}
51+
// TODO Redis implementation for fetching and deleting the prefixes
6752

68-
@DeleteMapping("/{id}")
69-
public ResponseEntity<?> delete(@PathVariable Long id) {
70-
monitoredPrefixesRepo.deleteById(id);
71-
return ResponseEntity.noContent().build();
72-
}
53+
// @GetMapping
54+
// public List<MonitoredPrefix> getPrefixes() {
55+
// return monitoredPrefixesRepo.findAll();
56+
// }
57+
58+
// @DeleteMapping("/{id}")
59+
// public ResponseEntity<?> delete(@PathVariable Long id) {
60+
// monitoredPrefixesRepo.deleteById(id);
61+
// return ResponseEntity.noContent().build();
62+
// }
7363
}

src/main/java/com/nhp/model/MonitoredPrefix.java

-27
This file was deleted.

src/main/java/com/nhp/repos/MonitoredPrefixesRepo.java

-14
This file was deleted.

src/main/java/com/nhp/stream/RipeStreamClient.java

+24-8
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.nhp.stream;
22

33
import jakarta.annotation.PostConstruct;
4+
import jakarta.annotation.PreDestroy;
45
import lombok.extern.slf4j.Slf4j;
56

67
import java.util.List;
@@ -11,37 +12,46 @@
1112

1213
import com.fasterxml.jackson.core.JsonProcessingException;
1314
import com.fasterxml.jackson.databind.ObjectMapper;
14-
import com.nhp.config.StreamProperties;
15-
import com.nhp.model.MonitoredPrefix;
16-
import com.nhp.repos.MonitoredPrefixesRepo;
15+
import com.nhp.services.MetricsService;
1716

1817
import reactor.core.publisher.Mono;
1918
import reactor.netty.http.client.HttpClient;
2019
import reactor.netty.http.client.WebsocketClientSpec;
2120
import reactor.core.Disposable;
21+
import reactor.util.retry.Retry;
2222

2323
@Slf4j
2424
@Component
2525
public class RipeStreamClient {
2626

2727
@Autowired
28-
private MonitoredPrefixesRepo monitoredPrefixesRepo;
28+
private MetricsService metricsService;
2929

3030
private static final String RIS_WS_URL = "wss://ris-live.ripe.net/v1/ws/";
3131
private final ObjectMapper objectMapper = new ObjectMapper();
3232
private Disposable connection;
3333

3434
@PostConstruct
3535
public void startStream() {
36-
restartStreamWithPrefixes();
36+
// TODO fetch this list of prefixes from the kafka stream
37+
restartStreamWithPrefixes(List.of("8.8.8.0/24"));
3738
}
3839

39-
public void restartStreamWithPrefixes() {
40+
// gracefully handle app shutdown, least thing we can do
41+
@PreDestroy
42+
public void stopStream() {
43+
if (connection != null && !connection.isDisposed()) {
44+
connection.dispose();
45+
log.info("RIPE stream connection closed on shutdown.");
46+
}
47+
}
48+
49+
public void restartStreamWithPrefixes(List<String> prefixes) {
4050
if (connection != null && !connection.isDisposed()) {
4151
connection.dispose();
4252
log.info("Closed previous RIPE stream connection");
4353
}
44-
List<String> prefixes = monitoredPrefixesRepo.findAllPrefixes();
54+
4555
log.info("Connecting to RIPE RIS WebSocket with {} prefix(es)", prefixes.size());
4656

4757
connection = HttpClient.create()
@@ -55,13 +65,19 @@ public void restartStreamWithPrefixes() {
5565

5666
// consumer logic from this socket now
5767
inbound.receive().asString()
58-
.doOnNext(msg -> log.info("BGP Message: {}", msg))
68+
.doOnNext(msg -> {
69+
log.info("BGP Message: {}", msg);
70+
metricsService.incrementBgpMessagesReceieved();
71+
})
5972
.doOnError(error -> log.error("Error while streaming", error))
6073
.doOnComplete(() -> log.warn("Stream completed/disconnected"))
6174
.subscribe();
6275
return Mono.never();
6376
})
6477
.doOnError(error -> log.error("WebSocket connection error", error))
78+
.retryWhen(Retry.backoff(5, java.time.Duration.ofSeconds(5))
79+
.doBeforeRetry(retrySignal -> log.warn("Retrying RIPE WebSocket connection (attempt {})",
80+
retrySignal.totalRetries() + 1)))
6581
.subscribe();
6682
}
6783

src/main/resources/application.yml

+1-19
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,8 @@
1-
stream:
2-
debounce-ms: 5000
3-
4-
spring:
5-
datasource:
6-
url: jdbc:postgresql://timescaledb:5432/nhpdb
7-
username: nhp
8-
password: nhppass
9-
jpa:
10-
hibernate:
11-
ddl-auto: update
12-
show-sql: true
13-
properties:
14-
hibernate:
15-
dialect: org.hibernate.dialect.PostgreSQLDialect
16-
171
logging:
182
level:
193
root: INFO # Reduce global noise (was DEBUG by default in dev)
204
org.springframework: WARN # Suppress most Spring internals
21-
org.hibernate.SQL: OFF # Turn off raw SQL logging
22-
org.hibernate.type.descriptor.sql.BasicBinder: OFF
23-
com.nhp: DEBUG # Your package — keep this at DEBUG to see your own logs
5+
com.nhp: INFO # Your package — keep this at DEBUG to see your own logs
246

257
management:
268
endpoints:

0 commit comments

Comments
 (0)