Skip to content

Commit 69cbbe4

Browse files
fix: ensure S3 endpoint is not required for table topic since AWS client can guess it most of the time (#2915) (#2917)
Co-authored-by: Romain Manni-Bucau <[email protected]>
1 parent 3d2dae1 commit 69cbbe4

File tree

2 files changed

+152
-5
lines changed

2 files changed

+152
-5
lines changed

core/src/main/java/kafka/automq/table/CatalogFactory.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -217,15 +217,16 @@ private Catalog runAs(Supplier<Catalog> func) {
217217
}
218218
}
219219

220+
// important: use putIfAbsent to let the user override all values directly in catalog configuration
220221
private void putDataBucketAsWarehouse(boolean s3a) {
221-
if (bucketURI.endpoint() != null) {
222-
options.put("s3.endpoint", bucketURI.endpoint());
222+
if (StringUtils.isNotBlank(bucketURI.endpoint())) {
223+
options.putIfAbsent("s3.endpoint", bucketURI.endpoint());
223224
}
224225
if (bucketURI.extensionBool(AwsObjectStorage.PATH_STYLE_KEY, false)) {
225-
options.put("s3.path-style-access", "true");
226+
options.putIfAbsent("s3.path-style-access", "true");
226227
}
227-
options.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO");
228-
options.put("warehouse", String.format((s3a ? "s3a" : "s3") + "://%s/iceberg", bucketURI.bucket()));
228+
options.putIfAbsent("io-impl", "org.apache.iceberg.aws.s3.S3FileIO");
229+
options.putIfAbsent("warehouse", String.format((s3a ? "s3a" : "s3") + "://%s/iceberg", bucketURI.bucket()));
229230
}
230231

231232
}
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
package kafka.automq.table;
2+
3+
import kafka.server.KafkaConfig;
4+
5+
import org.apache.kafka.raft.QuorumConfig;
6+
import org.apache.kafka.server.config.KRaftConfigs;
7+
8+
import com.sun.net.httpserver.HttpServer;
9+
10+
import org.apache.iceberg.aws.s3.S3FileIOProperties;
11+
import org.apache.iceberg.inmemory.InMemoryFileIO;
12+
import org.apache.iceberg.rest.RESTCatalog;
13+
import org.apache.iceberg.util.SerializableMap;
14+
import org.junit.jupiter.api.Test;
15+
import org.junit.jupiter.api.TestInstance;
16+
17+
import java.io.IOException;
18+
import java.net.InetSocketAddress;
19+
import java.util.List;
20+
import java.util.Map;
21+
import java.util.concurrent.CopyOnWriteArrayList;
22+
import java.util.stream.Stream;
23+
24+
import static java.nio.charset.StandardCharsets.UTF_8;
25+
import static java.util.stream.Collectors.toMap;
26+
import static org.junit.jupiter.api.Assertions.assertEquals;
27+
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
28+
import static org.junit.jupiter.api.Assertions.assertNull;
29+
import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS;
30+
31+
@TestInstance(PER_CLASS)
32+
class CatalogFactoryTest {
33+
// minimalistic properties to let KafkaConfig validation pass and let us test our catalog factory
34+
private final Map<String, String> requiredKafkaConfigProperties = Map.of(
35+
KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "PLAINTEXT",
36+
KRaftConfigs.NODE_ID_CONFIG, "2",
37+
KRaftConfigs.PROCESS_ROLES_CONFIG, "controller",
38+
QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9092"
39+
);
40+
41+
@Test
42+
void restPassthroughProperties() throws IOException {
43+
final var restCatalog = new RestCatalogMock();
44+
try (final var autoClose = restCatalog) {
45+
final var config = new KafkaConfig(merge(requiredKafkaConfigProperties, Map.of(
46+
"automq.table.topic.catalog.type", "rest",
47+
"automq.table.topic.catalog.uri", restCatalog.base(),
48+
"automq.table.topic.catalog.header.x-custom", "my-x", // Apache Polaris needs a tenant header for ex
49+
// automq specific/enforced (not standard catalog passthrough)
50+
"s3.data.buckets", "0@s3://my_bucket?region=us-east-1&endpoint=http://localhost:12345&pathStyle=true"
51+
)));
52+
final var catalog = new CatalogFactory.Builder(config).build();
53+
assertInstanceOf(RESTCatalog.class, catalog).close();
54+
}
55+
assertEquals(List.of("GET /v1/config?warehouse=s3://my_bucket/iceberg\nmy-x"), restCatalog.requests());
56+
}
57+
58+
@Test
59+
void ignoreEmptyS3EndpointForRestCatalog() throws IOException {
60+
FakeS3IO.lastS3FileIOProperties = null;
61+
try (final var restCatalog = new RestCatalogMock()) {
62+
final var config = new KafkaConfig(merge(requiredKafkaConfigProperties, Map.of(
63+
"automq.table.topic.catalog.type", "rest",
64+
"automq.table.topic.catalog.uri", restCatalog.base(),
65+
"automq.table.topic.catalog.io-impl", FakeS3IO.class.getName(),
66+
"s3.data.buckets", "0@s3://my_bucket?region=us-east-1"
67+
)));
68+
final var catalog = new CatalogFactory.Builder(config).build();
69+
assertInstanceOf(RESTCatalog.class, catalog).close();
70+
assertNull(FakeS3IO.lastS3FileIOProperties().endpoint(), "S3FileIO endpoint should be null when not set - not even empty");
71+
} finally {
72+
FakeS3IO.lastS3FileIOProperties = null;
73+
}
74+
}
75+
76+
@SafeVarargs
77+
private Map<String, String> merge(final Map<String, String>... all) {
78+
return Stream.of(all)
79+
.flatMap(it -> it.entrySet().stream())
80+
.collect(toMap(Map.Entry::getKey, Map.Entry::getValue, (a, b) -> b));
81+
}
82+
83+
public static class FakeS3IO extends InMemoryFileIO {
84+
private static S3FileIOProperties lastS3FileIOProperties;
85+
86+
private static S3FileIOProperties lastS3FileIOProperties() {
87+
return lastS3FileIOProperties;
88+
}
89+
90+
@Override
91+
public void initialize(final Map<String, String> properties) {
92+
lastS3FileIOProperties = new S3FileIOProperties(SerializableMap.copyOf(properties));
93+
super.initialize(properties);
94+
}
95+
}
96+
97+
private static class RestCatalogMock implements AutoCloseable {
98+
private final List<String> requests = new CopyOnWriteArrayList<>(); // normally overkill but makes the test more accurate
99+
private final HttpServer catalogBackend;
100+
101+
private RestCatalogMock() throws IOException {
102+
catalogBackend = HttpServer.create(new InetSocketAddress("localhost", 0), 16);
103+
catalogBackend.createContext("/").setHandler(ex -> {
104+
try (ex) {
105+
final var method = ex.getRequestMethod();
106+
requests.add(
107+
method + ' ' + ex.getRequestURI().getPath() + '?' + ex.getRequestURI().getQuery() +
108+
('\n' + String.join("", ex.getRequestHeaders().getOrDefault("x-custom", List.of()))) +
109+
('\n' + new String(ex.getRequestBody().readAllBytes(), UTF_8)).strip());
110+
111+
if (method.equals("GET") &&
112+
ex.getRequestURI().getPath().equals("/v1/config") &&
113+
"warehouse=s3%3A%2F%2Fmy_bucket%2Ficeberg".equals(ex.getRequestURI().getRawQuery())) {
114+
final var body = """
115+
{
116+
"defaults": {},
117+
"overrides": {}
118+
}
119+
""".getBytes(UTF_8);
120+
ex.getResponseHeaders().add("content-type", "application/json");
121+
ex.sendResponseHeaders(200, body.length);
122+
ex.getResponseBody().write(body);
123+
return;
124+
}
125+
126+
// else we just called an unexpected endpoint, issue a HTTP 404
127+
ex.sendResponseHeaders(404, 0);
128+
}
129+
});
130+
catalogBackend.start();
131+
}
132+
133+
private String base() {
134+
return "http://localhost:" + catalogBackend.getAddress().getPort();
135+
}
136+
137+
private List<String> requests() {
138+
return requests;
139+
}
140+
141+
@Override
142+
public void close() {
143+
catalogBackend.stop(0);
144+
}
145+
}
146+
}

0 commit comments

Comments
 (0)