Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Support dynamic overwrite for insert overwrite #51615

Merged
merged 1 commit into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions be/src/exec/tablet_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ Status OlapTableSink::init(const TDataSink& t_sink, RuntimeState* state) {
_load_channel_timeout_s = config::streaming_load_rpc_max_alive_time_sec;
}

if (table_sink.__isset.dynamic_overwrite) {
_dynamic_overwrite = table_sink.dynamic_overwrite;
}
if (table_sink.__isset.ignore_out_of_partition) {
_ignore_out_of_partition = table_sink.ignore_out_of_partition;
}
Expand Down Expand Up @@ -175,6 +178,7 @@ void OlapTableSink::_prepare_profile(RuntimeState* state) {
_profile->add_info_string("ReplicatedStorage", fmt::format("{}", _enable_replicated_storage));
_profile->add_info_string("AutomaticPartition", fmt::format("{}", _enable_automatic_partition));
_profile->add_info_string("AutomaticBucketSize", fmt::format("{}", _automatic_bucket_size));
_profile->add_info_string("DynamicOverwrite", fmt::format("{}", _dynamic_overwrite));

_ts_profile = state->obj_pool()->add(new TabletSinkProfile());
_ts_profile->runtime_profile = _profile;
Expand Down Expand Up @@ -407,6 +411,9 @@ Status OlapTableSink::_automatic_create_partition() {
request.__set_db_id(_vectorized_partition->db_id());
request.__set_table_id(_vectorized_partition->table_id());
request.__set_partition_values(_partition_not_exist_row_values);
if (_dynamic_overwrite) {
request.__set_is_temp(true);
}

LOG(INFO) << "load_id=" << print_id(_load_id) << ", txn_id: " << std::to_string(_txn_id)
<< " automatic partition rpc begin request " << request;
Expand Down
1 change: 1 addition & 0 deletions be/src/exec/tablet_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ class OlapTableSink : public AsyncDataSink {
std::unique_ptr<ThreadPoolToken> _automatic_partition_token;
std::vector<std::vector<std::string>> _partition_not_exist_row_values;
bool _enable_automatic_partition = false;
bool _dynamic_overwrite = false;
bool _has_automatic_partition = false;
std::atomic<bool> _is_automatic_partition_running = false;
Status _automatic_partition_status;
Expand Down
27 changes: 27 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -2721,6 +2721,33 @@ public void dropTempPartition(String partitionName, boolean needDropTablet) {
}
}

/**
 * Promotes the named temp partitions to formal partitions, replacing the formal
 * partition each one corresponds to.
 *
 * The formal name is derived from the temp name by taking everything after the first
 * occurrence of AnalyzerUtils.PARTITION_NAME_PREFIX_SPLIT. When the separator is absent,
 * indexOf yields -1 and the whole temp name is used as the formal name.
 *
 * Names that do not resolve to a temp partition are silently skipped. After all names
 * have been processed, the global dict of every column of this table is removed,
 * whether or not any partition was actually replaced.
 *
 * @param tempPartitionNames names to look up in tempPartitions
 */
public void replaceMatchPartitions(List<String> tempPartitionNames) {
    for (String tempName : tempPartitionNames) {
        Partition promoted = tempPartitions.getPartition(tempName);
        if (promoted == null) {
            // nothing registered under this temp name
            continue;
        }

        int splitPos = tempName.indexOf(AnalyzerUtils.PARTITION_NAME_PREFIX_SPLIT);
        String formalName = tempName.substring(splitPos + 1);

        // an existing formal partition with the target name has to go first
        if (nameToPartition.get(formalName) != null) {
            dropPartition(-1, formalName, true);
        }

        // register the partition as a formal one (still under its temp name) ...
        addPartition(promoted);
        // ... and take it out of the temp set
        tempPartitions.dropPartition(tempName, false);
        // move the range from idToTempRange to idToRange
        partitionInfo.moveRangeFromTempToFormal(promoted.getId());
        // finally give it the formal name
        renamePartition(tempName, formalName);
    }

    for (Column col : getColumns()) {
        IDictManager.getInstance().removeGlobalDict(this.getId(), col.getColumnId());
    }
}

/*
* replace partitions in 'partitionNames' with partitions in
* 'tempPartitionNames'.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,27 +45,35 @@ public class InsertOverwriteJob {
@SerializedName(value = "sourcePartitionNames")
private List<String> sourcePartitionNames;

@SerializedName(value = "dynamicOverwrite")
private boolean dynamicOverwrite = false;

private transient InsertStmt insertStmt;

meegoo marked this conversation as resolved.
Show resolved Hide resolved
// No-argument constructor: assigns nothing, so every field keeps its declared default.
// NOTE(review): presumably here for reflection-based deserialization of the
// @SerializedName fields — confirm against the persistence code.
public InsertOverwriteJob() {
}

/**
 * Builds a job from an InsertStmt.
 *
 * The source partition ids are taken from insertStmt.getTargetPartitionIds() and the
 * job starts in InsertOverwriteJobState.OVERWRITE_PENDING.
 *
 * @param jobId            id of this job
 * @param insertStmt       statement the job is built from
 * @param targetDbId       id of the target database
 * @param targetTableId    id of the target table
 * @param warehouseId      id of the warehouse stored on the job
 * @param dynamicOverwrite value stored in the dynamicOverwrite field
 */
public InsertOverwriteJob(long jobId, InsertStmt insertStmt, long targetDbId,
// NOTE(review): the next two lines look like the pre-change and post-change versions of
// the same signature line as rendered by the diff view; only the second one declares
// dynamicOverwrite, which the body assigns below.
long targetTableId, long warehouseId) {
long targetTableId, long warehouseId, boolean dynamicOverwrite) {
this.jobId = jobId;
this.insertStmt = insertStmt;
this.sourcePartitionIds = insertStmt.getTargetPartitionIds();
this.jobState = InsertOverwriteJobState.OVERWRITE_PENDING;
this.targetDbId = targetDbId;
this.targetTableId = targetTableId;
this.warehouseId = warehouseId;
this.dynamicOverwrite = dynamicOverwrite;
}

// Used to replay an InsertOverwriteJob: no InsertStmt is supplied, so the source partition
// ids are passed in directly. The job starts in OVERWRITE_PENDING; insertStmt and
// warehouseId are not assigned here.
// NOTE(review): the first signature line below looks like the pre-change version shown by
// the diff view; the two lines after it are the post-change signature that declares
// dynamicOverwrite.
public InsertOverwriteJob(long jobId, long targetDbId, long targetTableId, List<Long> sourcePartitionIds) {
public InsertOverwriteJob(long jobId, long targetDbId, long targetTableId,
List<Long> sourcePartitionIds, boolean dynamicOverwrite) {
this.jobId = jobId;
this.targetDbId = targetDbId;
this.targetTableId = targetTableId;
this.sourcePartitionIds = sourcePartitionIds;
this.jobState = InsertOverwriteJobState.OVERWRITE_PENDING;
this.dynamicOverwrite = dynamicOverwrite;
}

public long getJobId() {
Expand Down Expand Up @@ -128,4 +136,8 @@ public InsertStmt getInsertStmt() {
// Returns the warehouse id stored on this job.
public long getWarehouseId() {
return warehouseId;
}

// Returns the dynamicOverwrite flag of this job (false unless a constructor set it).
public boolean isDynamicOverwrite() {
return dynamicOverwrite;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public boolean hasRunningOverwriteJob(long tableId) {

public void replayCreateInsertOverwrite(CreateInsertOverwriteJobLog jobInfo) {
InsertOverwriteJob insertOverwriteJob = new InsertOverwriteJob(jobInfo.getJobId(),
jobInfo.getDbId(), jobInfo.getTableId(), jobInfo.getTargetPartitionIds());
jobInfo.getDbId(), jobInfo.getTableId(), jobInfo.getTargetPartitionIds(), jobInfo.isDynamicOverwrite());
boolean registered = registerOverwriteJob(insertOverwriteJob);
if (!registered) {
LOG.warn("register insert overwrite job failed. jobId:{}", insertOverwriteJob.getJobId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,9 @@ private void createPartitionByValue(InsertStmt insertStmt) {
if (insertStmt.getTargetPartitionNames() == null) {
return;
}
if (insertStmt.isDynamicOverwrite()) {
return;
}
OlapTable olapTable = (OlapTable) insertStmt.getTargetTable();
List<List<String>> partitionValues = Lists.newArrayList();
if (!olapTable.getPartitionInfo().isAutomaticPartition()) {
Expand Down Expand Up @@ -305,7 +308,7 @@ private void createPartitionByValue(InsertStmt insertStmt) {
}

try {
addPartitionClause = AnalyzerUtils.getAddPartitionClauseFromPartitionValues(olapTable, partitionValues);
addPartitionClause = AnalyzerUtils.getAddPartitionClauseFromPartitionValues(olapTable, partitionValues, false, null);
} catch (AnalysisException ex) {
LOG.warn(ex.getMessage(), ex);
throw new RuntimeException(ex);
Expand Down Expand Up @@ -363,6 +366,9 @@ private void executeInsert() throws Exception {
}

private void createTempPartitions() throws DdlException {
if (job.isDynamicOverwrite()) {
return;
}
long createPartitionStartTimestamp = System.currentTimeMillis();
Database db = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb(dbId);
if (db == null) {
Expand Down Expand Up @@ -421,6 +427,24 @@ private void gc(boolean isReplay) {
}
}
}
// if dynamic overwrite, drop all runtime created partitions
if (job.isDynamicOverwrite()) {
TransactionState txnState = GlobalStateMgr.getCurrentState().getGlobalTransactionMgr()
.getTransactionState(dbId, insertStmt.getTxnId());
if (txnState == null) {
throw new DmlException("transaction state is null for dbId:%s, txnId:%s", dbId, insertStmt.getTxnId());
}
List<String> tmpPartitionNames = txnState.getCreatedPartitionNames();
for (String partitionName : tmpPartitionNames) {
Partition partition = targetTable.getPartition(partitionName, true);
if (partition != null) {
for (MaterializedIndex index : partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
sourceTablets.addAll(index.getTablets());
}
targetTable.dropTempPartition(partitionName, true);
}
}
}
if (!isReplay) {
// mark all source tablet ids force delete to drop it directly on BE,
// not to move it to trash
Expand Down Expand Up @@ -486,7 +510,17 @@ private void doCommit(boolean isReplay) {

PartitionInfo partitionInfo = targetTable.getPartitionInfo();
if (partitionInfo.isRangePartition() || partitionInfo.getType() == PartitionType.LIST) {
targetTable.replaceTempPartitions(sourcePartitionNames, tmpPartitionNames, true, false);
if (job.isDynamicOverwrite()) {
TransactionState txnState = GlobalStateMgr.getCurrentState().getGlobalTransactionMgr()
.getTransactionState(dbId, insertStmt.getTxnId());
if (txnState == null) {
throw new DmlException("transaction state is null for dbId:%s, txnId:%s", dbId, insertStmt.getTxnId());
}
tmpPartitionNames = txnState.getCreatedPartitionNames();
targetTable.replaceMatchPartitions(tmpPartitionNames);
} else {
targetTable.replaceTempPartitions(sourcePartitionNames, tmpPartitionNames, true, false);
}
} else if (partitionInfo instanceof SinglePartitionInfo) {
targetTable.replacePartition(sourcePartitionNames.get(0), tmpPartitionNames.get(0));
} else {
Expand Down Expand Up @@ -559,8 +593,10 @@ private void prepareInsert() {
if (insertStmt.getTargetPartitionNames() == null) {
insertStmt.setPartitionNotSpecifiedInOverwrite(true);
}
insertStmt.setTargetPartitionNames(partitionNames);
insertStmt.setTargetPartitionIds(job.getTmpPartitionIds());
if (!job.isDynamicOverwrite()) {
insertStmt.setTargetPartitionNames(partitionNames);
insertStmt.setTargetPartitionIds(job.getTmpPartitionIds());
}
insertStmt.setOverwrite(false);
insertStmt.setSystem(true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,14 @@ public class CreateInsertOverwriteJobLog implements Writable {
@SerializedName(value = "targetPartitionIds")
private List<Long> targetPartitionIds;

public CreateInsertOverwriteJobLog(long jobId, long dbId, long tableId, List<Long> targetPartitionIds) {
@SerializedName(value = "dynamicOverwrite")
private boolean dynamicOverwrite = false;

meegoo marked this conversation as resolved.
Show resolved Hide resolved
// No-argument constructor: assigns nothing, so every field keeps its declared default.
// NOTE(review): presumably here for reflection-based deserialization of the
// @SerializedName fields — confirm against the persistence code.
public CreateInsertOverwriteJobLog() {
}

public CreateInsertOverwriteJobLog(long jobId, long dbId, long tableId,
List<Long> targetPartitionIds, boolean dynamicOverwrite) {
this.jobId = jobId;
this.dbId = dbId;
this.tableId = tableId;
Expand All @@ -60,6 +67,10 @@ public List<Long> getTargetPartitionIds() {
return targetPartitionIds;
}

// Returns the dynamicOverwrite flag recorded in this log entry (field default is false).
public boolean isDynamicOverwrite() {
return dynamicOverwrite;
}

@Override
public String toString() {
return "CreateInsertOverwriteJobInfo{" +
Expand Down
12 changes: 12 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/planner/OlapTableSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ public class OlapTableSink extends DataSink {
private TPartialUpdateMode partialUpdateMode;
private long warehouseId = WarehouseManager.DEFAULT_WAREHOUSE_ID;
private long automaticBucketSize = 0;
private boolean enableDynamicOverwrite = false;

public OlapTableSink(OlapTable dstTable, TupleDescriptor tupleDescriptor, List<Long> partitionIds,
TWriteQuorumType writeQuorum, boolean enableReplicatedStorage,
Expand Down Expand Up @@ -229,6 +230,10 @@ public void setPartialUpdateMode(TPartialUpdateMode mode) {
this.partialUpdateMode = mode;
}

// Sets the enableDynamicOverwrite flag of this sink. getOpenPartitions() and complete()
// only act on it when enableAutomaticPartition is also true.
public void setDynamicOverwrite(boolean enableDynamicOverwrite) {
this.enableDynamicOverwrite = enableDynamicOverwrite;
}

public void complete(String mergeCondition) throws UserException {
TOlapTableSink tSink = tDataSink.getOlap_table_sink();
if (mergeCondition != null && !mergeCondition.isEmpty()) {
Expand All @@ -238,6 +243,10 @@ public void complete(String mergeCondition) throws UserException {
}

public List<Long> getOpenPartitions() {
if (enableAutomaticPartition && enableDynamicOverwrite) {
return new ArrayList<>(Collections.singletonList(
dstTable.getPartition(ExpressionRangePartitionInfo.AUTOMATIC_SHADOW_PARTITION_NAME).getId()));
}
if (!enableAutomaticPartition || Config.max_load_initial_open_partition_number <= 0
|| partitionIds.size() < Config.max_load_initial_open_partition_number) {
return partitionIds;
Expand Down Expand Up @@ -267,6 +276,9 @@ public void complete() throws UserException {
long partitionId = optionalPartition.get().getId();
numReplicas = dstTable.getPartitionInfo().getReplicationNum(partitionId);
}
if (enableAutomaticPartition && enableDynamicOverwrite) {
tSink.setDynamic_overwrite(true);
}
meegoo marked this conversation as resolved.
Show resolved Hide resolved
tSink.setNum_replicas(numReplicas);
tSink.setNeed_gen_rollup(dstTable.shouldLoadToNewRollup());
tSink.setSchema(createSchema(tSink.getDb_id(), dstTable, tupleDescriptor));
Expand Down
12 changes: 12 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ public class SessionVariable implements Serializable, Writable, Cloneable {
public static final String MAX_PARALLEL_SCAN_INSTANCE_NUM = "max_parallel_scan_instance_num";
public static final String ENABLE_INSERT_STRICT = "enable_insert_strict";
public static final String INSERT_MAX_FILTER_RATIO = "insert_max_filter_ratio";
public static final String DYNAMIC_OVERWRITE = "dynamic_overwrite";
public static final String ENABLE_SPILL = "enable_spill";
public static final String ENABLE_SPILL_TO_REMOTE_STORAGE = "enable_spill_to_remote_storage";
public static final String DISABLE_SPILL_TO_LOCAL_DISK = "disable_spill_to_local_disk";
Expand Down Expand Up @@ -1151,6 +1152,9 @@ public static MaterializedViewRewriteMode parse(String str) {
@VariableMgr.VarAttr(name = ENABLE_INSERT_STRICT)
private boolean enableInsertStrict = true;

@VariableMgr.VarAttr(name = DYNAMIC_OVERWRITE)
private boolean dynamicOverwrite = false;

@VariableMgr.VarAttr(name = INSERT_MAX_FILTER_RATIO)
private double insertMaxFilterRatio = 0;

Expand Down Expand Up @@ -2817,6 +2821,14 @@ public void setEnableInsertStrict(boolean enableInsertStrict) {
this.enableInsertStrict = enableInsertStrict;
}

// Returns the current value of the "dynamic_overwrite" session variable (default false).
public boolean isDynamicOverwrite() {
return dynamicOverwrite;
}

// Sets the field backing the "dynamic_overwrite" session variable.
public void setDynamicOverwrite(boolean dynamicOverwrite) {
this.dynamicOverwrite = dynamicOverwrite;
}

// Returns the current value of the "insert_max_filter_ratio" session variable (default 0).
public double getInsertMaxFilterRatio() {
return insertMaxFilterRatio;
}
Expand Down
6 changes: 4 additions & 2 deletions fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -2083,14 +2083,16 @@ public void handleInsertOverwrite(InsertStmt insertStmt) throws Exception {
}
OlapTable olapTable = (OlapTable) insertStmt.getTargetTable();
InsertOverwriteJob job = new InsertOverwriteJob(GlobalStateMgr.getCurrentState().getNextId(),
insertStmt, db.getId(), olapTable.getId(), context.getCurrentWarehouseId());
insertStmt, db.getId(), olapTable.getId(), context.getCurrentWarehouseId(),
insertStmt.isDynamicOverwrite());
if (!locker.lockDatabaseAndCheckExist(db, LockType.WRITE)) {
throw new DmlException("database:%s does not exist.", db.getFullName());
}
try {
// add an edit log
CreateInsertOverwriteJobLog info = new CreateInsertOverwriteJobLog(job.getJobId(),
job.getTargetDbId(), job.getTargetTableId(), job.getSourcePartitionIds());
job.getTargetDbId(), job.getTargetTableId(), job.getSourcePartitionIds(),
job.isDynamicOverwrite());
GlobalStateMgr.getCurrentState().getEditLog().logCreateInsertOverwrite(info);
} finally {
locker.unLockDatabase(db.getId(), LockType.WRITE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2162,6 +2162,12 @@ private static TCreatePartitionResult createPartitionProcess(TCreatePartitionReq
long tableId = request.getTable_id();
TCreatePartitionResult result = new TCreatePartitionResult();
TStatus errorStatus = new TStatus(RUNTIME_ERROR);
String partitionNamePrefix = null;
boolean isTemp = false;
if (request.isSetIs_temp() && request.isIs_temp()) {
isTemp = true;
partitionNamePrefix = "txn" + request.getTxn_id();
}

Database db = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb(dbId);
if (db == null) {
Expand Down Expand Up @@ -2202,7 +2208,7 @@ private static TCreatePartitionResult createPartitionProcess(TCreatePartitionReq
try (AutoCloseableLock ignore = new AutoCloseableLock(new Locker(), db.getId(), Lists.newArrayList(table.getId()),
LockType.READ)) {
addPartitionClause = AnalyzerUtils.getAddPartitionClauseFromPartitionValues(olapTable,
request.partition_values);
request.partition_values, isTemp, partitionNamePrefix);
PartitionDesc partitionDesc = addPartitionClause.getPartitionDesc();
if (partitionDesc instanceof RangePartitionDesc) {
partitionColNames = ((RangePartitionDesc) partitionDesc).getPartitionColNames();
Expand Down Expand Up @@ -2311,7 +2317,8 @@ private static TCreatePartitionResult createPartitionProcess(TCreatePartitionReq
Locker locker = new Locker();
locker.lockDatabase(db.getId(), LockType.READ);
try {
return buildCreatePartitionResponse(olapTable, txnState, partitions, tablets, partitionColNames);
return buildCreatePartitionResponse(
olapTable, txnState, partitions, tablets, partitionColNames, isTemp);
} finally {
locker.unLockDatabase(db.getId(), LockType.READ);
}
Expand All @@ -2322,7 +2329,8 @@ private static TCreatePartitionResult buildCreatePartitionResponse(OlapTable ola
TransactionState txnState,
List<TOlapTablePartition> partitions,
List<TTabletLocation> tablets,
List<String> partitionColNames) {
List<String> partitionColNames,
boolean isTemp) {
TCreatePartitionResult result = new TCreatePartitionResult();
TStatus errorStatus = new TStatus(RUNTIME_ERROR);
for (String partitionName : partitionColNames) {
Expand All @@ -2341,7 +2349,7 @@ private static TCreatePartitionResult buildCreatePartitionResponse(OlapTable ola
continue;
}

Partition partition = olapTable.getPartition(partitionName);
Partition partition = olapTable.getPartition(partitionName, isTemp);
tPartition = new TOlapTablePartition();
tPartition.setId(partition.getId());
buildPartitionInfo(olapTable, partitions, partition, tPartition, txnState);
Expand Down
Loading
Loading