Skip to content

Commit

Permalink
Batch execute insertRelationalTablet (#13837)
Browse files Browse the repository at this point in the history
  • Loading branch information
HTHou authored Oct 21, 2024
1 parent 2340593 commit 931b5bd
Show file tree
Hide file tree
Showing 10 changed files with 302 additions and 176 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1072,14 +1072,11 @@ private long getLastFlushTime(long timePartitionID, IDeviceID deviceID) {
: Long.MAX_VALUE;
}

private boolean splitAndInsert(
private void split(
InsertTabletNode insertTabletNode,
int loc,
int endOffset,
TSStatus[] results,
long[] costsForMetrics) {
boolean noFailure = true;

Map<Long, List<int[]>[]> splitInfo) {
// before is first start point
int before = loc;
long beforeTime = insertTabletNode.getTimes()[before];
Expand All @@ -1088,7 +1085,6 @@ private boolean splitAndInsert(
// init flush time map
initFlushTimeMap(beforeTimePartition);

int insertCnt = 0;
// if is sequence
boolean isSequence = false;
while (loc < endOffset) {
Expand All @@ -1100,27 +1096,7 @@ private boolean splitAndInsert(
if (timePartitionId != beforeTimePartition) {
initFlushTimeMap(timePartitionId);
lastFlushTime = getLastFlushTime(timePartitionId, insertTabletNode.getDeviceID(loc));
// a new partition, insert the remaining of the previous partition
noFailure =
insertTabletToTsFileProcessor(
insertTabletNode,
before,
loc,
isSequence,
results,
beforeTimePartition,
noFailure,
costsForMetrics)
&& noFailure;
if (before < loc) {
insertCnt += 1;
logger.debug(
"insertTabletToTsFileProcessor, insertCnt:{}, noFailure:{}, before:{}, loc:{}",
insertCnt,
noFailure,
before,
loc);
}
updateSplitInfo(splitInfo, beforeTimePartition, isSequence, new int[] {before, loc});
before = loc;
beforeTimePartition = timePartitionId;
isSequence = time > lastFlushTime;
Expand All @@ -1129,27 +1105,8 @@ private boolean splitAndInsert(
if (time > lastFlushTime) {
// the same partition and switch to sequence data
// insert previous range into unsequence
noFailure =
insertTabletToTsFileProcessor(
insertTabletNode,
before,
loc,
isSequence,
results,
beforeTimePartition,
noFailure,
costsForMetrics)
&& noFailure;
updateSplitInfo(splitInfo, beforeTimePartition, isSequence, new int[] {before, loc});
before = loc;
if (before < loc) {
insertCnt += 1;
logger.debug(
"insertTabletToTsFileProcessor, insertCnt:{}, noFailure:{}, before:{}, loc:{}",
insertCnt,
noFailure,
before,
loc);
}
isSequence = true;
}
}
Expand All @@ -1159,26 +1116,68 @@ private boolean splitAndInsert(

// do not forget last part
if (before < loc) {
noFailure =
insertTabletToTsFileProcessor(
insertTabletNode,
before,
loc,
isSequence,
results,
beforeTimePartition,
noFailure,
costsForMetrics)
&& noFailure;
insertCnt += 1;
logger.debug(
"insertTabletToTsFileProcessor, insertCnt:{}, noFailure:{}, before:{}, loc:{}",
insertCnt,
noFailure,
before,
loc);
updateSplitInfo(splitInfo, beforeTimePartition, isSequence, new int[] {before, loc});
}
}

private void updateSplitInfo(
Map<Long, List<int[]>[]> splitInfo, long partitionId, boolean isSequence, int[] newRange) {
if (newRange[0] >= newRange[1]) {
return;
}

List<int[]>[] rangeLists = splitInfo.computeIfAbsent(partitionId, k -> new List[2]);
List<int[]> rangeList = rangeLists[isSequence ? 1 : 0];
if (rangeList == null) {
rangeList = new ArrayList<>();
rangeLists[isSequence ? 1 : 0] = rangeList;
}
if (!rangeList.isEmpty()) {
int[] lastRange = rangeList.get(rangeList.size() - 1);
if (lastRange[1] == newRange[0]) {
lastRange[1] = newRange[1];
return;
}
}
rangeList.add(newRange);
}

private boolean doInsert(
InsertTabletNode insertTabletNode,
Map<Long, List<int[]>[]> splitMap,
TSStatus[] results,
long[] costsForMetrics) {
boolean noFailure = true;
for (Entry<Long, List<int[]>[]> entry : splitMap.entrySet()) {
long timePartitionId = entry.getKey();
List<int[]>[] rangeLists = entry.getValue();
List<int[]> sequenceRangeList = rangeLists[1];
if (sequenceRangeList != null) {
noFailure =
insertTabletToTsFileProcessor(
insertTabletNode,
sequenceRangeList,
true,
results,
timePartitionId,
noFailure,
costsForMetrics)
&& noFailure;
}
List<int[]> unSequenceRangeList = rangeLists[0];
if (unSequenceRangeList != null) {
noFailure =
insertTabletToTsFileProcessor(
insertTabletNode,
unSequenceRangeList,
false,
results,
timePartitionId,
noFailure,
costsForMetrics)
&& noFailure;
}
}
return noFailure;
}

Expand Down Expand Up @@ -1227,12 +1226,14 @@ private boolean executeInsertTablet(
List<Pair<IDeviceID, Integer>> deviceEndOffsetPairs =
insertTabletNode.splitByDevice(loc, insertTabletNode.getRowCount());
int start = loc;
Map<Long, List<int[]>[]> splitInfo = new HashMap<>();
for (Pair<IDeviceID, Integer> deviceEndOffsetPair : deviceEndOffsetPairs) {
int end = deviceEndOffsetPair.getRight();
noFailure =
noFailure && splitAndInsert(insertTabletNode, start, end, results, costsForMetrics);
split(insertTabletNode, start, end, splitInfo);
start = end;
}
noFailure = noFailure && doInsert(insertTabletNode, splitInfo, results, costsForMetrics);

if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()
&& !insertTabletNode.isGeneratedByRemoteConsensusLeader()) {
// disable updating last cache on follower
Expand Down Expand Up @@ -1309,39 +1310,40 @@ private int checkTTL(
*
* @param insertTabletNode insert a tablet of a device
* @param sequence whether is sequence
* @param start start index of rows to be inserted in insertTabletPlan
* @param end end index of rows to be inserted in insertTabletPlan
* @param rangeList start and end index list of rows to be inserted in insertTabletPlan
* @param results result array
* @param timePartitionId time partition id
* @return false if any failure occurs when inserting the tablet, true otherwise
*/
private boolean insertTabletToTsFileProcessor(
InsertTabletNode insertTabletNode,
int start,
int end,
List<int[]> rangeList,
boolean sequence,
TSStatus[] results,
long timePartitionId,
boolean noFailure,
long[] costsForMetrics) {
// return when start >= end or all measurement failed
if (start >= end || insertTabletNode.allMeasurementFailed()) {
if (insertTabletNode.allMeasurementFailed()) {
if (logger.isDebugEnabled()) {
logger.debug(
"Won't insert tablet {}, because {}",
insertTabletNode.getSearchIndex(),
start >= end ? "start >= end" : "insertTabletNode allMeasurementFailed");
"insertTabletNode allMeasurementFailed");
}
return true;
}

TsFileProcessor tsFileProcessor = getOrCreateTsFileProcessor(timePartitionId, sequence);
if (tsFileProcessor == null) {
for (int i = start; i < end; i++) {
results[i] =
RpcUtils.getStatus(
TSStatusCode.INTERNAL_SERVER_ERROR,
"can not create TsFileProcessor, timePartitionId: " + timePartitionId);
for (int[] rangePair : rangeList) {
int start = rangePair[0];
int end = rangePair[1];
for (int i = start; i < end; i++) {
results[i] =
RpcUtils.getStatus(
TSStatusCode.INTERNAL_SERVER_ERROR,
"can not create TsFileProcessor, timePartitionId: " + timePartitionId);
}
}
return false;
}
Expand All @@ -1351,7 +1353,7 @@ private boolean insertTabletToTsFileProcessor(

try {
tsFileProcessor.insertTablet(
insertTabletNode, start, end, results, noFailure, costsForMetrics);
insertTabletNode, rangeList, results, noFailure, costsForMetrics);
} catch (WriteProcessRejectException e) {
logger.warn("insert to TsFileProcessor rejected, {}", e.getMessage());
return false;
Expand Down
Loading

0 comments on commit 931b5bd

Please sign in to comment.