|
15 | 15 | import io.strimzi.kafka.bridge.config.BridgeConfig;
|
16 | 16 | import io.strimzi.kafka.bridge.http.converter.JsonUtils;
|
17 | 17 | import io.strimzi.kafka.bridge.http.model.HttpBridgeError;
|
| 18 | +import io.vertx.core.json.JsonObject; |
18 | 19 | import io.vertx.ext.web.RoutingContext;
|
19 | 20 | import org.apache.kafka.clients.admin.Config;
|
20 | 21 | import org.apache.kafka.clients.admin.ConfigEntry;
|
|
32 | 33 | import java.util.HashSet;
|
33 | 34 | import java.util.List;
|
34 | 35 | import java.util.Map;
|
| 36 | +import java.util.Optional; |
35 | 37 | import java.util.Set;
|
36 | 38 | import java.util.concurrent.CompletableFuture;
|
37 | 39 | import java.util.concurrent.CompletionStage;
|
@@ -81,6 +83,10 @@ public void handle(RoutingContext routingContext, Handler<HttpBridgeEndpoint> ha
|
81 | 83 | doGetTopic(routingContext);
|
82 | 84 | break;
|
83 | 85 |
|
| 86 | + case CREATE_TOPIC: |
| 87 | + doCreateTopic(routingContext); |
| 88 | + break; |
| 89 | + |
84 | 90 | case LIST_PARTITIONS:
|
85 | 91 | doListPartitions(routingContext);
|
86 | 92 | break;
|
@@ -173,6 +179,47 @@ public void doGetTopic(RoutingContext routingContext) {
|
173 | 179 | });
|
174 | 180 | }
|
175 | 181 |
|
| 182 | + /** |
| 183 | + * Create a topic with described name, partitions count, and replication factor in the body of the HTTP request |
| 184 | + * |
| 185 | + * @param routingContext the routing context |
| 186 | + */ |
| 187 | + public void doCreateTopic(RoutingContext routingContext) { |
| 188 | + JsonObject jsonBody = routingContext.body().asJsonObject(); |
| 189 | + |
| 190 | + if (jsonBody.isEmpty()) { |
| 191 | + HttpBridgeError error = new HttpBridgeError( |
| 192 | + HttpResponseStatus.UNPROCESSABLE_ENTITY.code(), |
| 193 | + "Request body must be a JSON object" |
| 194 | + ); |
| 195 | + HttpUtils.sendResponse(routingContext, HttpResponseStatus.UNPROCESSABLE_ENTITY.code(), |
| 196 | + BridgeContentType.KAFKA_JSON, JsonUtils.jsonToBytes(error.toJson())); |
| 197 | + return; |
| 198 | + } |
| 199 | + |
| 200 | + String topicName = jsonBody.getString("topic_name"); |
| 201 | + Optional<Integer> partitionsCount = Optional.ofNullable(jsonBody.getInteger("partitions_count")); |
| 202 | + Optional<Short> replicationFactor = Optional.ofNullable(jsonBody.getInteger("replication_factor")) |
| 203 | + .map(Integer::shortValue); |
| 204 | + |
| 205 | + this.kafkaBridgeAdmin.createTopic(topicName, partitionsCount, replicationFactor) |
| 206 | + .whenComplete(((topic, exception) -> { |
| 207 | + LOGGER.trace("Create topic handler thread {}", Thread.currentThread()); |
| 208 | + if (exception == null) { |
| 209 | + JsonNode root = JsonUtils.createObjectNode(); |
| 210 | + HttpUtils.sendResponse(routingContext, HttpResponseStatus.CREATED.code(), |
| 211 | + BridgeContentType.KAFKA_JSON, JsonUtils.jsonToBytes(root)); |
| 212 | + } else { |
| 213 | + HttpBridgeError error = new HttpBridgeError( |
| 214 | + HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), |
| 215 | + exception.getMessage() |
| 216 | + ); |
| 217 | + HttpUtils.sendResponse(routingContext, HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), |
| 218 | + BridgeContentType.KAFKA_JSON, JsonUtils.jsonToBytes(error.toJson())); |
| 219 | + } |
| 220 | + })); |
| 221 | + } |
| 222 | + |
176 | 223 | /**
|
177 | 224 | * Get partitions information related to the topic in the HTTP request
|
178 | 225 | *
|
|
0 commit comments