Skip to content

Commit

Permalink
[BugFix] Fix NPE when using exteral olap table to insert data
Browse files Browse the repository at this point in the history
Signed-off-by: drake_wang <[email protected]>
  • Loading branch information
wxl24life committed Nov 7, 2024
1 parent 6abe282 commit 93ed55e
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -274,8 +274,8 @@ public void complete() throws UserException {
enableAutomaticPartition, automaticBucketSize, getOpenPartitions());
tSink.setPartition(partitionParam);
tSink.setLocation(createLocation(dstTable, partitionParam, enableReplicatedStorage, warehouseId));
tSink.setNodes_info(GlobalStateMgr.getCurrentState().createNodesInfo(warehouseId,
getSystemInfoService(dstTable)));
tSink.setNodes_info(GlobalStateMgr.getCurrentState().createNodesInfo(dstTable instanceof ExternalOlapTable,
warehouseId, getSystemInfoService(dstTable)));
tSink.setPartial_update_mode(this.partialUpdateMode);
tSink.setAutomatic_bucket_size(automaticBucketSize);
if (canUseColocateMVIndex(dstTable)) {
Expand Down
20 changes: 20 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -514,17 +514,37 @@ public JournalObservable getJournalObservable() {
return journalObservable;
}

public TNodesInfo createNodesInfo(boolean isExternalCluster, long warehouseId, SystemInfoService systemInfoService) {
TNodesInfo nodesInfo = new TNodesInfo();
if (isExternalCluster) {
// for external olap table scenario, use backends by default (no need to consider cluster mode)
for (Long id : systemInfoService.getBackendIds(false)) {
Backend backend = systemInfoService.getBackend(id);
nodesInfo.addToNodes(new TNodeInfo(backend.getId(), 0, backend.getIP(), backend.getBrpcPort()));
}
} else {
nodesInfo = createNodesInfo(warehouseId, systemInfoService);
}
return nodesInfo;
}

public TNodesInfo createNodesInfo(long warehouseId, SystemInfoService systemInfoService) {
TNodesInfo nodesInfo = new TNodesInfo();
if (RunMode.isSharedDataMode()) {
List<Long> computeNodeIds = warehouseMgr.getAllComputeNodeIds(warehouseId);
for (Long cnId : computeNodeIds) {
ComputeNode cn = systemInfoService.getBackendOrComputeNode(cnId);
if (cn == null) {
continue;
}
nodesInfo.addToNodes(new TNodeInfo(cnId, 0, cn.getIP(), cn.getBrpcPort()));
}
} else {
for (Long id : systemInfoService.getBackendIds(false)) {
Backend backend = systemInfoService.getBackend(id);
if (backend == null) {
continue;
}
nodesInfo.addToNodes(new TNodeInfo(backend.getId(), 0, backend.getIP(), backend.getBrpcPort()));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@
import com.starrocks.persist.ImageWriter;
import com.starrocks.persist.OperationType;
import com.starrocks.sql.ast.ModifyFrontendAddressClause;
import com.starrocks.system.Backend;
import com.starrocks.system.Frontend;
import com.starrocks.system.SystemInfoService;
import com.starrocks.thrift.TNodesInfo;
import com.starrocks.utframe.UtFrameUtils;
import mockit.Expectations;
import mockit.Mock;
Expand Down Expand Up @@ -89,6 +92,9 @@ public void testSaveLoadHeader() throws Exception {
globalStateMgr.loadHeader(image2.getDataInputStream());
}

@Mocked
private WarehouseManager warehouseManager;

private GlobalStateMgr mockGlobalStateMgr() throws Exception {
GlobalStateMgr globalStateMgr = GlobalStateMgr.getCurrentState();

Expand All @@ -114,6 +120,10 @@ private GlobalStateMgr mockGlobalStateMgr() throws Exception {
field4.setAccessible(true);
field4.set(globalStateMgr, nodeMgr);

Field field5 = globalStateMgr.getClass().getDeclaredField("warehouseMgr");
field5.setAccessible(true);
field5.set(globalStateMgr, warehouseManager);

return globalStateMgr;
}

Expand Down Expand Up @@ -310,4 +320,64 @@ public void testErrorOccursWhileRemovingClusterIdAndRoleWhenStartAtFirstTime() {
Assert.assertEquals(removeFileErrorMessage, suppressedExceptions[0].getMessage());
}
}
}

@Test
public void testCreateNodesInfo() throws Exception {
SystemInfoService systemInfoService = new SystemInfoService();
Backend backend = new Backend();
backend.setId(1234);
backend.setHost("127.0.0.1");
backend.setBePort(10000);
backend.setHttpPort(10001);
backend.setBrpcPort(10002);
backend.setAlive(true);
backend.setBackendState(Backend.BackendState.values()[1]);
systemInfoService.addBackend(backend);

GlobalStateMgr globalStateMgr = mockGlobalStateMgr();

{
boolean isExternalTable = true;
TNodesInfo nodesInfo =
globalStateMgr.createNodesInfo(isExternalTable, WarehouseManager.DEFAULT_WAREHOUSE_ID,
systemInfoService);
Assert.assertEquals(1, nodesInfo.nodes.size());
}

{
new MockUp<RunMode>() {
@Mock
public RunMode getCurrentRunMode() {
return RunMode.SHARED_NOTHING;
}
};

boolean isExternalTable = false;
TNodesInfo nodesInfo = GlobalStateMgr.getCurrentState()
.createNodesInfo(isExternalTable, WarehouseManager.DEFAULT_WAREHOUSE_ID, systemInfoService);
Assert.assertEquals(1, nodesInfo.nodes.size());
}

{
new MockUp<RunMode>() {
@Mock
public RunMode getCurrentRunMode() {
return RunMode.SHARED_DATA;
}
};

long warehouseId = 1000L;
new Expectations() {
{
warehouseManager.getAllComputeNodeIds(warehouseId);
result = List.of(1234L);
}
};

boolean isExternalTable = false;
TNodesInfo nodesInfo = GlobalStateMgr.getCurrentState()
.createNodesInfo(isExternalTable, warehouseId, systemInfoService);
Assert.assertEquals(1, nodesInfo.nodes.size());
}
}
}

0 comments on commit 93ed55e

Please sign in to comment.