Skip to content

Commit 8cfe01a

Browse files
committed
[strimzi#434] feat: added create topic endpoint
Signed-off-by: ilkerkocatepe <[email protected]>
1 parent 4cb7075 commit 8cfe01a

File tree

6 files changed

+136
-0
lines changed

6 files changed

+136
-0
lines changed

src/main/java/io/strimzi/kafka/bridge/KafkaBridgeAdmin.java

+25
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,15 @@
99
import org.apache.kafka.clients.admin.AdminClient;
1010
import org.apache.kafka.clients.admin.Config;
1111
import org.apache.kafka.clients.admin.ListOffsetsResult;
12+
import org.apache.kafka.clients.admin.NewTopic;
1213
import org.apache.kafka.clients.admin.OffsetSpec;
1314
import org.apache.kafka.clients.admin.TopicDescription;
1415
import org.apache.kafka.common.TopicPartition;
1516
import org.apache.kafka.common.config.ConfigResource;
1617
import org.apache.logging.log4j.LogManager;
1718
import org.apache.logging.log4j.Logger;
1819

20+
import java.util.Collections;
1921
import java.util.List;
2022
import java.util.Map;
2123
import java.util.Properties;
@@ -84,6 +86,29 @@ public CompletionStage<Set<String>> listTopics() {
8486
return promise;
8587
}
8688

89+
/**
90+
* Creates a topic with given name
91+
*
92+
* @param topicName topic name to create
93+
* @return a CompletionStage Void
94+
*/
95+
public CompletionStage<Void> createTopic(String topicName) {
96+
log.trace("Create topic thread {}", Thread.currentThread());
97+
log.info("Create topic {}", topicName);
98+
CompletableFuture<Void> promise = new CompletableFuture<>();
99+
this.adminClient.createTopics(Collections.singletonList(new NewTopic(topicName, 2, (short) 1)))
100+
.all()
101+
.whenComplete((topic, exception) -> {
102+
log.trace("Create topic callback thread {}", Thread.currentThread());
103+
if (exception == null) {
104+
promise.complete(topic);
105+
} else {
106+
promise.completeExceptionally(exception);
107+
}
108+
});
109+
return promise;
110+
}
111+
87112
/**
88113
* Returns the description of the specified topics.
89114
*

src/main/java/io/strimzi/kafka/bridge/http/HttpAdminBridgeEndpoint.java

+30
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,10 @@ public void handle(RoutingContext routingContext, Handler<HttpBridgeEndpoint> ha
8181
doGetTopic(routingContext);
8282
break;
8383

84+
case CREATE_TOPIC:
85+
doCreateTopic(routingContext);
86+
break;
87+
8488
case LIST_PARTITIONS:
8589
doListPartitions(routingContext);
8690
break;
@@ -173,6 +177,32 @@ public void doGetTopic(RoutingContext routingContext) {
173177
});
174178
}
175179

180+
/**
181+
* Create a topic described in the HTTP request
182+
*
183+
* @param routingContext the routing context
184+
*/
185+
public void doCreateTopic(RoutingContext routingContext) {
186+
String topicName = routingContext.pathParam("topicname");
187+
188+
this.kafkaBridgeAdmin.createTopic(topicName)
189+
.whenComplete(((topic, exception) -> {
190+
log.trace("Create topic handler thread {}", Thread.currentThread());
191+
if (exception == null) {
192+
ArrayNode root = JsonUtils.createArrayNode();
193+
HttpUtils.sendResponse(routingContext, HttpResponseStatus.OK.code(),
194+
BridgeContentType.KAFKA_JSON, JsonUtils.jsonToBytes(root));
195+
} else {
196+
HttpBridgeError error = new HttpBridgeError(
197+
HttpResponseStatus.INTERNAL_SERVER_ERROR.code(),
198+
exception.getMessage()
199+
);
200+
HttpUtils.sendResponse(routingContext, HttpResponseStatus.INTERNAL_SERVER_ERROR.code(),
201+
BridgeContentType.KAFKA_JSON, JsonUtils.jsonToBytes(error.toJson()));
202+
}
203+
}));
204+
}
205+
176206
/**
177207
* Get partitions information related to the topic in the HTTP request
178208
*

src/main/java/io/strimzi/kafka/bridge/http/HttpBridge.java

+14
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ public void start(Promise<Void> startPromise) {
157157
routerBuilder.operation(this.SEEK_TO_END.getOperationId().toString()).handler(this.SEEK_TO_END);
158158
routerBuilder.operation(this.LIST_TOPICS.getOperationId().toString()).handler(this.LIST_TOPICS);
159159
routerBuilder.operation(this.GET_TOPIC.getOperationId().toString()).handler(this.GET_TOPIC);
160+
routerBuilder.operation(this.CREATE_TOPIC.getOperationId().toString()).handler(this.CREATE_TOPIC);
160161
routerBuilder.operation(this.LIST_PARTITIONS.getOperationId().toString()).handler(this.LIST_PARTITIONS);
161162
routerBuilder.operation(this.GET_PARTITION.getOperationId().toString()).handler(this.GET_PARTITION);
162163
routerBuilder.operation(this.GET_OFFSETS.getOperationId().toString()).handler(this.GET_OFFSETS);
@@ -381,6 +382,11 @@ private void getTopic(RoutingContext routingContext) {
381382
processAdminClient(routingContext);
382383
}
383384

385+
private void createTopic(RoutingContext routingContext) {
386+
this.httpBridgeContext.setOpenApiOperation(HttpOpenApiOperations.CREATE_TOPIC);
387+
processAdminClient(routingContext);
388+
}
389+
384390
private void listPartitions(RoutingContext routingContext) {
385391
this.httpBridgeContext.setOpenApiOperation(HttpOpenApiOperations.LIST_PARTITIONS);
386392
processAdminClient(routingContext);
@@ -726,6 +732,14 @@ public void process(RoutingContext routingContext) {
726732
}
727733
};
728734

735+
final HttpOpenApiOperation CREATE_TOPIC = new HttpOpenApiOperation(HttpOpenApiOperations.CREATE_TOPIC) {
736+
737+
@Override
738+
public void process(RoutingContext routingContext) {
739+
createTopic(routingContext);
740+
}
741+
};
742+
729743
final HttpOpenApiOperation LIST_PARTITIONS = new HttpOpenApiOperation(HttpOpenApiOperations.LIST_PARTITIONS) {
730744

731745
@Override

src/main/java/io/strimzi/kafka/bridge/http/HttpOpenApiOperations.java

+2
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ public enum HttpOpenApiOperations {
2828
LIST_TOPICS("listTopics"),
2929
/** get information for a specific topic */
3030
GET_TOPIC("getTopic"),
31+
/** creates a topic with specified name */
32+
CREATE_TOPIC("createTopic"),
3133
/** list partitions for a specific topic */
3234
LIST_PARTITIONS("listPartitions"),
3335
/** get partition information for a specific topic */

src/main/resources/openapi.json

+50
Original file line numberDiff line numberDiff line change
@@ -753,6 +753,56 @@
753753
}
754754
]
755755
},
756+
"/create-topic/{topicname}": {
757+
"post": {
758+
"tags": [
759+
"Topics"
760+
],
761+
"description": "Creates a topic with given topic name.",
762+
"operationId": "createTopic",
763+
"responses": {
764+
"200": {
765+
"description": "Topic metadata",
766+
"content": {
767+
"application/vnd.kafka.v2+json": {
768+
"schema": {
769+
"$ref": "#/components/schemas/TopicMetadata"
770+
}
771+
}
772+
}
773+
},
774+
"404": {
775+
"description": "The specified topic was not found.",
776+
"content": {
777+
"application/vnd.kafka.v2+json": {
778+
"schema": {
779+
"$ref": "#/components/schemas/Error"
780+
},
781+
"examples": {
782+
"response": {
783+
"value": {
784+
"error_code": 404,
785+
"message": "The specified topic was not found."
786+
}
787+
}
788+
}
789+
}
790+
}
791+
}
792+
}
793+
},
794+
"parameters": [
795+
{
796+
"name": "topicname",
797+
"in": "path",
798+
"description": "Name of the topic to send records to or retrieve metadata from.",
799+
"required": true,
800+
"schema": {
801+
"type": "string"
802+
}
803+
}
804+
]
805+
},
756806
"/consumers/{groupid}/instances/{name}/records": {
757807
"get": {
758808
"tags": [

src/test/java/io/strimzi/kafka/bridge/http/AdminClientIT.java

+15
Original file line numberDiff line numberDiff line change
@@ -215,4 +215,19 @@ void setupTopic(VertxTestContext context, String topic, int partitions, int coun
215215
});
216216
producer.get(TEST_TIMEOUT, TimeUnit.SECONDS);
217217
}
218+
219+
@Test
220+
void createTopicTest(VertxTestContext context) {
221+
baseService()
222+
.postRequest("/create-topic/" + topic)
223+
.as(BodyCodec.jsonArray())
224+
.send(ar -> {
225+
context.verify(() -> {
226+
assertThat(ar.succeeded(), is(true));
227+
HttpResponse<JsonArray> response = ar.result();
228+
assertThat(response.statusCode(), is(HttpResponseStatus.OK.code()));
229+
});
230+
context.completeNow();
231+
});
232+
}
218233
}

0 commit comments

Comments
 (0)