Skip to content

Commit bd23cb6

Browse files
branch-4.0: [feature](storage) Add OzoneProperties to support Apache Ozone #60809 (#60896)
Cherry-picked from #60809 Co-authored-by: Chenjunwei <[email protected]>
1 parent f63a43f commit bd23cb6

File tree

4 files changed

+345
-1
lines changed

4 files changed

+345
-1
lines changed

fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,10 @@ private static StorageProperties findStorageProperties(StorageProperties.Type ty
313313
&& storagePropertiesMap.containsKey(StorageProperties.Type.MINIO)) {
314314
return storagePropertiesMap.get(StorageProperties.Type.MINIO);
315315
}
316+
if (type == StorageProperties.Type.S3
317+
&& storagePropertiesMap.containsKey(StorageProperties.Type.OZONE)) {
318+
return storagePropertiesMap.get(StorageProperties.Type.OZONE);
319+
}
316320

317321
// Step 3: Compatibility fallback based on schema
318322
// In previous configurations, the schema name may not strictly match the actual storage type.
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.apache.doris.datasource.property.storage;
19+
20+
import org.apache.doris.datasource.property.ConnectorProperty;
21+
22+
import com.google.common.collect.ImmutableSet;
23+
import lombok.Getter;
24+
import lombok.Setter;
25+
import org.apache.commons.lang3.StringUtils;
26+
27+
import java.util.Map;
28+
import java.util.Set;
29+
import java.util.regex.Pattern;
30+
31+
public class OzoneProperties extends AbstractS3CompatibleProperties {
32+
33+
@Setter
34+
@Getter
35+
@ConnectorProperty(names = {"ozone.endpoint", "s3.endpoint"},
36+
required = false,
37+
description = "The endpoint of Ozone S3 Gateway.")
38+
protected String endpoint = "";
39+
40+
@Setter
41+
@Getter
42+
@ConnectorProperty(names = {"ozone.region", "s3.region"},
43+
required = false,
44+
description = "The region of Ozone S3 Gateway.")
45+
protected String region = "us-east-1";
46+
47+
@Getter
48+
@ConnectorProperty(names = {"ozone.access_key", "s3.access_key", "s3.access-key-id"},
49+
required = false,
50+
sensitive = true,
51+
description = "The access key of Ozone S3 Gateway.")
52+
protected String accessKey = "";
53+
54+
@Getter
55+
@ConnectorProperty(names = {"ozone.secret_key", "s3.secret_key", "s3.secret-access-key"},
56+
required = false,
57+
sensitive = true,
58+
description = "The secret key of Ozone S3 Gateway.")
59+
protected String secretKey = "";
60+
61+
@Getter
62+
@ConnectorProperty(names = {"ozone.session_token", "s3.session_token", "s3.session-token"},
63+
required = false,
64+
sensitive = true,
65+
description = "The session token of Ozone S3 Gateway.")
66+
protected String sessionToken = "";
67+
68+
@Getter
69+
@ConnectorProperty(names = {"ozone.connection.maximum", "s3.connection.maximum"},
70+
required = false,
71+
description = "Maximum number of connections.")
72+
protected String maxConnections = "100";
73+
74+
@Getter
75+
@ConnectorProperty(names = {"ozone.connection.request.timeout", "s3.connection.request.timeout"},
76+
required = false,
77+
description = "Request timeout in seconds.")
78+
protected String requestTimeoutS = "10000";
79+
80+
@Getter
81+
@ConnectorProperty(names = {"ozone.connection.timeout", "s3.connection.timeout"},
82+
required = false,
83+
description = "Connection timeout in seconds.")
84+
protected String connectionTimeoutS = "10000";
85+
86+
@Setter
87+
@Getter
88+
@ConnectorProperty(names = {"ozone.use_path_style", "use_path_style", "s3.path-style-access"},
89+
required = false,
90+
description = "Whether to use path style URL for the storage.")
91+
protected String usePathStyle = "true";
92+
93+
@Setter
94+
@Getter
95+
@ConnectorProperty(names = {"ozone.force_parsing_by_standard_uri", "force_parsing_by_standard_uri"},
96+
required = false,
97+
description = "Whether to use path style URL for the storage.")
98+
protected String forceParsingByStandardUrl = "false";
99+
100+
protected OzoneProperties(Map<String, String> origProps) {
101+
super(Type.OZONE, origProps);
102+
}
103+
104+
@Override
105+
public void initNormalizeAndCheckProps() {
106+
hydrateFromOriginalProps();
107+
super.initNormalizeAndCheckProps();
108+
hydrateFromOriginalProps();
109+
}
110+
111+
private void hydrateFromOriginalProps() {
112+
endpoint = StringUtils.firstNonBlank(
113+
endpoint,
114+
origProps.get("ozone.endpoint"),
115+
origProps.get("s3.endpoint"));
116+
region = StringUtils.firstNonBlank(region, origProps.get("ozone.region"), origProps.get("s3.region"));
117+
accessKey = StringUtils.firstNonBlank(
118+
accessKey,
119+
origProps.get("ozone.access_key"),
120+
origProps.get("s3.access_key"),
121+
origProps.get("s3.access-key-id"));
122+
secretKey = StringUtils.firstNonBlank(
123+
secretKey,
124+
origProps.get("ozone.secret_key"),
125+
origProps.get("s3.secret_key"),
126+
origProps.get("s3.secret-access-key"));
127+
sessionToken = StringUtils.firstNonBlank(sessionToken, origProps.get("ozone.session_token"),
128+
origProps.get("s3.session_token"), origProps.get("s3.session-token"));
129+
usePathStyle = StringUtils.firstNonBlank(usePathStyle, origProps.get("ozone.use_path_style"),
130+
origProps.get("use_path_style"), origProps.get("s3.path-style-access"));
131+
forceParsingByStandardUrl = StringUtils.firstNonBlank(forceParsingByStandardUrl,
132+
origProps.get("ozone.force_parsing_by_standard_uri"),
133+
origProps.get("force_parsing_by_standard_uri"));
134+
}
135+
136+
@Override
137+
protected Set<Pattern> endpointPatterns() {
138+
return ImmutableSet.of(Pattern.compile("^(?:https?://)?[a-zA-Z0-9.-]+(?::\\d+)?$"));
139+
}
140+
141+
@Override
142+
protected void setEndpointIfPossible() {
143+
super.setEndpointIfPossible();
144+
if (StringUtils.isBlank(getEndpoint())) {
145+
throw new IllegalArgumentException("Property ozone.endpoint is required.");
146+
}
147+
}
148+
149+
@Override
150+
protected Set<String> schemas() {
151+
return ImmutableSet.of("s3", "s3a", "s3n");
152+
}
153+
}

fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ public abstract class StorageProperties extends ConnectionProperties {
4242
public static final String FS_S3_SUPPORT = "fs.s3.support";
4343
public static final String FS_GCS_SUPPORT = "fs.gcs.support";
4444
public static final String FS_MINIO_SUPPORT = "fs.minio.support";
45+
public static final String FS_OZONE_SUPPORT = "fs.ozone.support";
4546
public static final String FS_BROKER_SUPPORT = "fs.broker.support";
4647
public static final String FS_AZURE_SUPPORT = "fs.azure.support";
4748
public static final String FS_OSS_SUPPORT = "fs.oss.support";
@@ -67,6 +68,7 @@ public enum Type {
6768
GCS,
6869
OSS_HDFS,
6970
MINIO,
71+
OZONE,
7072
AZURE,
7173
BROKER,
7274
LOCAL,
@@ -203,7 +205,9 @@ public static StorageProperties createPrimary(Map<String, String> origProps) {
203205
props -> (isFsSupport(props, FS_AZURE_SUPPORT)
204206
|| AzureProperties.guessIsMe(props)) ? new AzureProperties(props) : null,
205207
props -> (isFsSupport(props, FS_MINIO_SUPPORT)
206-
|| MinioProperties.guessIsMe(props)) ? new MinioProperties(props) : null,
208+
|| (!isFsSupport(props, FS_OZONE_SUPPORT)
209+
&& MinioProperties.guessIsMe(props))) ? new MinioProperties(props) : null,
210+
props -> isFsSupport(props, FS_OZONE_SUPPORT) ? new OzoneProperties(props) : null,
207211
props -> (isFsSupport(props, FS_BROKER_SUPPORT)
208212
|| BrokerProperties.guessIsMe(props)) ? new BrokerProperties(props) : null,
209213
props -> (isFsSupport(props, FS_LOCAL_SUPPORT)
Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.apache.doris.datasource.property.storage;
19+
20+
import org.apache.doris.common.ExceptionChecker;
21+
import org.apache.doris.common.UserException;
22+
import org.apache.doris.common.util.LocationPath;
23+
24+
import org.junit.jupiter.api.Assertions;
25+
import org.junit.jupiter.api.BeforeEach;
26+
import org.junit.jupiter.api.Test;
27+
28+
import java.util.HashMap;
29+
import java.util.List;
30+
import java.util.Map;
31+
import java.util.function.Function;
32+
import java.util.stream.Collectors;
33+
34+
public class OzonePropertiesTest {
35+
private Map<String, String> origProps;
36+
37+
@BeforeEach
38+
public void setup() {
39+
origProps = new HashMap<>();
40+
}
41+
42+
@Test
43+
public void testValidOzoneConfiguration() {
44+
origProps.put(StorageProperties.FS_OZONE_SUPPORT, "true");
45+
origProps.put("ozone.endpoint", "http://ozone-s3g:9878");
46+
origProps.put("ozone.access_key", "hadoop");
47+
origProps.put("ozone.secret_key", "hadoop");
48+
49+
OzoneProperties ozoneProperties = (OzoneProperties) StorageProperties.createPrimary(origProps);
50+
Map<String, String> backendProps = ozoneProperties.getBackendConfigProperties();
51+
52+
Assertions.assertEquals(StorageProperties.Type.OZONE, ozoneProperties.getType());
53+
Assertions.assertEquals("http://ozone-s3g:9878", ozoneProperties.getEndpoint());
54+
Assertions.assertEquals("hadoop", ozoneProperties.getAccessKey());
55+
Assertions.assertEquals("hadoop", ozoneProperties.getSecretKey());
56+
Assertions.assertEquals("us-east-1", ozoneProperties.getRegion());
57+
Assertions.assertEquals("true", ozoneProperties.getUsePathStyle());
58+
59+
Assertions.assertEquals("http://ozone-s3g:9878", backendProps.get("AWS_ENDPOINT"));
60+
Assertions.assertEquals("hadoop", backendProps.get("AWS_ACCESS_KEY"));
61+
Assertions.assertEquals("hadoop", backendProps.get("AWS_SECRET_KEY"));
62+
Assertions.assertEquals("us-east-1", backendProps.get("AWS_REGION"));
63+
Assertions.assertEquals("true", backendProps.get("use_path_style"));
64+
}
65+
66+
@Test
67+
public void testS3PropertiesBinding() {
68+
origProps.put(StorageProperties.FS_OZONE_SUPPORT, "true");
69+
origProps.put("s3.endpoint", "http://ozone-s3g:9878");
70+
origProps.put("s3.access_key", "hadoop");
71+
origProps.put("s3.secret_key", "hadoop");
72+
origProps.put("use_path_style", "true");
73+
origProps.put("s3.region", "us-east-1");
74+
75+
OzoneProperties ozoneProperties = (OzoneProperties) StorageProperties.createPrimary(origProps);
76+
Map<String, String> backendProps = ozoneProperties.getBackendConfigProperties();
77+
78+
Assertions.assertEquals("http://ozone-s3g:9878", ozoneProperties.getEndpoint());
79+
Assertions.assertEquals("hadoop", ozoneProperties.getAccessKey());
80+
Assertions.assertEquals("hadoop", ozoneProperties.getSecretKey());
81+
Assertions.assertEquals("true", ozoneProperties.getUsePathStyle());
82+
83+
Assertions.assertEquals("http://ozone-s3g:9878", backendProps.get("AWS_ENDPOINT"));
84+
Assertions.assertEquals("hadoop", backendProps.get("AWS_ACCESS_KEY"));
85+
Assertions.assertEquals("hadoop", backendProps.get("AWS_SECRET_KEY"));
86+
}
87+
88+
@Test
89+
public void testFsS3aPropertiesAreNotSupported() {
90+
origProps.put(StorageProperties.FS_OZONE_SUPPORT, "true");
91+
origProps.put("fs.s3a.endpoint", "http://ozone-s3g:9878");
92+
origProps.put("fs.s3a.access.key", "hadoop");
93+
origProps.put("fs.s3a.secret.key", "hadoop");
94+
95+
ExceptionChecker.expectThrowsWithMsg(IllegalArgumentException.class,
96+
"Property ozone.endpoint is required.",
97+
() -> StorageProperties.createPrimary(origProps));
98+
}
99+
100+
@Test
101+
public void testCreateAllWithDefaultFs() throws UserException {
102+
origProps.put(StorageProperties.FS_OZONE_SUPPORT, "true");
103+
origProps.put("fs.defaultFS", "s3a://dn-data/");
104+
origProps.put("s3.endpoint", "http://ozone-s3g:9878");
105+
origProps.put("s3.access_key", "hadoop");
106+
origProps.put("s3.secret_key", "hadoop");
107+
origProps.put("use_path_style", "true");
108+
109+
List<StorageProperties> properties = StorageProperties.createAll(origProps);
110+
Assertions.assertEquals(HdfsProperties.class, properties.get(0).getClass());
111+
Assertions.assertEquals(OzoneProperties.class, properties.get(1).getClass());
112+
113+
Map<StorageProperties.Type, StorageProperties> propertiesMap = properties.stream()
114+
.collect(Collectors.toMap(StorageProperties::getType, Function.identity()));
115+
LocationPath locationPath = LocationPath.of("s3a://dn-data/warehouse/test_table", propertiesMap);
116+
Assertions.assertTrue(locationPath.getStorageProperties() instanceof OzoneProperties);
117+
}
118+
119+
@Test
120+
public void testCreateAllWithDefaultFsAndOzoneProperties() throws UserException {
121+
origProps.put(StorageProperties.FS_OZONE_SUPPORT, "true");
122+
origProps.put("fs.defaultFS", "s3a://dn-data/");
123+
origProps.put("ozone.endpoint", "http://ozone-s3g:9878");
124+
origProps.put("ozone.access_key", "hadoop");
125+
origProps.put("ozone.secret_key", "hadoop");
126+
origProps.put("ozone.use_path_style", "true");
127+
origProps.put("ozone.region", "us-east-1");
128+
129+
List<StorageProperties> properties = StorageProperties.createAll(origProps);
130+
Assertions.assertEquals(HdfsProperties.class, properties.get(0).getClass());
131+
Assertions.assertEquals(OzoneProperties.class, properties.get(1).getClass());
132+
133+
OzoneProperties ozoneProperties = (OzoneProperties) properties.get(1);
134+
Assertions.assertEquals("hadoop", ozoneProperties.getHadoopStorageConfig().get("fs.s3a.access.key"));
135+
Assertions.assertEquals("hadoop", ozoneProperties.getHadoopStorageConfig().get("fs.s3a.secret.key"));
136+
Assertions.assertEquals("http://ozone-s3g:9878", ozoneProperties.getHadoopStorageConfig().get("fs.s3a.endpoint"));
137+
Assertions.assertEquals("us-east-1", ozoneProperties.getHadoopStorageConfig().get("fs.s3a.endpoint.region"));
138+
Assertions.assertEquals("true", ozoneProperties.getHadoopStorageConfig().get("fs.s3a.path.style.access"));
139+
}
140+
141+
@Test
142+
public void testMissingAccessKeyOrSecretKey() {
143+
origProps.put(StorageProperties.FS_OZONE_SUPPORT, "true");
144+
origProps.put("ozone.endpoint", "http://ozone-s3g:9878");
145+
origProps.put("ozone.access_key", "hadoop");
146+
ExceptionChecker.expectThrowsWithMsg(IllegalArgumentException.class,
147+
"Both the access key and the secret key must be set.",
148+
() -> StorageProperties.createPrimary(origProps));
149+
150+
origProps.remove("ozone.access_key");
151+
origProps.put("ozone.secret_key", "hadoop");
152+
ExceptionChecker.expectThrowsWithMsg(IllegalArgumentException.class,
153+
"Both the access key and the secret key must be set.",
154+
() -> StorageProperties.createPrimary(origProps));
155+
}
156+
157+
@Test
158+
public void testMissingEndpoint() {
159+
origProps.put(StorageProperties.FS_OZONE_SUPPORT, "true");
160+
origProps.put("ozone.access_key", "hadoop");
161+
origProps.put("ozone.secret_key", "hadoop");
162+
ExceptionChecker.expectThrowsWithMsg(IllegalArgumentException.class,
163+
"Property ozone.endpoint is required.",
164+
() -> StorageProperties.createPrimary(origProps));
165+
}
166+
167+
@Test
168+
public void testRequireExplicitFsOzoneSupport() throws UserException {
169+
origProps.put("ozone.endpoint", "http://127.0.0.1:9878");
170+
origProps.put("ozone.access_key", "hadoop");
171+
origProps.put("ozone.secret_key", "hadoop");
172+
173+
List<StorageProperties> propertiesWithoutFlag = StorageProperties.createAll(origProps);
174+
Assertions.assertEquals(1, propertiesWithoutFlag.size());
175+
Assertions.assertEquals(HdfsProperties.class, propertiesWithoutFlag.get(0).getClass());
176+
177+
origProps.put(StorageProperties.FS_OZONE_SUPPORT, "true");
178+
List<StorageProperties> propertiesWithFlag = StorageProperties.createAll(origProps);
179+
Assertions.assertEquals(2, propertiesWithFlag.size());
180+
Assertions.assertEquals(HdfsProperties.class, propertiesWithFlag.get(0).getClass());
181+
Assertions.assertEquals(OzoneProperties.class, propertiesWithFlag.get(1).getClass());
182+
}
183+
}

0 commit comments

Comments
 (0)