diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index d7281c188072..1772521a4982 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -1072,14 +1072,11 @@ private long getLastFlushTime(long timePartitionID, IDeviceID deviceID) { : Long.MAX_VALUE; } - private boolean splitAndInsert( + private void split( InsertTabletNode insertTabletNode, int loc, int endOffset, - TSStatus[] results, - long[] costsForMetrics) { - boolean noFailure = true; - + Map[]> splitInfo) { // before is first start point int before = loc; long beforeTime = insertTabletNode.getTimes()[before]; @@ -1088,7 +1085,6 @@ private boolean splitAndInsert( // init flush time map initFlushTimeMap(beforeTimePartition); - int insertCnt = 0; // if is sequence boolean isSequence = false; while (loc < endOffset) { @@ -1100,27 +1096,7 @@ private boolean splitAndInsert( if (timePartitionId != beforeTimePartition) { initFlushTimeMap(timePartitionId); lastFlushTime = getLastFlushTime(timePartitionId, insertTabletNode.getDeviceID(loc)); - // a new partition, insert the remaining of the previous partition - noFailure = - insertTabletToTsFileProcessor( - insertTabletNode, - before, - loc, - isSequence, - results, - beforeTimePartition, - noFailure, - costsForMetrics) - && noFailure; - if (before < loc) { - insertCnt += 1; - logger.debug( - "insertTabletToTsFileProcessor, insertCnt:{}, noFailure:{}, before:{}, loc:{}", - insertCnt, - noFailure, - before, - loc); - } + updateSplitInfo(splitInfo, beforeTimePartition, isSequence, new int[] {before, loc}); before = loc; beforeTimePartition = timePartitionId; isSequence = time > lastFlushTime; @@ -1129,27 +1105,8 @@ private boolean splitAndInsert( if (time > lastFlushTime) { // the same partition and switch to sequence data // insert previous range into unsequence - noFailure = - insertTabletToTsFileProcessor( - insertTabletNode, - before, - loc, - isSequence, - results, - beforeTimePartition, - noFailure, - costsForMetrics) - && noFailure; + updateSplitInfo(splitInfo, beforeTimePartition, isSequence, new int[] {before, loc}); before = loc; - if (before < loc) { - insertCnt += 1; - logger.debug( - "insertTabletToTsFileProcessor, insertCnt:{}, noFailure:{}, before:{}, loc:{}", - insertCnt, - noFailure, - before, - loc); - } isSequence = true; } } @@ -1159,26 +1116,68 @@ private boolean splitAndInsert( // do not forget last part if (before < loc) { - noFailure = - insertTabletToTsFileProcessor( - insertTabletNode, - before, - loc, - isSequence, - results, - beforeTimePartition, - noFailure, - costsForMetrics) - && noFailure; - insertCnt += 1; - logger.debug( - "insertTabletToTsFileProcessor, insertCnt:{}, noFailure:{}, before:{}, loc:{}", - insertCnt, - noFailure, - before, - loc); + updateSplitInfo(splitInfo, beforeTimePartition, isSequence, new int[] {before, loc}); + } + } + + private void updateSplitInfo( + Map[]> splitInfo, long partitionId, boolean isSequence, int[] newRange) { + if (newRange[0] >= newRange[1]) { + return; + } + + List[] rangeLists = splitInfo.computeIfAbsent(partitionId, k -> new List[2]); + List rangeList = rangeLists[isSequence ? 1 : 0]; + if (rangeList == null) { + rangeList = new ArrayList<>(); + rangeLists[isSequence ? 1 : 0] = rangeList; + } + if (!rangeList.isEmpty()) { + int[] lastRange = rangeList.get(rangeList.size() - 1); + if (lastRange[1] == newRange[0]) { + lastRange[1] = newRange[1]; + return; + } } + rangeList.add(newRange); + } + private boolean doInsert( + InsertTabletNode insertTabletNode, + Map[]> splitMap, + TSStatus[] results, + long[] costsForMetrics) { + boolean noFailure = true; + for (Entry[]> entry : splitMap.entrySet()) { + long timePartitionId = entry.getKey(); + List[] rangeLists = entry.getValue(); + List sequenceRangeList = rangeLists[1]; + if (sequenceRangeList != null) { + noFailure = + insertTabletToTsFileProcessor( + insertTabletNode, + sequenceRangeList, + true, + results, + timePartitionId, + noFailure, + costsForMetrics) + && noFailure; + } + List unSequenceRangeList = rangeLists[0]; + if (unSequenceRangeList != null) { + noFailure = + insertTabletToTsFileProcessor( + insertTabletNode, + unSequenceRangeList, + false, + results, + timePartitionId, + noFailure, + costsForMetrics) + && noFailure; + } + } return noFailure; } @@ -1227,12 +1226,14 @@ private boolean executeInsertTablet( List> deviceEndOffsetPairs = insertTabletNode.splitByDevice(loc, insertTabletNode.getRowCount()); int start = loc; + Map[]> splitInfo = new HashMap<>(); for (Pair deviceEndOffsetPair : deviceEndOffsetPairs) { int end = deviceEndOffsetPair.getRight(); - noFailure = - noFailure && splitAndInsert(insertTabletNode, start, end, results, costsForMetrics); + split(insertTabletNode, start, end, splitInfo); start = end; } + noFailure = noFailure && doInsert(insertTabletNode, splitInfo, results, costsForMetrics); + if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable() && !insertTabletNode.isGeneratedByRemoteConsensusLeader()) { // disable updating last cache on follower @@ -1309,39 +1310,40 @@ private int checkTTL( * * @param insertTabletNode insert a tablet of a device * @param sequence whether is sequence - * @param start start index of rows to be inserted in insertTabletPlan - * @param end end index of rows to be inserted in insertTabletPlan + * @param rangeList start and end index list of rows to be inserted in insertTabletPlan * @param results result array * @param timePartitionId time partition id * @return false if any failure occurs when inserting the tablet, true otherwise */ private boolean insertTabletToTsFileProcessor( InsertTabletNode insertTabletNode, - int start, - int end, + List rangeList, boolean sequence, TSStatus[] results, long timePartitionId, boolean noFailure, long[] costsForMetrics) { - // return when start >= end or all measurement failed - if (start >= end || insertTabletNode.allMeasurementFailed()) { + if (insertTabletNode.allMeasurementFailed()) { if (logger.isDebugEnabled()) { logger.debug( "Won't insert tablet {}, because {}", insertTabletNode.getSearchIndex(), - start >= end ? "start >= end" : "insertTabletNode allMeasurementFailed"); + "insertTabletNode allMeasurementFailed"); } return true; } TsFileProcessor tsFileProcessor = getOrCreateTsFileProcessor(timePartitionId, sequence); if (tsFileProcessor == null) { - for (int i = start; i < end; i++) { - results[i] = - RpcUtils.getStatus( - TSStatusCode.INTERNAL_SERVER_ERROR, - "can not create TsFileProcessor, timePartitionId: " + timePartitionId); + for (int[] rangePair : rangeList) { + int start = rangePair[0]; + int end = rangePair[1]; + for (int i = start; i < end; i++) { + results[i] = + RpcUtils.getStatus( + TSStatusCode.INTERNAL_SERVER_ERROR, + "can not create TsFileProcessor, timePartitionId: " + timePartitionId); + } } return false; } @@ -1351,7 +1353,7 @@ private boolean insertTabletToTsFileProcessor( try { tsFileProcessor.insertTablet( - insertTabletNode, start, end, results, noFailure, costsForMetrics); + insertTabletNode, rangeList, results, noFailure, costsForMetrics); } catch (WriteProcessRejectException e) { logger.warn("insert to TsFileProcessor rejected, {}", e.getMessage()); return false; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java index f165905484b0..c77adc632339 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java @@ -451,26 +451,32 @@ private void createNewWorkingMemTable() { private long[] scheduleMemoryBlock( InsertTabletNode insertTabletNode, - int start, - int end, + List rangeList, TSStatus[] results, boolean noFailure, long[] costsForMetrics) throws WriteProcessException { - long[] memIncrements; - try { - long memControlStartTime = System.nanoTime(); - memIncrements = checkMemCost(insertTabletNode, start, end, noFailure, results); - // recordScheduleMemoryBlockCost - costsForMetrics[1] += System.nanoTime() - memControlStartTime; - } catch (WriteProcessException e) { - for (int i = start; i < end; i++) { - results[i] = RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_REJECT, e.getMessage()); + long memControlStartTime = System.nanoTime(); + long[] totalMemIncrements = new long[NUM_MEM_TO_ESTIMATE]; + for (int[] range : rangeList) { + int start = range[0]; + int end = range[1]; + try { + long[] memIncrements = checkMemCost(insertTabletNode, start, end, noFailure, results); + for (int i = 0; i < memIncrements.length; i++) { + totalMemIncrements[i] += memIncrements[i]; + } + } catch (WriteProcessException e) { + for (int i = start; i < end; i++) { + results[i] = RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_REJECT, e.getMessage()); + } + throw new WriteProcessException(e); } - throw new WriteProcessException(e); } + // recordScheduleMemoryBlockCost + costsForMetrics[1] += System.nanoTime() - memControlStartTime; - return memIncrements; + return totalMemIncrements; } private long[] checkMemCost( @@ -526,14 +532,12 @@ private long[] checkAlignedMemCost( * non-null value, e.g., {1, null, 3, null, 5} will be {1, 3, 5, null, 5} * * @param insertTabletNode insert a tablet of a device - * @param start start index of rows to be inserted in insertTabletPlan - * @param end end index of rows to be inserted in insertTabletPlan + * @param rangeList start and end index list of rows to be inserted in insertTabletPlan * @param results result array */ public void insertTablet( InsertTabletNode insertTabletNode, - int start, - int end, + List rangeList, TSStatus[] results, boolean noFailure, long[] costsForMetrics) @@ -542,18 +546,22 @@ public void insertTablet( ensureMemTable(costsForMetrics); long[] memIncrements = - scheduleMemoryBlock(insertTabletNode, start, end, results, noFailure, costsForMetrics); + scheduleMemoryBlock(insertTabletNode, rangeList, results, noFailure, costsForMetrics); long startTime = System.nanoTime(); WALFlushListener walFlushListener; try { - walFlushListener = walNode.log(workMemTable.getMemTableId(), insertTabletNode, start, end); + walFlushListener = walNode.log(workMemTable.getMemTableId(), insertTabletNode, rangeList); if (walFlushListener.waitForResult() == WALFlushListener.Status.FAILURE) { throw walFlushListener.getCause(); } } catch (Exception e) { - for (int i = start; i < end; i++) { - results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage()); + for (int[] rangePair : rangeList) { + int start = rangePair[0]; + int end = rangePair[1]; + for (int i = start; i < end; i++) { + results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage()); + } } rollbackMemoryInfo(memIncrements); throw new WriteProcessException(e); @@ -576,42 +584,47 @@ public void insertTablet( insertTabletNode, tsFileResource); - try { - if (insertTabletNode.isAligned()) { - workMemTable.insertAlignedTablet(insertTabletNode, start, end, noFailure ? null : results); - } else { - workMemTable.insertTablet(insertTabletNode, start, end); + for (int[] rangePair : rangeList) { + int start = rangePair[0]; + int end = rangePair[1]; + try { + if (insertTabletNode.isAligned()) { + workMemTable.insertAlignedTablet( + insertTabletNode, start, end, noFailure ? null : results); + } else { + workMemTable.insertTablet(insertTabletNode, start, end); + } + } catch (WriteProcessException e) { + for (int i = start; i < end; i++) { + results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage()); + } + throw new WriteProcessException(e); } - } catch (WriteProcessException e) { for (int i = start; i < end; i++) { - results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage()); + results[i] = RpcUtils.SUCCESS_STATUS; } - throw new WriteProcessException(e); - } - for (int i = start; i < end; i++) { - results[i] = RpcUtils.SUCCESS_STATUS; - } - final List> deviceEndOffsetPairs = - insertTabletNode.splitByDevice(start, end); - tsFileResource.updateStartTime( - deviceEndOffsetPairs.get(0).left, insertTabletNode.getTimes()[start]); - if (!sequence) { - // For sequence tsfile, we update the endTime only when the file is prepared to be closed. - // For unsequence tsfile, we have to update the endTime for each insertion. - tsFileResource.updateEndTime( - deviceEndOffsetPairs.get(0).left, - insertTabletNode.getTimes()[deviceEndOffsetPairs.get(0).right - 1]); - } - for (int i = 1; i < deviceEndOffsetPairs.size(); i++) { - // the end offset of i - 1 is the start offset of i + final List> deviceEndOffsetPairs = + insertTabletNode.splitByDevice(start, end); tsFileResource.updateStartTime( - deviceEndOffsetPairs.get(i).left, - insertTabletNode.getTimes()[deviceEndOffsetPairs.get(i - 1).right]); + deviceEndOffsetPairs.get(0).left, insertTabletNode.getTimes()[start]); if (!sequence) { + // For sequence tsfile, we update the endTime only when the file is prepared to be closed. + // For unsequence tsfile, we have to update the endTime for each insertion. tsFileResource.updateEndTime( + deviceEndOffsetPairs.get(0).left, + insertTabletNode.getTimes()[deviceEndOffsetPairs.get(0).right - 1]); + } + for (int i = 1; i < deviceEndOffsetPairs.size(); i++) { + // the end offset of i - 1 is the start offset of i + tsFileResource.updateStartTime( deviceEndOffsetPairs.get(i).left, - insertTabletNode.getTimes()[deviceEndOffsetPairs.get(i).right - 1]); + insertTabletNode.getTimes()[deviceEndOffsetPairs.get(i - 1).right]); + if (!sequence) { + tsFileResource.updateEndTime( + deviceEndOffsetPairs.get(i).left, + insertTabletNode.getTimes()[deviceEndOffsetPairs.get(i).right - 1]); + } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALInfoEntry.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALInfoEntry.java index 8b327b4880be..994c0d61279f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALInfoEntry.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALInfoEntry.java @@ -24,6 +24,10 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; import java.util.Objects; /** This entry class stores info for persistence. */ @@ -42,19 +46,23 @@ public WALInfoEntry(long memTableId, WALEntryValue value, boolean wait) { public WALInfoEntry(long memTableId, WALEntryValue value) { this(memTableId, value, config.getWalMode() == WALMode.SYNC); if (value instanceof InsertTabletNode) { - tabletInfo = new TabletInfo(0, ((InsertTabletNode) value).getRowCount()); + tabletInfo = + new TabletInfo( + Collections.singletonList(new int[] {0, ((InsertTabletNode) value).getRowCount()})); } } - public WALInfoEntry(long memTableId, InsertTabletNode value, int tabletStart, int tabletEnd) { + public WALInfoEntry(long memTableId, InsertTabletNode value, List tabletRangeList) { this(memTableId, value, config.getWalMode() == WALMode.SYNC); - tabletInfo = new TabletInfo(tabletStart, tabletEnd); + tabletInfo = new TabletInfo(tabletRangeList); } WALInfoEntry(WALEntryType type, long memTableId, WALEntryValue value) { super(type, memTableId, value, false); if (value instanceof InsertTabletNode) { - tabletInfo = new TabletInfo(0, ((InsertTabletNode) value).getRowCount()); + tabletInfo = + new TabletInfo( + Collections.singletonList(new int[] {0, ((InsertTabletNode) value).getRowCount()})); } } @@ -65,18 +73,21 @@ public int serializedSize() { @Override public void serialize(IWALByteBufferView buffer) { - buffer.put(type.getCode()); - buffer.putLong(memTableId); switch (type) { case INSERT_TABLET_NODE: - ((InsertTabletNode) value) - .serializeToWAL(buffer, tabletInfo.tabletStart, tabletInfo.tabletEnd); + for (int[] range : tabletInfo.tabletRangeList) { + buffer.put(type.getCode()); + buffer.putLong(memTableId); + ((InsertTabletNode) value).serializeToWAL(buffer, range[0], range[1]); + } break; case INSERT_ROW_NODE: case INSERT_ROWS_NODE: case DELETE_DATA_NODE: case MEMORY_TABLE_SNAPSHOT: case CONTINUOUS_SAME_SEARCH_INDEX_SEPARATOR_NODE: + buffer.put(type.getCode()); + buffer.putLong(memTableId); value.serializeToWAL(buffer); break; case MEMORY_TABLE_CHECKPOINT: @@ -87,19 +98,16 @@ public void serialize(IWALByteBufferView buffer) { } private static class TabletInfo { - // start row of insert tablet - private final int tabletStart; - // end row of insert tablet - private final int tabletEnd; - - public TabletInfo(int tabletStart, int tabletEnd) { - this.tabletStart = tabletStart; - this.tabletEnd = tabletEnd; + // ranges of insert tablet + private final List tabletRangeList; + + public TabletInfo(List tabletRangeList) { + this.tabletRangeList = new ArrayList<>(tabletRangeList); } @Override public int hashCode() { - return Objects.hash(tabletStart, tabletEnd); + return Objects.hash(tabletRangeList); } @Override @@ -113,8 +121,17 @@ public boolean equals(Object obj) { if (!(obj instanceof TabletInfo)) { return false; } - TabletInfo other = (TabletInfo) obj; - return this.tabletStart == other.tabletStart && this.tabletEnd == other.tabletEnd; + TabletInfo that = (TabletInfo) obj; + if (this.tabletRangeList.size() != that.tabletRangeList.size()) { + return false; + } + + for (int i = 0; i < tabletRangeList.size(); i++) { + if (!Arrays.equals(this.tabletRangeList.get(i), that.tabletRangeList.get(i))) { + return false; + } + } + return true; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/IWALNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/IWALNode.java index 2baf45f3ae50..2c8729ee7c97 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/IWALNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/IWALNode.java @@ -30,6 +30,8 @@ import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable; import org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.WALFlushListener; +import java.util.List; + /** This interface provides uniform interface for writing wal and making checkpoints. */ public interface IWALNode extends FlushListener, AutoCloseable, ConsensusReqReader, DataSet { @@ -40,7 +42,7 @@ public interface IWALNode extends FlushListener, AutoCloseable, ConsensusReqRead WALFlushListener log(long memTableId, InsertRowsNode insertRowsNode); /** Log InsertTabletNode. */ - WALFlushListener log(long memTableId, InsertTabletNode insertTabletNode, int start, int end); + WALFlushListener log(long memTableId, InsertTabletNode insertTabletNode, List rangeList); /** Log DeleteDataNode. */ WALFlushListener log(long memTableId, DeleteDataNode deleteDataNode); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALFakeNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALFakeNode.java index 38b69f1162b4..8d288a605f9f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALFakeNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALFakeNode.java @@ -29,6 +29,8 @@ import org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.AbstractResultListener.Status; import org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.WALFlushListener; +import java.util.List; + /** This class provides fake wal node when wal is disabled or exception happens. */ public class WALFakeNode implements IWALNode { private final Status status; @@ -59,7 +61,7 @@ public WALFlushListener log(long memTableId, InsertRowsNode insertRowsNode) { @Override public WALFlushListener log( - long memTableId, InsertTabletNode insertTabletNode, int start, int end) { + long memTableId, InsertTabletNode insertTabletNode, List rangeList) { return getResult(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java index 607a99a5673e..befc7cd59659 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java @@ -151,12 +151,12 @@ public WALFlushListener log(long memTableId, InsertRowsNode insertRowsNode) { @Override public WALFlushListener log( - long memTableId, InsertTabletNode insertTabletNode, int start, int end) { + long memTableId, InsertTabletNode insertTabletNode, List rangeList) { logger.debug( "WAL node-{} logs insertTabletNode, the search index is {}.", identifier, insertTabletNode.getSearchIndex()); - WALEntry walEntry = new WALInfoEntry(memTableId, insertTabletNode, start, end); + WALEntry walEntry = new WALInfoEntry(memTableId, insertTabletNode, rangeList); return log(walEntry); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java index 5a7e260115ba..7daa2ef837f3 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java @@ -303,14 +303,27 @@ public void alignedTvListRamCostTest() this.sgInfo.initTsFileProcessorInfo(processor); SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor); // Test Tablet - processor.insertTablet(genInsertTableNode(0, true), 0, 10, new TSStatus[10], true, new long[4]); + processor.insertTablet( + genInsertTableNode(0, true), + Collections.singletonList(new int[] {0, 10}), + new TSStatus[10], + true, + new long[4]); IMemTable memTable = processor.getWorkMemTable(); Assert.assertEquals(1596808, memTable.getTVListsRamCost()); processor.insertTablet( - genInsertTableNode(100, true), 0, 10, new TSStatus[10], true, new long[4]); + genInsertTableNode(100, true), + Collections.singletonList(new int[] {0, 10}), + new TSStatus[10], + true, + new long[4]); Assert.assertEquals(1596808, memTable.getTVListsRamCost()); processor.insertTablet( - genInsertTableNode(200, true), 0, 10, new TSStatus[10], true, new long[4]); + genInsertTableNode(200, true), + Collections.singletonList(new int[] {0, 10}), + new TSStatus[10], + true, + new long[4]); Assert.assertEquals(1596808, memTable.getTVListsRamCost()); Assert.assertEquals(90000, memTable.getTotalPointsNum()); Assert.assertEquals(720360, memTable.memSize()); @@ -341,29 +354,62 @@ public void alignedTvListRamCostTest2() this.sgInfo.initTsFileProcessorInfo(processor); SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor); // Test Tablet - processor.insertTablet(genInsertTableNode(0, true), 0, 10, new TSStatus[10], true, new long[4]); + processor.insertTablet( + genInsertTableNode(0, true), + Collections.singletonList(new int[] {0, 10}), + new TSStatus[10], + true, + new long[4]); IMemTable memTable = processor.getWorkMemTable(); Assert.assertEquals(1596808, memTable.getTVListsRamCost()); processor.insertTablet( - genInsertTableNodeFors3000ToS6000(0, true), 0, 10, new TSStatus[10], true, new long[4]); + genInsertTableNodeFors3000ToS6000(0, true), + Collections.singletonList(new int[] {0, 10}), + new TSStatus[10], + true, + new long[4]); Assert.assertEquals(3192808, memTable.getTVListsRamCost()); processor.insertTablet( - genInsertTableNode(100, true), 0, 10, new TSStatus[10], true, new long[4]); + genInsertTableNode(100, true), + Collections.singletonList(new int[] {0, 10}), + new TSStatus[10], + true, + new long[4]); Assert.assertEquals(3192808, memTable.getTVListsRamCost()); processor.insertTablet( - genInsertTableNodeFors3000ToS6000(100, true), 0, 10, new TSStatus[10], true, new long[4]); + genInsertTableNodeFors3000ToS6000(100, true), + Collections.singletonList(new int[] {0, 10}), + new TSStatus[10], + true, + new long[4]); Assert.assertEquals(3192808, memTable.getTVListsRamCost()); processor.insertTablet( - genInsertTableNode(200, true), 0, 10, new TSStatus[10], true, new long[4]); + genInsertTableNode(200, true), + Collections.singletonList(new int[] {0, 10}), + new TSStatus[10], + true, + new long[4]); Assert.assertEquals(3192808, memTable.getTVListsRamCost()); processor.insertTablet( - genInsertTableNodeFors3000ToS6000(200, true), 0, 10, new TSStatus[10], true, new long[4]); + genInsertTableNodeFors3000ToS6000(200, true), + Collections.singletonList(new int[] {0, 10}), + new TSStatus[10], + true, + new long[4]); Assert.assertEquals(3192808, memTable.getTVListsRamCost()); processor.insertTablet( - genInsertTableNode(300, true), 0, 10, new TSStatus[10], true, new long[4]); + genInsertTableNode(300, true), + Collections.singletonList(new int[] {0, 10}), + new TSStatus[10], + true, + new long[4]); Assert.assertEquals(6385616, memTable.getTVListsRamCost()); processor.insertTablet( - genInsertTableNodeFors3000ToS6000(300, true), 0, 10, new TSStatus[10], true, new long[4]); + genInsertTableNodeFors3000ToS6000(300, true), + Collections.singletonList(new int[] {0, 10}), + new TSStatus[10], + true, + new long[4]); Assert.assertEquals(6385616, memTable.getTVListsRamCost()); Assert.assertEquals(240000, memTable.getTotalPointsNum()); @@ -403,14 +449,26 @@ public void nonAlignedTvListRamCostTest() SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor); // Test tablet processor.insertTablet( - genInsertTableNode(0, false), 0, 10, new TSStatus[10], true, new long[4]); + genInsertTableNode(0, false), + Collections.singletonList(new int[] {0, 10}), + new TSStatus[10], + true, + new long[4]); IMemTable memTable = processor.getWorkMemTable(); Assert.assertEquals(3192000, memTable.getTVListsRamCost()); processor.insertTablet( - genInsertTableNode(100, false), 0, 10, new TSStatus[10], true, new long[4]); + genInsertTableNode(100, false), + Collections.singletonList(new int[] {0, 10}), + new TSStatus[10], + true, + new long[4]); Assert.assertEquals(3192000, memTable.getTVListsRamCost()); processor.insertTablet( - genInsertTableNode(200, false), 0, 10, new TSStatus[10], true, new long[4]); + genInsertTableNode(200, false), + Collections.singletonList(new int[] {0, 10}), + new TSStatus[10], + true, + new long[4]); Assert.assertEquals(3192000, memTable.getTVListsRamCost()); Assert.assertEquals(90000, memTable.getTotalPointsNum()); Assert.assertEquals(1440000, memTable.memSize()); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/ConsensusReqReaderTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/ConsensusReqReaderTest.java index ca102ef8cf52..688e5df205c4 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/ConsensusReqReaderTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/ConsensusReqReaderTest.java @@ -98,7 +98,10 @@ private void simulateFileScenario01() throws IllegalPathException { insertRowNode.setSearchIndex(1); walNode.log(0, insertRowNode); // 1 insertTabletNode = getInsertTabletNode(devicePath, new long[] {2}); - walNode.log(0, insertTabletNode, 0, insertTabletNode.getRowCount()); // -1 + walNode.log( + 0, + insertTabletNode, + Collections.singletonList(new int[] {0, insertTabletNode.getRowCount()})); // -1 walNode.rollWALFile(); // _1-1-1.wal insertRowsNode = getInsertRowsNode(devicePath); @@ -118,17 +121,32 @@ private void simulateFileScenario01() throws IllegalPathException { walNode.log(0, insertRowNode); // 3 insertTabletNode = getInsertTabletNode(devicePath, new long[] {4}); insertTabletNode.setSearchIndex(4); - walNode.log(0, insertTabletNode, 0, insertTabletNode.getRowCount()); // 4 + walNode.log( + 0, + insertTabletNode, + Collections.singletonList(new int[] {0, insertTabletNode.getRowCount()})); // 4 walNode.rollWALFile(); // _4-4-1.wal - walNode.log(0, insertTabletNode, 0, insertTabletNode.getRowCount()); // 4 + walNode.log( + 0, + insertTabletNode, + Collections.singletonList(new int[] {0, insertTabletNode.getRowCount()})); // 4 walNode.rollWALFile(); // _5-4-1.wal - walNode.log(0, insertTabletNode, 0, insertTabletNode.getRowCount()); // 4 - walNode.log(0, insertTabletNode, 0, insertTabletNode.getRowCount()); // 4 + walNode.log( + 0, + insertTabletNode, + Collections.singletonList(new int[] {0, insertTabletNode.getRowCount()})); // 4 + walNode.log( + 0, + insertTabletNode, + Collections.singletonList(new int[] {0, insertTabletNode.getRowCount()})); // 4 insertTabletNode = getInsertTabletNode(devicePath, new long[] {5}); insertTabletNode.setSearchIndex(5); - walNode.log(0, insertTabletNode, 0, insertTabletNode.getRowCount()); // 5 + walNode.log( + 0, + insertTabletNode, + Collections.singletonList(new int[] {0, insertTabletNode.getRowCount()})); // 5 walNode.rollWALFile(); // _6-5-1.wal insertRowNode = getInsertRowNode(devicePath); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALFakeNodeTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALFakeNodeTest.java index a1f1df61f42e..ca448f2386b7 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALFakeNodeTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALFakeNodeTest.java @@ -42,7 +42,11 @@ public void testSuccessFakeNode() { // log something List walFlushListeners = new ArrayList<>(); walFlushListeners.add(walNode.log(1, new InsertRowNode(new PlanNodeId("0")))); - walFlushListeners.add(walNode.log(1, new InsertTabletNode(new PlanNodeId("1")), 0, 0)); + walFlushListeners.add( + walNode.log( + 1, + new InsertTabletNode(new PlanNodeId("1")), + Collections.singletonList(new int[] {0, 0}))); walFlushListeners.add( walNode.log(1, new DeleteDataNode(new PlanNodeId("2"), Collections.emptyList(), 0, 0))); // check flush listeners @@ -62,7 +66,11 @@ public void testFailureFakeNode() { // log something List walFlushListeners = new ArrayList<>(); walFlushListeners.add(walNode.log(1, new InsertRowNode(new PlanNodeId("0")))); - walFlushListeners.add(walNode.log(1, new InsertTabletNode(new PlanNodeId("1")), 0, 0)); + walFlushListeners.add( + walNode.log( + 1, + new InsertTabletNode(new PlanNodeId("1")), + Collections.singletonList(new int[] {0, 0}))); walFlushListeners.add( walNode.log(1, new DeleteDataNode(new PlanNodeId("2"), Collections.emptyList(), 0, 0))); // check flush listeners diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeTest.java index f72f55a3cb8a..2db5749fe7d9 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeTest.java @@ -169,7 +169,10 @@ private void writeInsertTabletNode( getInsertTabletNode(devicePath + memTableId, new long[] {i}); expectedInsertTabletNodes.add(insertTabletNode); WALFlushListener walFlushListener = - walNode.log(memTableId, insertTabletNode, 0, insertTabletNode.getRowCount()); + walNode.log( + memTableId, + insertTabletNode, + Collections.singletonList(new int[] {0, insertTabletNode.getRowCount()})); walFlushListeners.add(walFlushListener); } } @@ -297,7 +300,10 @@ public void testDeleteOutdatedFiles() throws Exception { InsertTabletNode insertTabletNode = getInsertTabletNode(devicePath + memTableId, new long[] {time}); WALFlushListener walFlushListener = - walNode.log(memTableId, insertTabletNode, 0, insertTabletNode.getRowCount()); + walNode.log( + memTableId, + insertTabletNode, + Collections.singletonList(new int[] {0, insertTabletNode.getRowCount()})); walFlushListeners.add(walFlushListener); } walNode.onMemTableFlushed(memTable);