Skip to content

Commit b97247a

Browse files
committed
[strimzi#434] feat: added create topic endpoint
Signed-off-by: ilkerkocatepe <[email protected]>
1 parent bc04d53 commit b97247a

File tree

6 files changed

+136
-0
lines changed

6 files changed

+136
-0
lines changed

src/main/java/io/strimzi/kafka/bridge/KafkaBridgeAdmin.java

+25
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,15 @@
99
import org.apache.kafka.clients.admin.AdminClient;
1010
import org.apache.kafka.clients.admin.Config;
1111
import org.apache.kafka.clients.admin.ListOffsetsResult;
12+
import org.apache.kafka.clients.admin.NewTopic;
1213
import org.apache.kafka.clients.admin.OffsetSpec;
1314
import org.apache.kafka.clients.admin.TopicDescription;
1415
import org.apache.kafka.common.TopicPartition;
1516
import org.apache.kafka.common.config.ConfigResource;
1617
import org.slf4j.Logger;
1718
import org.slf4j.LoggerFactory;
1819

20+
import java.util.Collections;
1921
import java.util.List;
2022
import java.util.Map;
2123
import java.util.Properties;
@@ -84,6 +86,29 @@ public CompletionStage<Set<String>> listTopics() {
8486
return promise;
8587
}
8688

89+
/**
90+
* Creates a topic with given name
91+
*
92+
* @param topicName topic name to create
93+
* @return a CompletionStage Void
94+
*/
95+
public CompletionStage<Void> createTopic(String topicName) {
96+
log.trace("Create topic thread {}", Thread.currentThread());
97+
log.info("Create topic {}", topicName);
98+
CompletableFuture<Void> promise = new CompletableFuture<>();
99+
this.adminClient.createTopics(Collections.singletonList(new NewTopic(topicName, 2, (short) 1)))
100+
.all()
101+
.whenComplete((topic, exception) -> {
102+
log.trace("Create topic callback thread {}", Thread.currentThread());
103+
if (exception == null) {
104+
promise.complete(topic);
105+
} else {
106+
promise.completeExceptionally(exception);
107+
}
108+
});
109+
return promise;
110+
}
111+
87112
/**
88113
* Returns the description of the specified topics.
89114
*

src/main/java/io/strimzi/kafka/bridge/http/HttpAdminBridgeEndpoint.java

+30
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,10 @@ public void handle(RoutingContext routingContext, Handler<HttpBridgeEndpoint> ha
7878
doGetTopic(routingContext);
7979
break;
8080

81+
case CREATE_TOPIC:
82+
doCreateTopic(routingContext);
83+
break;
84+
8185
case LIST_PARTITIONS:
8286
doListPartitions(routingContext);
8387
break;
@@ -170,6 +174,32 @@ public void doGetTopic(RoutingContext routingContext) {
170174
});
171175
}
172176

177+
/**
178+
* Create a topic described in the HTTP request
179+
*
180+
* @param routingContext the routing context
181+
*/
182+
public void doCreateTopic(RoutingContext routingContext) {
183+
String topicName = routingContext.pathParam("topicname");
184+
185+
this.kafkaBridgeAdmin.createTopic(topicName)
186+
.whenComplete(((topic, exception) -> {
187+
log.trace("Create topic handler thread {}", Thread.currentThread());
188+
if (exception == null) {
189+
ArrayNode root = JsonUtils.createArrayNode();
190+
HttpUtils.sendResponse(routingContext, HttpResponseStatus.OK.code(),
191+
BridgeContentType.KAFKA_JSON, JsonUtils.jsonToBytes(root));
192+
} else {
193+
HttpBridgeError error = new HttpBridgeError(
194+
HttpResponseStatus.INTERNAL_SERVER_ERROR.code(),
195+
exception.getMessage()
196+
);
197+
HttpUtils.sendResponse(routingContext, HttpResponseStatus.INTERNAL_SERVER_ERROR.code(),
198+
BridgeContentType.KAFKA_JSON, JsonUtils.jsonToBytes(error.toJson()));
199+
}
200+
}));
201+
}
202+
173203
/**
174204
* Get partitions information related to the topic in the HTTP request
175205
*

src/main/java/io/strimzi/kafka/bridge/http/HttpBridge.java

+14
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ public void start(Promise<Void> startPromise) {
159159
routerBuilder.operation(this.SEEK_TO_END.getOperationId().toString()).handler(this.SEEK_TO_END);
160160
routerBuilder.operation(this.LIST_TOPICS.getOperationId().toString()).handler(this.LIST_TOPICS);
161161
routerBuilder.operation(this.GET_TOPIC.getOperationId().toString()).handler(this.GET_TOPIC);
162+
routerBuilder.operation(this.CREATE_TOPIC.getOperationId().toString()).handler(this.CREATE_TOPIC);
162163
routerBuilder.operation(this.LIST_PARTITIONS.getOperationId().toString()).handler(this.LIST_PARTITIONS);
163164
routerBuilder.operation(this.GET_PARTITION.getOperationId().toString()).handler(this.GET_PARTITION);
164165
routerBuilder.operation(this.GET_OFFSETS.getOperationId().toString()).handler(this.GET_OFFSETS);
@@ -378,6 +379,11 @@ private void getTopic(RoutingContext routingContext) {
378379
processAdminClient(routingContext);
379380
}
380381

382+
private void createTopic(RoutingContext routingContext) {
383+
this.httpBridgeContext.setOpenApiOperation(HttpOpenApiOperations.CREATE_TOPIC);
384+
processAdminClient(routingContext);
385+
}
386+
381387
private void listPartitions(RoutingContext routingContext) {
382388
this.httpBridgeContext.setOpenApiOperation(HttpOpenApiOperations.LIST_PARTITIONS);
383389
processAdminClient(routingContext);
@@ -721,6 +727,14 @@ public void process(RoutingContext routingContext) {
721727
}
722728
};
723729

730+
final HttpOpenApiOperation CREATE_TOPIC = new HttpOpenApiOperation(HttpOpenApiOperations.CREATE_TOPIC) {
731+
732+
@Override
733+
public void process(RoutingContext routingContext) {
734+
createTopic(routingContext);
735+
}
736+
};
737+
724738
final HttpOpenApiOperation LIST_PARTITIONS = new HttpOpenApiOperation(HttpOpenApiOperations.LIST_PARTITIONS) {
725739

726740
@Override

src/main/java/io/strimzi/kafka/bridge/http/HttpOpenApiOperations.java

+2
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ public enum HttpOpenApiOperations {
2828
LIST_TOPICS("listTopics"),
2929
/** get information for a specific topic */
3030
GET_TOPIC("getTopic"),
31+
/** creates a topic with specified name */
32+
CREATE_TOPIC("createTopic"),
3133
/** list partitions for a specific topic */
3234
LIST_PARTITIONS("listPartitions"),
3335
/** get partition information for a specific topic */

src/main/resources/openapi.json

+50
Original file line numberDiff line numberDiff line change
@@ -748,6 +748,56 @@
748748
}
749749
]
750750
},
751+
"/create-topic/{topicname}": {
752+
"post": {
753+
"tags": [
754+
"Topics"
755+
],
756+
"description": "Creates a topic with given topic name.",
757+
"operationId": "createTopic",
758+
"responses": {
759+
"200": {
760+
"description": "Topic metadata",
761+
"content": {
762+
"application/vnd.kafka.v2+json": {
763+
"schema": {
764+
"$ref": "#/components/schemas/TopicMetadata"
765+
}
766+
}
767+
}
768+
},
769+
"404": {
770+
"description": "The specified topic was not found.",
771+
"content": {
772+
"application/vnd.kafka.v2+json": {
773+
"schema": {
774+
"$ref": "#/components/schemas/Error"
775+
},
776+
"examples": {
777+
"response": {
778+
"value": {
779+
"error_code": 404,
780+
"message": "The specified topic was not found."
781+
}
782+
}
783+
}
784+
}
785+
}
786+
}
787+
}
788+
},
789+
"parameters": [
790+
{
791+
"name": "topicname",
792+
"in": "path",
793+
"description": "Name of the topic to send records to or retrieve metadata from.",
794+
"required": true,
795+
"schema": {
796+
"type": "string"
797+
}
798+
}
799+
]
800+
},
751801
"/consumers/{groupid}/instances/{name}/records": {
752802
"get": {
753803
"tags": [

src/test/java/io/strimzi/kafka/bridge/http/AdminClientIT.java

+15
Original file line numberDiff line numberDiff line change
@@ -215,4 +215,19 @@ void setupTopic(VertxTestContext context, String topic, int partitions, int coun
215215
});
216216
producer.get(TEST_TIMEOUT, TimeUnit.SECONDS);
217217
}
218+
219+
@Test
220+
void createTopicTest(VertxTestContext context) {
221+
baseService()
222+
.postRequest("/create-topic/" + topic)
223+
.as(BodyCodec.jsonArray())
224+
.send(ar -> {
225+
context.verify(() -> {
226+
assertThat(ar.succeeded(), is(true));
227+
HttpResponse<JsonArray> response = ar.result();
228+
assertThat(response.statusCode(), is(HttpResponseStatus.OK.code()));
229+
});
230+
context.completeNow();
231+
});
232+
}
218233
}

0 commit comments

Comments
 (0)