Skip to content

Commit

Permalink
[Feature] Support dynamic overwrite for insert overwrite (#51615)
Browse files Browse the repository at this point in the history
Signed-off-by: meegoo <[email protected]>
  • Loading branch information
meegoo authored Nov 8, 2024
1 parent 6425ac1 commit f73f983
Show file tree
Hide file tree
Showing 24 changed files with 277 additions and 38 deletions.
7 changes: 7 additions & 0 deletions be/src/exec/tablet_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ Status OlapTableSink::init(const TDataSink& t_sink, RuntimeState* state) {
_load_channel_timeout_s = config::streaming_load_rpc_max_alive_time_sec;
}

if (table_sink.__isset.dynamic_overwrite) {
_dynamic_overwrite = table_sink.dynamic_overwrite;
}
if (table_sink.__isset.ignore_out_of_partition) {
_ignore_out_of_partition = table_sink.ignore_out_of_partition;
}
Expand Down Expand Up @@ -175,6 +178,7 @@ void OlapTableSink::_prepare_profile(RuntimeState* state) {
_profile->add_info_string("ReplicatedStorage", fmt::format("{}", _enable_replicated_storage));
_profile->add_info_string("AutomaticPartition", fmt::format("{}", _enable_automatic_partition));
_profile->add_info_string("AutomaticBucketSize", fmt::format("{}", _automatic_bucket_size));
_profile->add_info_string("DynamicOverwrite", fmt::format("{}", _dynamic_overwrite));

_ts_profile = state->obj_pool()->add(new TabletSinkProfile());
_ts_profile->runtime_profile = _profile;
Expand Down Expand Up @@ -407,6 +411,9 @@ Status OlapTableSink::_automatic_create_partition() {
request.__set_db_id(_vectorized_partition->db_id());
request.__set_table_id(_vectorized_partition->table_id());
request.__set_partition_values(_partition_not_exist_row_values);
if (_dynamic_overwrite) {
request.__set_is_temp(true);
}

LOG(INFO) << "load_id=" << print_id(_load_id) << ", txn_id: " << std::to_string(_txn_id)
<< " automatic partition rpc begin request " << request;
Expand Down
1 change: 1 addition & 0 deletions be/src/exec/tablet_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ class OlapTableSink : public AsyncDataSink {
std::unique_ptr<ThreadPoolToken> _automatic_partition_token;
std::vector<std::vector<std::string>> _partition_not_exist_row_values;
bool _enable_automatic_partition = false;
bool _dynamic_overwrite = false;
bool _has_automatic_partition = false;
std::atomic<bool> _is_automatic_partition_running = false;
Status _automatic_partition_status;
Expand Down
27 changes: 27 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -2721,6 +2721,33 @@ public void dropTempPartition(String partitionName, boolean needDropTablet) {
}
}

/**
 * Promotes each listed temp partition to a formal partition, replacing the formal
 * partition that carries the same name once the temp name's prefix is stripped.
 *
 * <p>The formal name is everything after the first occurrence of
 * {@code AnalyzerUtils.PARTITION_NAME_PREFIX_SPLIT} in the temp partition name.
 * Names that are not found in {@code tempPartitions} are skipped silently.
 * After all names have been processed, the global dict of every column of this
 * table is removed.
 *
 * @param tempPartitionNames names of the temp partitions to promote
 */
public void replaceMatchPartitions(List<String> tempPartitionNames) {
    for (String tempName : tempPartitionNames) {
        Partition tempPartition = tempPartitions.getPartition(tempName);
        if (tempPartition == null) {
            // No temp partition under this name: nothing to promote.
            continue;
        }

        // NOTE(review): "+ 1" assumes the separator is a single character, and a name
        // without the separator maps to itself (indexOf returns -1) -- confirm both.
        int separatorPos = tempName.indexOf(AnalyzerUtils.PARTITION_NAME_PREFIX_SPLIT);
        String formalName = tempName.substring(separatorPos + 1);

        // Drop the formal partition that is being overwritten, if there is one.
        if (nameToPartition.get(formalName) != null) {
            dropPartition(-1, formalName, true);
        }

        // Register the temp partition as a formal one, then detach it from the temp set.
        addPartition(tempPartition);
        tempPartitions.dropPartition(tempName, false);
        // Move its range from idToTempRange to idToRange.
        partitionInfo.moveRangeFromTempToFormal(tempPartition.getId());
        // Finally switch it over to the formal name.
        renamePartition(tempName, formalName);
    }

    for (Column col : getColumns()) {
        IDictManager.getInstance().removeGlobalDict(this.getId(), col.getColumnId());
    }
}

/*
* replace partitions in 'partitionNames' with partitions in
* 'tempPartitionNames'.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,27 +45,35 @@ public class InsertOverwriteJob {
@SerializedName(value = "sourcePartitionNames")
private List<String> sourcePartitionNames;

@SerializedName(value = "dynamicOverwrite")
private boolean dynamicOverwrite = false;

private transient InsertStmt insertStmt;

/**
 * No-argument constructor. Assigns nothing, so every field keeps its declared
 * initializer (for example {@code dynamicOverwrite} stays {@code false}).
 * NOTE(review): presumably needed for deserialization of the
 * {@code @SerializedName} fields -- confirm.
 */
public InsertOverwriteJob() {
}

public InsertOverwriteJob(long jobId, InsertStmt insertStmt, long targetDbId,
long targetTableId, long warehouseId) {
long targetTableId, long warehouseId, boolean dynamicOverwrite) {
this.jobId = jobId;
this.insertStmt = insertStmt;
this.sourcePartitionIds = insertStmt.getTargetPartitionIds();
this.jobState = InsertOverwriteJobState.OVERWRITE_PENDING;
this.targetDbId = targetDbId;
this.targetTableId = targetTableId;
this.warehouseId = warehouseId;
this.dynamicOverwrite = dynamicOverwrite;
}

// used to replay InsertOverwriteJob
public InsertOverwriteJob(long jobId, long targetDbId, long targetTableId, List<Long> sourcePartitionIds) {
public InsertOverwriteJob(long jobId, long targetDbId, long targetTableId,
List<Long> sourcePartitionIds, boolean dynamicOverwrite) {
this.jobId = jobId;
this.targetDbId = targetDbId;
this.targetTableId = targetTableId;
this.sourcePartitionIds = sourcePartitionIds;
this.jobState = InsertOverwriteJobState.OVERWRITE_PENDING;
this.dynamicOverwrite = dynamicOverwrite;
}

public long getJobId() {
Expand Down Expand Up @@ -128,4 +136,8 @@ public InsertStmt getInsertStmt() {
/**
 * @return the warehouse id stored in this job; it is assigned by the constructor
 *         that takes an {@code InsertStmt}
 */
public long getWarehouseId() {
return warehouseId;
}

/**
 * @return the job's {@code dynamicOverwrite} flag. The field is serialized under
 *         the name "dynamicOverwrite" and defaults to {@code false} unless a
 *         constructor sets it.
 */
public boolean isDynamicOverwrite() {
return dynamicOverwrite;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public boolean hasRunningOverwriteJob(long tableId) {

public void replayCreateInsertOverwrite(CreateInsertOverwriteJobLog jobInfo) {
InsertOverwriteJob insertOverwriteJob = new InsertOverwriteJob(jobInfo.getJobId(),
jobInfo.getDbId(), jobInfo.getTableId(), jobInfo.getTargetPartitionIds());
jobInfo.getDbId(), jobInfo.getTableId(), jobInfo.getTargetPartitionIds(), jobInfo.isDynamicOverwrite());
boolean registered = registerOverwriteJob(insertOverwriteJob);
if (!registered) {
LOG.warn("register insert overwrite job failed. jobId:{}", insertOverwriteJob.getJobId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,9 @@ private void createPartitionByValue(InsertStmt insertStmt) {
if (insertStmt.getTargetPartitionNames() == null) {
return;
}
if (insertStmt.isDynamicOverwrite()) {
return;
}
OlapTable olapTable = (OlapTable) insertStmt.getTargetTable();
List<List<String>> partitionValues = Lists.newArrayList();
if (!olapTable.getPartitionInfo().isAutomaticPartition()) {
Expand Down Expand Up @@ -305,7 +308,7 @@ private void createPartitionByValue(InsertStmt insertStmt) {
}

try {
addPartitionClause = AnalyzerUtils.getAddPartitionClauseFromPartitionValues(olapTable, partitionValues);
addPartitionClause = AnalyzerUtils.getAddPartitionClauseFromPartitionValues(olapTable, partitionValues, false, null);
} catch (AnalysisException ex) {
LOG.warn(ex.getMessage(), ex);
throw new RuntimeException(ex);
Expand Down Expand Up @@ -363,6 +366,9 @@ private void executeInsert() throws Exception {
}

private void createTempPartitions() throws DdlException {
if (job.isDynamicOverwrite()) {
return;
}
long createPartitionStartTimestamp = System.currentTimeMillis();
Database db = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb(dbId);
if (db == null) {
Expand Down Expand Up @@ -421,6 +427,24 @@ private void gc(boolean isReplay) {
}
}
}
// if dynamic overwrite, drop all runtime created partitions
if (job.isDynamicOverwrite()) {
TransactionState txnState = GlobalStateMgr.getCurrentState().getGlobalTransactionMgr()
.getTransactionState(dbId, insertStmt.getTxnId());
if (txnState == null) {
throw new DmlException("transaction state is null for dbId:%s, txnId:%s", dbId, insertStmt.getTxnId());
}
List<String> tmpPartitionNames = txnState.getCreatedPartitionNames();
for (String partitionName : tmpPartitionNames) {
Partition partition = targetTable.getPartition(partitionName, true);
if (partition != null) {
for (MaterializedIndex index : partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
sourceTablets.addAll(index.getTablets());
}
targetTable.dropTempPartition(partitionName, true);
}
}
}
if (!isReplay) {
// mark all source tablet ids force delete to drop it directly on BE,
// not to move it to trash
Expand Down Expand Up @@ -486,7 +510,17 @@ private void doCommit(boolean isReplay) {

PartitionInfo partitionInfo = targetTable.getPartitionInfo();
if (partitionInfo.isRangePartition() || partitionInfo.getType() == PartitionType.LIST) {
targetTable.replaceTempPartitions(sourcePartitionNames, tmpPartitionNames, true, false);
if (job.isDynamicOverwrite()) {
TransactionState txnState = GlobalStateMgr.getCurrentState().getGlobalTransactionMgr()
.getTransactionState(dbId, insertStmt.getTxnId());
if (txnState == null) {
throw new DmlException("transaction state is null for dbId:%s, txnId:%s", dbId, insertStmt.getTxnId());
}
tmpPartitionNames = txnState.getCreatedPartitionNames();
targetTable.replaceMatchPartitions(tmpPartitionNames);
} else {
targetTable.replaceTempPartitions(sourcePartitionNames, tmpPartitionNames, true, false);
}
} else if (partitionInfo instanceof SinglePartitionInfo) {
targetTable.replacePartition(sourcePartitionNames.get(0), tmpPartitionNames.get(0));
} else {
Expand Down Expand Up @@ -559,8 +593,10 @@ private void prepareInsert() {
if (insertStmt.getTargetPartitionNames() == null) {
insertStmt.setPartitionNotSpecifiedInOverwrite(true);
}
insertStmt.setTargetPartitionNames(partitionNames);
insertStmt.setTargetPartitionIds(job.getTmpPartitionIds());
if (!job.isDynamicOverwrite()) {
insertStmt.setTargetPartitionNames(partitionNames);
insertStmt.setTargetPartitionIds(job.getTmpPartitionIds());
}
insertStmt.setOverwrite(false);
insertStmt.setSystem(true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,14 @@ public class CreateInsertOverwriteJobLog implements Writable {
@SerializedName(value = "targetPartitionIds")
private List<Long> targetPartitionIds;

public CreateInsertOverwriteJobLog(long jobId, long dbId, long tableId, List<Long> targetPartitionIds) {
@SerializedName(value = "dynamicOverwrite")
private boolean dynamicOverwrite = false;

/**
 * No-argument constructor. Assigns nothing, so {@code dynamicOverwrite} keeps its
 * declared default of {@code false}.
 * NOTE(review): presumably needed for deserialization of the
 * {@code @SerializedName} fields -- confirm.
 */
public CreateInsertOverwriteJobLog() {
}

public CreateInsertOverwriteJobLog(long jobId, long dbId, long tableId,
List<Long> targetPartitionIds, boolean dynamicOverwrite) {
this.jobId = jobId;
this.dbId = dbId;
this.tableId = tableId;
Expand All @@ -60,6 +67,10 @@ public List<Long> getTargetPartitionIds() {
return targetPartitionIds;
}

/**
 * @return the {@code dynamicOverwrite} flag recorded in this log entry. The field
 *         is serialized under the name "dynamicOverwrite" and defaults to
 *         {@code false}.
 */
// NOTE(review): the visible part of the parameterized constructor assigns jobId,
// dbId and tableId but shows no "this.dynamicOverwrite = dynamicOverwrite" --
// confirm the field is actually set there; otherwise a freshly created log always
// reports false and replay would lose the flag.
public boolean isDynamicOverwrite() {
return dynamicOverwrite;
}

@Override
public String toString() {
return "CreateInsertOverwriteJobInfo{" +
Expand Down
12 changes: 12 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/planner/OlapTableSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ public class OlapTableSink extends DataSink {
private TPartialUpdateMode partialUpdateMode;
private long warehouseId = WarehouseManager.DEFAULT_WAREHOUSE_ID;
private long automaticBucketSize = 0;
private boolean enableDynamicOverwrite = false;

public OlapTableSink(OlapTable dstTable, TupleDescriptor tupleDescriptor, List<Long> partitionIds,
TWriteQuorumType writeQuorum, boolean enableReplicatedStorage,
Expand Down Expand Up @@ -229,6 +230,10 @@ public void setPartialUpdateMode(TPartialUpdateMode mode) {
this.partialUpdateMode = mode;
}

/**
 * Sets the {@code enableDynamicOverwrite} flag of this sink.
 * The flag only takes effect together with {@code enableAutomaticPartition}:
 * both {@code getOpenPartitions()} and {@code complete()} test the two flags jointly.
 *
 * @param enableDynamicOverwrite new value of the flag
 */
public void setDynamicOverwrite(boolean enableDynamicOverwrite) {
this.enableDynamicOverwrite = enableDynamicOverwrite;
}

public void complete(String mergeCondition) throws UserException {
TOlapTableSink tSink = tDataSink.getOlap_table_sink();
if (mergeCondition != null && !mergeCondition.isEmpty()) {
Expand All @@ -238,6 +243,10 @@ public void complete(String mergeCondition) throws UserException {
}

public List<Long> getOpenPartitions() {
if (enableAutomaticPartition && enableDynamicOverwrite) {
return new ArrayList<>(Collections.singletonList(
dstTable.getPartition(ExpressionRangePartitionInfo.AUTOMATIC_SHADOW_PARTITION_NAME).getId()));
}
if (!enableAutomaticPartition || Config.max_load_initial_open_partition_number <= 0
|| partitionIds.size() < Config.max_load_initial_open_partition_number) {
return partitionIds;
Expand Down Expand Up @@ -267,6 +276,9 @@ public void complete() throws UserException {
long partitionId = optionalPartition.get().getId();
numReplicas = dstTable.getPartitionInfo().getReplicationNum(partitionId);
}
if (enableAutomaticPartition && enableDynamicOverwrite) {
tSink.setDynamic_overwrite(true);
}
tSink.setNum_replicas(numReplicas);
tSink.setNeed_gen_rollup(dstTable.shouldLoadToNewRollup());
tSink.setSchema(createSchema(tSink.getDb_id(), dstTable, tupleDescriptor));
Expand Down
12 changes: 12 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ public class SessionVariable implements Serializable, Writable, Cloneable {
public static final String MAX_PARALLEL_SCAN_INSTANCE_NUM = "max_parallel_scan_instance_num";
public static final String ENABLE_INSERT_STRICT = "enable_insert_strict";
public static final String INSERT_MAX_FILTER_RATIO = "insert_max_filter_ratio";
public static final String DYNAMIC_OVERWRITE = "dynamic_overwrite";
public static final String ENABLE_SPILL = "enable_spill";
public static final String ENABLE_SPILL_TO_REMOTE_STORAGE = "enable_spill_to_remote_storage";
public static final String DISABLE_SPILL_TO_LOCAL_DISK = "disable_spill_to_local_disk";
Expand Down Expand Up @@ -1156,6 +1157,9 @@ public static MaterializedViewRewriteMode parse(String str) {
@VariableMgr.VarAttr(name = ENABLE_INSERT_STRICT)
private boolean enableInsertStrict = true;

@VariableMgr.VarAttr(name = DYNAMIC_OVERWRITE)
private boolean dynamicOverwrite = false;

@VariableMgr.VarAttr(name = INSERT_MAX_FILTER_RATIO)
private double insertMaxFilterRatio = 0;

Expand Down Expand Up @@ -2831,6 +2835,14 @@ public void setEnableInsertStrict(boolean enableInsertStrict) {
this.enableInsertStrict = enableInsertStrict;
}

/**
 * @return the value of the {@code dynamic_overwrite} session variable
 *         (declared with a default of {@code false})
 */
public boolean isDynamicOverwrite() {
return dynamicOverwrite;
}

/**
 * Sets the field backing the {@code dynamic_overwrite} session variable.
 *
 * @param dynamicOverwrite new value of the variable
 */
public void setDynamicOverwrite(boolean dynamicOverwrite) {
this.dynamicOverwrite = dynamicOverwrite;
}

/**
 * @return the value of the {@code insert_max_filter_ratio} session variable
 *         (declared with a default of {@code 0})
 */
public double getInsertMaxFilterRatio() {
return insertMaxFilterRatio;
}
Expand Down
6 changes: 4 additions & 2 deletions fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -2083,14 +2083,16 @@ public void handleInsertOverwrite(InsertStmt insertStmt) throws Exception {
}
OlapTable olapTable = (OlapTable) insertStmt.getTargetTable();
InsertOverwriteJob job = new InsertOverwriteJob(GlobalStateMgr.getCurrentState().getNextId(),
insertStmt, db.getId(), olapTable.getId(), context.getCurrentWarehouseId());
insertStmt, db.getId(), olapTable.getId(), context.getCurrentWarehouseId(),
insertStmt.isDynamicOverwrite());
if (!locker.lockDatabaseAndCheckExist(db, LockType.WRITE)) {
throw new DmlException("database:%s does not exist.", db.getFullName());
}
try {
// add an edit log
CreateInsertOverwriteJobLog info = new CreateInsertOverwriteJobLog(job.getJobId(),
job.getTargetDbId(), job.getTargetTableId(), job.getSourcePartitionIds());
job.getTargetDbId(), job.getTargetTableId(), job.getSourcePartitionIds(),
job.isDynamicOverwrite());
GlobalStateMgr.getCurrentState().getEditLog().logCreateInsertOverwrite(info);
} finally {
locker.unLockDatabase(db.getId(), LockType.WRITE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2197,6 +2197,12 @@ private static TCreatePartitionResult createPartitionProcess(TCreatePartitionReq
long tableId = request.getTable_id();
TCreatePartitionResult result = new TCreatePartitionResult();
TStatus errorStatus = new TStatus(RUNTIME_ERROR);
String partitionNamePrefix = null;
boolean isTemp = false;
if (request.isSetIs_temp() && request.isIs_temp()) {
isTemp = true;
partitionNamePrefix = "txn" + request.getTxn_id();
}

Database db = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb(dbId);
if (db == null) {
Expand Down Expand Up @@ -2237,7 +2243,7 @@ private static TCreatePartitionResult createPartitionProcess(TCreatePartitionReq
try (AutoCloseableLock ignore = new AutoCloseableLock(new Locker(), db.getId(), Lists.newArrayList(table.getId()),
LockType.READ)) {
addPartitionClause = AnalyzerUtils.getAddPartitionClauseFromPartitionValues(olapTable,
request.partition_values);
request.partition_values, isTemp, partitionNamePrefix);
PartitionDesc partitionDesc = addPartitionClause.getPartitionDesc();
if (partitionDesc instanceof RangePartitionDesc) {
partitionColNames = ((RangePartitionDesc) partitionDesc).getPartitionColNames();
Expand Down Expand Up @@ -2346,7 +2352,8 @@ private static TCreatePartitionResult createPartitionProcess(TCreatePartitionReq
Locker locker = new Locker();
locker.lockDatabase(db.getId(), LockType.READ);
try {
return buildCreatePartitionResponse(olapTable, txnState, partitions, tablets, partitionColNames);
return buildCreatePartitionResponse(
olapTable, txnState, partitions, tablets, partitionColNames, isTemp);
} finally {
locker.unLockDatabase(db.getId(), LockType.READ);
}
Expand All @@ -2357,7 +2364,8 @@ private static TCreatePartitionResult buildCreatePartitionResponse(OlapTable ola
TransactionState txnState,
List<TOlapTablePartition> partitions,
List<TTabletLocation> tablets,
List<String> partitionColNames) {
List<String> partitionColNames,
boolean isTemp) {
TCreatePartitionResult result = new TCreatePartitionResult();
TStatus errorStatus = new TStatus(RUNTIME_ERROR);
for (String partitionName : partitionColNames) {
Expand All @@ -2376,7 +2384,7 @@ private static TCreatePartitionResult buildCreatePartitionResponse(OlapTable ola
continue;
}

Partition partition = olapTable.getPartition(partitionName);
Partition partition = olapTable.getPartition(partitionName, isTemp);
tPartition = new TOlapTablePartition();
tPartition.setId(partition.getId());
buildPartitionInfo(olapTable, partitions, partition, tPartition, txnState);
Expand Down
Loading

0 comments on commit f73f983

Please sign in to comment.