Skip to content

Commit

Permalink
add MOW (merge-on-write) table check
Browse files Browse the repository at this point in the history
  • Loading branch information
gnehil committed Aug 16, 2024
1 parent 9cbe4d3 commit e8f0500
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ public static class FeClient {

public static final String GET_LOAD_INFO = "/api/%s/_load_info";

public static final String GET_DDL = "/api/_get_ddl";

private final List<String> feNodes;

private final String auth;
Expand All @@ -87,12 +89,12 @@ public FeClient(String feAddresses, String user, String password) {

private List<String> parseFeNodes(String feAddresses) {
if (StringUtils.isBlank(feAddresses)) {
throw new IllegalArgumentException();
throw new IllegalArgumentException("feAddresses is empty");
}
String[] feArr = feAddresses.split(",");
if (Arrays.stream(feArr).map(x -> x.split(":"))
.anyMatch(x -> x.length != 2 || x[0].isEmpty() || x[1].isEmpty())) {
throw new IllegalArgumentException();
throw new IllegalArgumentException("feAddresses contains invalid format, " + feAddresses);
}
return Arrays.stream(feArr).collect(Collectors.toList());
}
Expand Down Expand Up @@ -178,7 +180,7 @@ public void updateIngestionLoad(String db, Long loadId, Map<String, String> stat
res.getCode(), res.getMsg(), res.getData().isNull() ? null : res.getData().asText()));
}
} catch (IOException | URISyntaxException e) {
throw new SparkLoadException("update spark load failed", e);
throw new SparkLoadException("update load failed", e);
}

}
Expand All @@ -202,7 +204,32 @@ public LoadInfo getLoadInfo(String db, String label) throws SparkLoadException {
}
return res.getJobInfo();
} catch (IOException | URISyntaxException e) {
throw new SparkLoadException("update spark load failed", e);
throw new SparkLoadException("get load info failed", e);
}

}

/**
 * Fetches the CREATE TABLE DDL for {@code db}.{@code table} from the FE
 * via the {@code /api/_get_ddl} HTTP endpoint.
 *
 * @param db    database name, sent as the {@code db} query parameter
 * @param table table name, sent as the {@code table} query parameter
 * @return the first statement of the response's {@code create_table} array
 * @throws SparkLoadException if the response body is blank, the response
 *         code is non-zero, the {@code create_table} field is missing or
 *         empty, or the underlying HTTP request fails
 */
