Skip to content

Commit 4acf8ab

Browse files
committedOct 5, 2024
[strimzi#434] feat: added create topic endpoint - path changed
Signed-off-by: ilkerkocatepe <kocatepeilker@gmail.com>
1 parent 5e16a8d commit 4acf8ab

File tree

6 files changed

+49
-9
lines changed

6 files changed

+49
-9
lines changed
 

‎src/main/java/io/strimzi/kafka/bridge/KafkaBridgeAdmin.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -92,11 +92,11 @@ public CompletionStage<Set<String>> listTopics() {
9292
* @param topicName topic name to create
9393
* @return a CompletionStage Void
9494
*/
95-
public CompletionStage<Void> createTopic(String topicName) {
95+
public CompletionStage<Void> createTopic(String topicName, int partitions, short replicationFactor) {
9696
LOGGER.trace("Create topic thread {}", Thread.currentThread());
97-
LOGGER.info("Create topic {}", topicName);
97+
LOGGER.info("Create topic {}, partitions {}, replicationFactor {}", topicName, partitions, replicationFactor);
9898
CompletableFuture<Void> promise = new CompletableFuture<>();
99-
this.adminClient.createTopics(Collections.singletonList(new NewTopic(topicName, 2, (short) 1)))
99+
this.adminClient.createTopics(Collections.singletonList(new NewTopic(topicName, partitions, replicationFactor)))
100100
.all()
101101
.whenComplete((topic, exception) -> {
102102
LOGGER.trace("Create topic callback thread {}", Thread.currentThread());

‎src/main/java/io/strimzi/kafka/bridge/http/HttpAdminBridgeEndpoint.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -184,8 +184,10 @@ public void doGetTopic(RoutingContext routingContext) {
184184
*/
185185
public void doCreateTopic(RoutingContext routingContext) {
186186
String topicName = routingContext.pathParam("topicname");
187+
int partitions = Integer.parseInt(routingContext.queryParams().get("partitions"));
188+
short replicationFactor = Short.parseShort(routingContext.queryParams().get("replication_factor"));
187189

188-
this.kafkaBridgeAdmin.createTopic(topicName)
190+
this.kafkaBridgeAdmin.createTopic(topicName, partitions, replicationFactor)
189191
.whenComplete(((topic, exception) -> {
190192
LOGGER.trace("Create topic handler thread {}", Thread.currentThread());
191193
if (exception == null) {

‎src/main/resources/openapi.json

+19-1
Original file line numberDiff line numberDiff line change
@@ -753,7 +753,7 @@
753753
}
754754
]
755755
},
756-
"/create-topic/{topicname}": {
756+
"/admin/topics/{topicname}": {
757757
"post": {
758758
"tags": [
759759
"Topics"
@@ -800,6 +800,24 @@
800800
"schema": {
801801
"type": "string"
802802
}
803+
},
804+
{
805+
"name": "partitions",
806+
"in": "query",
807+
"description": "Number of partitions for the topic.",
808+
"required": true,
809+
"schema": {
810+
"type": "integer"
811+
}
812+
},
813+
{
814+
"name": "replication_factor",
815+
"in": "query",
816+
"description": "Replication factor for the topic.",
817+
"required": true,
818+
"schema": {
819+
"type": "integer"
820+
}
803821
}
804822
]
805823
},

‎src/main/resources/openapiv2.json

+19-1
Original file line numberDiff line numberDiff line change
@@ -686,7 +686,7 @@
686686
}
687687
]
688688
},
689-
"/create-topic/{topicname}": {
689+
"/admin/topics/{topicname}": {
690690
"post": {
691691
"tags": [
692692
"Topics"
@@ -725,6 +725,24 @@
725725
"schema": {
726726
"type": "string"
727727
}
728+
},
729+
{
730+
"name": "partitions",
731+
"in": "query",
732+
"description": "Number of partitions for the topic.",
733+
"required": true,
734+
"schema": {
735+
"type": "integer"
736+
}
737+
},
738+
{
739+
"name": "replication_factor",
740+
"in": "query",
741+
"description": "Replication factor for the topic.",
742+
"required": true,
743+
"schema": {
744+
"type": "integer"
745+
}
728746
}
729747
]
730748
},

‎src/test/java/io/strimzi/kafka/bridge/http/AdminClientIT.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,9 @@ void setupTopic(VertxTestContext context, String topic, int partitions, int coun
219219
@Test
220220
void createTopicTest(VertxTestContext context) {
221221
baseService()
222-
.postRequest("/create-topic/" + topic)
222+
.postRequest("/admin/topics/" + topic)
223+
.addQueryParam("partitions", "1")
224+
.addQueryParam("replication_factor", "1")
223225
.as(BodyCodec.jsonArray())
224226
.send(ar -> {
225227
context.verify(() -> {

‎src/test/java/io/strimzi/kafka/bridge/http/OtherServicesIT.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -151,8 +151,8 @@ void openapiTest(VertxTestContext context) {
151151
assertThat(paths.containsKey("/topics/{topicname}/partitions/{partitionid}/offsets"), is(true));
152152
assertThat(paths.containsKey("/topics/{topicname}/partitions"), is(true));
153153
assertThat(bridgeResponse.getJsonObject("paths").getJsonObject("/topics/{topicname}/partitions/{partitionid}").getJsonObject("post").getString("operationId"), is(HttpOpenApiOperations.SEND_TO_PARTITION.toString()));
154-
assertThat(paths.containsKey("/create-topic/{topicname}"), is(true));
155-
assertThat(bridgeResponse.getJsonObject("paths").getJsonObject("/create-topic/{topicname}").getJsonObject("post").getString("operationId"), is(HttpOpenApiOperations.CREATE_TOPIC.toString()));
154+
assertThat(paths.containsKey("/admin/topics/{topicname}"), is(true));
155+
assertThat(bridgeResponse.getJsonObject("paths").getJsonObject("/admin/topics/{topicname}").getJsonObject("post").getString("operationId"), is(HttpOpenApiOperations.CREATE_TOPIC.toString()));
156156
assertThat(paths.containsKey("/healthy"), is(true));
157157
assertThat(bridgeResponse.getJsonObject("paths").getJsonObject("/healthy").getJsonObject("get").getString("operationId"), is(HttpOpenApiOperations.HEALTHY.toString()));
158158
assertThat(paths.containsKey("/ready"), is(true));

0 commit comments

Comments
 (0)
Please sign in to comment.