Skip to content

Commit

Permalink
[Enhancement](lock) Optimize CatalogRecycleBin lock granularity to im…
Browse files Browse the repository at this point in the history
…prove concurrency
  • Loading branch information
felixwluo committed Feb 18, 2025
1 parent 69ff4f3 commit 9310805
Showing 1 changed file with 120 additions and 73 deletions.
193 changes: 120 additions & 73 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,15 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand All @@ -68,6 +71,7 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable, GsonPos
// to avoid erase log ahead of drop log
private static final long minEraseLatency = 10 * 60 * 1000; // 10 min

private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
private Map<Long, RecycleDatabaseInfo> idToDatabase;
private Map<Long, RecycleTableInfo> idToTable;
private Map<Long, RecyclePartitionInfo> idToPartition;
Expand Down Expand Up @@ -186,23 +190,28 @@ public synchronized boolean recycleTable(long dbId, Table table, boolean isRepla
return true;
}

public synchronized boolean recyclePartition(long dbId, long tableId, String tableName, Partition partition,
public boolean recyclePartition(long dbId, long tableId, String tableName, Partition partition,
Range<PartitionKey> range, PartitionItem listPartitionItem,
DataProperty dataProperty, ReplicaAllocation replicaAlloc,
boolean isInMemory, boolean isMutable) {
if (idToPartition.containsKey(partition.getId())) {
LOG.error("partition[{}] already in recycle bin.", partition.getId());
return false;
}
lock.writeLock().lock();
try {
if (idToPartition.containsKey(partition.getId())) {
LOG.error("partition[{}] already in recycle bin.", partition.getId());
return false;
}

// recycle partition
RecyclePartitionInfo partitionInfo = new RecyclePartitionInfo(dbId, tableId, partition,
range, listPartitionItem, dataProperty, replicaAlloc, isInMemory, isMutable);
idToRecycleTime.put(partition.getId(), System.currentTimeMillis());
idToPartition.put(partition.getId(), partitionInfo);
LOG.info("recycle partition[{}-{}] of table [{}-{}]", partition.getId(), partition.getName(),
tableId, tableName);
return true;
// recycle partition
RecyclePartitionInfo partitionInfo = new RecyclePartitionInfo(dbId, tableId, partition,
range, listPartitionItem, dataProperty, replicaAlloc, isInMemory, isMutable);
idToRecycleTime.put(partition.getId(), System.currentTimeMillis());
idToPartition.put(partition.getId(), partitionInfo);
LOG.info("recycle partition[{}-{}] of table [{}-{}]", partition.getId(), partition.getName(),
tableId, tableName);
return true;
} finally {
lock.writeLock().unlock();
}
}

public synchronized Long getRecycleTimeById(long id) {
Expand Down Expand Up @@ -474,47 +483,66 @@ public synchronized void replayEraseTable(long tableId) {
LOG.info("replay erase table[{}]", tableId);
}

private synchronized void erasePartition(long currentTimeMs, int keepNum) {
private void erasePartition(long currentTimeMs, int keepNum) {
int eraseNum = 0;
StopWatch watch = StopWatch.createStarted();
try {
// 1. erase expired partitions
Iterator<Map.Entry<Long, RecyclePartitionInfo>> iterator = idToPartition.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<Long, RecyclePartitionInfo> entry = iterator.next();
RecyclePartitionInfo partitionInfo = entry.getValue();
Partition partition = partitionInfo.getPartition();
List<Partition> partitionsToErase = new ArrayList<>();

long partitionId = entry.getKey();
if (isExpire(partitionId, currentTimeMs)) {
Env.getCurrentEnv().onErasePartition(partition);
// erase partition
iterator.remove();
idToRecycleTime.remove(partitionId);
// log
Env.getCurrentEnv().getEditLog().logErasePartition(partitionId);
LOG.info("erase partition[{}]. reason: expired", partitionId);
eraseNum++;
try {
// 1. First collect expired partitions under write lock
lock.writeLock().lock();
try {
Iterator<Map.Entry<Long, RecyclePartitionInfo>> iterator = idToPartition.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<Long, RecyclePartitionInfo> entry = iterator.next();
RecyclePartitionInfo partitionInfo = entry.getValue();
Partition partition = partitionInfo.getPartition();
long partitionId = entry.getKey();

if (isExpire(partitionId, currentTimeMs)) {
partitionsToErase.add(partition);
iterator.remove();
idToRecycleTime.remove(partitionId);
eraseNum++;
}
}
} // end for partitions

// 2. erase exceed number
if (keepNum < 0) {
return;
} finally {
lock.writeLock().unlock();
}
com.google.common.collect.Table<Long, Long, Set<String>> dbTblId2PartitionNames = HashBasedTable.create();
for (RecyclePartitionInfo partitionInfo : idToPartition.values()) {
Set<String> partitionNames = dbTblId2PartitionNames.get(partitionInfo.dbId, partitionInfo.tableId);
if (partitionNames == null) {
partitionNames = Sets.newHashSet();
dbTblId2PartitionNames.put(partitionInfo.dbId, partitionInfo.tableId, partitionNames);
}
partitionNames.add(partitionInfo.getPartition().getName());

// 2. Then erase partitions outside of lock
for (Partition partition : partitionsToErase) {
Env.getCurrentEnv().onErasePartition(partition);
// log
Env.getCurrentEnv().getEditLog().logErasePartition(partition.getId());
LOG.info("erase partition[{}]. reason: expired", partition.getId());
}
for (Cell<Long, Long, Set<String>> cell : dbTblId2PartitionNames.cellSet()) {
for (String partitionName : cell.getValue()) {
erasePartitionWithSameName(cell.getRowKey(), cell.getColumnKey(), partitionName, currentTimeMs,
keepNum);

// 3. Handle exceed number case
if (keepNum >= 0) {
lock.readLock().lock();
try {
com.google.common.collect.Table<Long, Long, Set<String>> dbTblId2PartitionNames =
HashBasedTable.create();
for (RecyclePartitionInfo partitionInfo : idToPartition.values()) {
Set<String> partitionNames =
dbTblId2PartitionNames.get(partitionInfo.dbId, partitionInfo.tableId);
if (partitionNames == null) {
partitionNames = Sets.newHashSet();
dbTblId2PartitionNames.put(partitionInfo.dbId, partitionInfo.tableId, partitionNames);
}
partitionNames.add(partitionInfo.getPartition().getName());
}

for (Cell<Long, Long, Set<String>> cell : dbTblId2PartitionNames.cellSet()) {
for (String partitionName : cell.getValue()) {
erasePartitionWithSameName(cell.getRowKey(), cell.getColumnKey(),
partitionName, currentTimeMs,
keepNum);
}
}
} finally {
lock.readLock().unlock();
}
}
} finally {
Expand All @@ -523,26 +551,31 @@ private synchronized void erasePartition(long currentTimeMs, int keepNum) {
}
}

private synchronized List<Long> getSameNamePartitionIdListToErase(long dbId, long tableId, String partitionName,
private List<Long> getSameNamePartitionIdListToErase(long dbId, long tableId, String partitionName,
int maxSameNameTrashNum) {
Iterator<Map.Entry<Long, RecyclePartitionInfo>> iterator = idToPartition.entrySet().iterator();
List<List<Long>> partitionRecycleTimeLists = Lists.newArrayList();
while (iterator.hasNext()) {
Map.Entry<Long, RecyclePartitionInfo> entry = iterator.next();
RecyclePartitionInfo partitionInfo = entry.getValue();
if (partitionInfo.getDbId() != dbId || partitionInfo.getTableId() != tableId) {
continue;
}

Partition partition = partitionInfo.getPartition();
if (partition.getName().equals(partitionName)) {
List<Long> partitionRecycleTimeInfo = Lists.newArrayList();
partitionRecycleTimeInfo.add(entry.getKey());
partitionRecycleTimeInfo.add(idToRecycleTime.get(entry.getKey()));
lock.readLock().lock();
try {
for (Map.Entry<Long, RecyclePartitionInfo> entry : idToPartition.entrySet()) {
RecyclePartitionInfo partitionInfo = entry.getValue();
if (partitionInfo.getDbId() != dbId || partitionInfo.getTableId() != tableId) {
continue;
}

partitionRecycleTimeLists.add(partitionRecycleTimeInfo);
Partition partition = partitionInfo.getPartition();
if (partition.getName().equals(partitionName)) {
List<Long> partitionRecycleTimeInfo = Lists.newArrayList();
partitionRecycleTimeInfo.add(entry.getKey());
partitionRecycleTimeInfo.add(idToRecycleTime.get(entry.getKey()));

partitionRecycleTimeLists.add(partitionRecycleTimeInfo);
}
}
} finally {
lock.readLock().unlock();
}

List<Long> partitionIdToErase = Lists.newArrayList();
if (partitionRecycleTimeLists.size() <= maxSameNameTrashNum) {
return partitionIdToErase;
Expand All @@ -559,20 +592,34 @@ private synchronized List<Long> getSameNamePartitionIdListToErase(long dbId, lon

private synchronized void erasePartitionWithSameName(long dbId, long tableId, String partitionName,
long currentTimeMs, int maxSameNameTrashNum) {
List<Long> partitionIdToErase = getSameNamePartitionIdListToErase(dbId, tableId, partitionName,
maxSameNameTrashNum);
for (Long partitionId : partitionIdToErase) {
RecyclePartitionInfo partitionInfo = idToPartition.get(partitionId);
if (!isExpireMinLatency(partitionId, currentTimeMs)) {
continue;
List<Long> partitionIdToErase;
List<Partition> partitionsToErase = new ArrayList<>();

// First get partitions to erase under write lock
lock.writeLock().lock();
try {
partitionIdToErase = getSameNamePartitionIdListToErase(dbId, tableId, partitionName, maxSameNameTrashNum);
for (Long partitionId : partitionIdToErase) {
RecyclePartitionInfo partitionInfo = idToPartition.get(partitionId);
if (!isExpireMinLatency(partitionId, currentTimeMs)) {
continue;
}
Partition partition = partitionInfo.getPartition();
partitionsToErase.add(partition);

idToPartition.remove(partitionId);
idToRecycleTime.remove(partitionId);
}
Partition partition = partitionInfo.getPartition();
} finally {
lock.writeLock().unlock();
}

// Then erase partitions outside of lock
for (Partition partition : partitionsToErase) {
Env.getCurrentEnv().onErasePartition(partition);
idToPartition.remove(partitionId);
idToRecycleTime.remove(partitionId);
Env.getCurrentEnv().getEditLog().logErasePartition(partitionId);
LOG.info("erase partition[{}] name: {} from table[{}] from db[{}]", partitionId, partitionName, tableId,
Env.getCurrentEnv().getEditLog().logErasePartition(partition.getId());
LOG.info("erase partition[{}] name: {} from table[{}] from db[{}]",
partition.getId(), partitionName, tableId,
dbId);
}
}
Expand Down

0 comments on commit 9310805

Please sign in to comment.