public String getDDL(String db, String table) throws SparkLoadException {

    HttpGet httpGet = new HttpGet();
    addCommonHeaders(httpGet);
    try {
        Map<String, String> params = new HashMap<>();
        params.put("db", db);
        params.put("table", table);
        String content = executeRequest(httpGet, GET_DDL, params);
        if (StringUtils.isBlank(content)) {
            // A blank body means the request produced nothing usable.
            throw new SparkLoadException(String.format("request get ddl failed, path: %s", GET_DDL));
        }
        ResponseEntity res = JsonUtils.readValue(content, ResponseEntity.class);
        // Success requires code == 0 and a non-empty "create_table" array
        // in the data payload; anything else is reported as a failure.
        if (res.getCode() != 0 || !res.getData().has("create_table")
                || res.getData().get("create_table").isEmpty()) {
            throw new SparkLoadException(String.format("get ddl failed, status: %s, msg: %s, data: %s",
                    res.getCode(), res.getMsg(), JsonUtils.writeValueAsString(res.getData())));
        }
        // "create_table" is a JSON array; only the first element is returned.
        return res.getData().get("create_table").get(0).asText();
    } catch (IOException | URISyntaxException e) {
        throw new SparkLoadException("get ddl failed", e);
    }

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
package org.apache.doris.config;

import org.apache.doris.SparkLoadRunner;
import org.apache.doris.client.DorisClient;
import org.apache.doris.common.Constants;
import org.apache.doris.common.enums.LoadMode;
import org.apache.doris.common.enums.TaskType;
import org.apache.doris.exception.SparkLoadException;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
Expand All @@ -30,6 +32,7 @@

import java.io.File;
import java.net.URI;
import java.sql.DriverManager;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -160,6 +163,16 @@ public void checkTaskInfo() {
Map<String, TaskInfo> tasks = getLoadTasks();
Preconditions.checkArgument(!tasks.isEmpty(), "loadTasks is empty");
for (Map.Entry<String, TaskInfo> entry : tasks.entrySet()) {
String table = entry.getKey();
try {
DorisClient.FeClient feClient = DorisClient.getFeClient(feAddresses, user, password);
String ddl = feClient.getDDL(database, table);
if (StringUtils.isNoneBlank(ddl) && ddl.contains("\"enable_unique_key_merge_on_write\" = \"true\"")) {
throw new IllegalArgumentException("Merge On Write is not supported");
}
} catch (SparkLoadException e) {
throw new IllegalArgumentException("check table failed", e);
}
TaskInfo taskInfo = entry.getValue();
switch (taskInfo.getType()) {
case HIVE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,18 @@ class DorisClientTest {

@Test
@Test
public void getFeClient() {
    // NOTE(review): this span comes from a diff view; the next four bare
    // assertThrows calls appear to be the pre-change assertions and
    // duplicate the e1-e4 message-checking variants below -- confirm
    // against the actual file before relying on this listing.
    Assertions.assertThrows(IllegalArgumentException.class, () -> DorisClient.getFeClient("", "", ""));
    Assertions.assertThrows(IllegalArgumentException.class, () -> DorisClient.getFeClient("127.0.0.1", "", ""));
    Assertions.assertThrows(IllegalArgumentException.class, () -> DorisClient.getFeClient("127.0.0.1:", "", ""));
    Assertions.assertThrows(IllegalArgumentException.class, () -> DorisClient.getFeClient(":8030", "", ""));
    // Empty address list is rejected with a dedicated message.
    IllegalArgumentException e1 =
            Assertions.assertThrows(IllegalArgumentException.class, () -> DorisClient.getFeClient("", "", ""));
    Assertions.assertEquals("feAddresses is empty", e1.getMessage());
    // Missing port: "host" without ":port" is invalid.
    IllegalArgumentException e2 = Assertions.assertThrows(IllegalArgumentException.class,
            () -> DorisClient.getFeClient("127.0.0.1", "", ""));
    Assertions.assertEquals("feAddresses contains invalid format, 127.0.0.1", e2.getMessage());
    // Trailing colon with empty port is invalid.
    IllegalArgumentException e3 = Assertions.assertThrows(IllegalArgumentException.class,
            () -> DorisClient.getFeClient("127.0.0.1:", "", ""));
    Assertions.assertEquals("feAddresses contains invalid format, 127.0.0.1:", e3.getMessage());
    // Empty host with a port is invalid.
    IllegalArgumentException e4 =
            Assertions.assertThrows(IllegalArgumentException.class, () -> DorisClient.getFeClient(":8030", "", ""));
    Assertions.assertEquals("feAddresses contains invalid format, :8030", e4.getMessage());
    // Well-formed host:port is accepted.
    Assertions.assertDoesNotThrow(() -> DorisClient.getFeClient("127.0.0.1:8030", "", ""));
}

Expand Down Expand Up @@ -226,6 +234,73 @@ public CloseableHttpResponse execute(

}

@Test
public void getDDL() {

    DorisClient.FeClient feClient = new DorisClient.FeClient("127.0.0.1:8030", "", "");

    // Case 1: HTTP 400 with no entity -> executeRequest yields blank
    // content, so getDDL fails with the "request get ddl failed" message.
    new MockUp<CloseableHttpClient>(CloseableHttpClient.class) {
        @Mock
        public CloseableHttpResponse execute(
                final HttpUriRequest request) throws IOException, ClientProtocolException {
            MockedCloseableHttpResponse response = new MockedCloseableHttpResponse();
            response.setStatusCode(HttpStatus.SC_BAD_REQUEST);
            return response;
        }
    };
    SparkLoadException e1 =
            Assertions.assertThrows(SparkLoadException.class, () -> feClient.getDDL("db", "test"));
    Assertions.assertEquals("request get ddl failed, path: /api/_get_ddl", e1.getMessage());

    // Case 2: HTTP 200 but response code != 0 -> treated as an FE error.
    new MockUp<CloseableHttpClient>(CloseableHttpClient.class) {
        @Mock
        public CloseableHttpResponse execute(
                final HttpUriRequest request) throws IOException, ClientProtocolException {
            MockedCloseableHttpResponse response = new MockedCloseableHttpResponse();
            response.setStatusCode(HttpStatus.SC_OK);
            response.setEntity(new StringEntity("{\"code\":1,\"msg\":\"\",\"data\":{},\"count\":0}"));
            return response;
        }
    };
    SparkLoadException e2 =
            Assertions.assertThrows(SparkLoadException.class, () -> feClient.getDDL("db", "test"));
    Assertions.assertEquals("get ddl failed, status: 1, msg: , data: {}", e2.getMessage());

    // Case 3: code == 0 but data has no "create_table" field -> still fails.
    new MockUp<CloseableHttpClient>(CloseableHttpClient.class) {
        @Mock
        public CloseableHttpResponse execute(
                final HttpUriRequest request) throws IOException, ClientProtocolException {
            MockedCloseableHttpResponse response = new MockedCloseableHttpResponse();
            response.setStatusCode(HttpStatus.SC_OK);
            response.setEntity(new StringEntity("{\"code\":0,\"msg\":\"\",\"data\":{},\"count\":0}"));
            return response;
        }
    };
    SparkLoadException e3 =
            Assertions.assertThrows(SparkLoadException.class, () -> feClient.getDDL("db", "test"));
    Assertions.assertEquals("get ddl failed, status: 0, msg: , data: {}", e3.getMessage());

    // Case 4: well-formed payload with a non-empty "create_table" array
    // (a realistic escaped CREATE TABLE statement) -> getDDL succeeds.
    new MockUp<CloseableHttpClient>(CloseableHttpClient.class) {
        @Mock
        public CloseableHttpResponse execute(
                final HttpUriRequest request) throws IOException, ClientProtocolException {
            MockedCloseableHttpResponse response = new MockedCloseableHttpResponse();
            response.setStatusCode(HttpStatus.SC_OK);
            response.setEntity(new StringEntity("{\"code\":0,\"msg\":\"\"," +
                    "\"data\":{\"create_table\": [\"CREATE TABLE `tbl1` (\\n  `k1` int(11) NULL " +
                    "COMMENT \\\"\\\",\\n  `k2` int(11) NULL COMMENT \\\"\\\"\\n) ENGINE=OLAP\\n" +
                    "DUPLICATE KEY(`k1`, `k2`)\\nCOMMENT \\\"OLAP\\\"\\nDISTRIBUTED BY HASH(`k1`) BUCKETS 1\\n" +
                    "PROPERTIES (\\n\\\"replication_num\\\" = \\\"1\\\",\\n\\\"version_info\\\" = \\\"1,0\\\",\\n" +
                    "\\\"in_memory\\\" = \\\"false\\\",\\n\\\"storage_format\\\" = \\\"DEFAULT\\\"\\n);\"]\n}," +
                    "\"count\":0}"));
            return response;
        }
    };
    Assertions.assertDoesNotThrow(() -> feClient.getDDL("db", "test"));


}

private class MockedCloseableHttpResponse implements CloseableHttpResponse {

private StatusLine statusLine;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package org.apache.doris.config;


import org.apache.doris.client.DorisClient;
import org.apache.doris.common.enums.TaskType;
import org.apache.doris.exception.SparkLoadException;

import mockit.Mock;
import mockit.MockUp;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -46,12 +50,21 @@ public void checkFeAddress() {
public void checkTaskInfo() {

JobConfig jobConfig = new JobConfig();
jobConfig.setFeAddresses("127.0.0.1:8030");

jobConfig.setLoadTasks(new HashMap<>());
IllegalArgumentException e1 =
Assertions.assertThrows(IllegalArgumentException.class, jobConfig::checkTaskInfo);
Assertions.assertEquals("loadTasks is empty", e1.getMessage());

new MockUp<DorisClient.FeClient>(DorisClient.FeClient.class) {
@Mock
public String getDDL(String db, String table) throws SparkLoadException {
return "create table tbl1 (col1 int, col2 int, col3 int, col4 int) unique key (col1) properties (" +
"\"enable_unique_key_merge_on_write\" = \"false\")";
}
};

Map<String, JobConfig.TaskInfo> loadTasks1 = new HashMap<>();
JobConfig.TaskInfo taskInfo1 = new JobConfig.TaskInfo();
taskInfo1.setType(TaskType.FILE);
Expand Down Expand Up @@ -90,6 +103,16 @@ public void checkTaskInfo() {
taskInfo3.setHiveTable("tbl");
Assertions.assertDoesNotThrow(jobConfig::checkTaskInfo);

new MockUp<DorisClient.FeClient>(DorisClient.FeClient.class) {
@Mock
public String getDDL(String db, String table) throws SparkLoadException {
return "create table tbl1 (col1 int, col2 int, col3 int, col4 int) unique key (col1) properties (" +
"\"enable_unique_key_merge_on_write\" = \"true\")";
}
};
IllegalArgumentException e5 =
Assertions.assertThrows(IllegalArgumentException.class, jobConfig::checkTaskInfo);

}

@Test
Expand Down

0 comments on commit e8f0500

Please sign in to comment.