Skip to content

Commit 8b742a6

Browse files
committed
[Feature] Support dynamic overwrite for insert overwrite
Signed-off-by: meegoo <meegoo.sr@gmail.com>
1 parent 37858a0 commit 8b742a6

24 files changed

+270
-38
lines changed

be/src/exec/tablet_sink.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,9 @@ Status OlapTableSink::init(const TDataSink& t_sink, RuntimeState* state) {
138138
_load_channel_timeout_s = config::streaming_load_rpc_max_alive_time_sec;
139139
}
140140

141+
if (table_sink.__isset.dynamic_overwrite) {
142+
_dynamic_overwrite = table_sink.dynamic_overwrite;
143+
}
141144
if (table_sink.__isset.ignore_out_of_partition) {
142145
_ignore_out_of_partition = table_sink.ignore_out_of_partition;
143146
}
@@ -175,6 +178,7 @@ void OlapTableSink::_prepare_profile(RuntimeState* state) {
175178
_profile->add_info_string("ReplicatedStorage", fmt::format("{}", _enable_replicated_storage));
176179
_profile->add_info_string("AutomaticPartition", fmt::format("{}", _enable_automatic_partition));
177180
_profile->add_info_string("AutomaticBucketSize", fmt::format("{}", _automatic_bucket_size));
181+
_profile->add_info_string("DynamicOverwrite", fmt::format("{}", _dynamic_overwrite));
178182

179183
_ts_profile = state->obj_pool()->add(new TabletSinkProfile());
180184
_ts_profile->runtime_profile = _profile;
@@ -407,6 +411,9 @@ Status OlapTableSink::_automatic_create_partition() {
407411
request.__set_db_id(_vectorized_partition->db_id());
408412
request.__set_table_id(_vectorized_partition->table_id());
409413
request.__set_partition_values(_partition_not_exist_row_values);
414+
if (_dynamic_overwrite) {
415+
request.__set_is_temp(true);
416+
}
410417

411418
LOG(INFO) << "load_id=" << print_id(_load_id) << ", txn_id: " << std::to_string(_txn_id)
412419
<< " automatic partition rpc begin request " << request;

be/src/exec/tablet_sink.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,7 @@ class OlapTableSink : public AsyncDataSink {
244244
std::unique_ptr<ThreadPoolToken> _automatic_partition_token;
245245
std::vector<std::vector<std::string>> _partition_not_exist_row_values;
246246
bool _enable_automatic_partition = false;
247+
bool _dynamic_overwrite = false;
247248
bool _has_automatic_partition = false;
248249
std::atomic<bool> _is_automatic_partition_running = false;
249250
Status _automatic_partition_status;

fe/fe-core/src/main/java/com/starrocks/catalog/OlapTable.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2718,6 +2718,32 @@ public void dropTempPartition(String partitionName, boolean needDropTablet) {
27182718
}
27192719
}
27202720

2721+
public void replaceMatchPartitions(List<String> tempPartitionNames) {
2722+
for (String partitionName : tempPartitionNames) {
2723+
Partition partition = tempPartitions.getPartition(partitionName);
2724+
if (partition != null) {
2725+
String oldPartitionName = "p" + partitionName.substring(partitionName.indexOf("_") + 1);
2726+
Partition oldPartition = nameToPartition.get(oldPartitionName);
2727+
if (oldPartition != null) {
2728+
// drop old partition
2729+
dropPartition(-1, oldPartitionName, true);
2730+
}
2731+
// add new partition
2732+
addPartition(partition);
2733+
// drop temp partition
2734+
tempPartitions.dropPartition(partitionName, false);
2735+
// move the range from idToTempRange to idToRange
2736+
partitionInfo.moveRangeFromTempToFormal(partition.getId());
2737+
// rename partition
2738+
renamePartition(partitionName, oldPartitionName);
2739+
}
2740+
}
2741+
2742+
for (Column column : getColumns()) {
2743+
IDictManager.getInstance().removeGlobalDict(this.getId(), column.getColumnId());
2744+
}
2745+
}
2746+
27212747
/*
27222748
* replace partitions in 'partitionNames' with partitions in
27232749
* 'tempPartitionNames'.

fe/fe-core/src/main/java/com/starrocks/load/InsertOverwriteJob.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,27 +45,32 @@ public class InsertOverwriteJob {
4545
@SerializedName(value = "sourcePartitionNames")
4646
private List<String> sourcePartitionNames;
4747

48+
@SerializedName(value = "dynamicOverwrite")
49+
private boolean dynamicOverwrite = false;
4850

4951
private transient InsertStmt insertStmt;
5052

5153
public InsertOverwriteJob(long jobId, InsertStmt insertStmt, long targetDbId,
52-
long targetTableId, long warehouseId) {
54+
long targetTableId, long warehouseId, boolean dynamicOverwrite) {
5355
this.jobId = jobId;
5456
this.insertStmt = insertStmt;
5557
this.sourcePartitionIds = insertStmt.getTargetPartitionIds();
5658
this.jobState = InsertOverwriteJobState.OVERWRITE_PENDING;
5759
this.targetDbId = targetDbId;
5860
this.targetTableId = targetTableId;
5961
this.warehouseId = warehouseId;
62+
this.dynamicOverwrite = dynamicOverwrite;
6063
}
6164

6265
// used to replay InsertOverwriteJob
63-
public InsertOverwriteJob(long jobId, long targetDbId, long targetTableId, List<Long> sourcePartitionIds) {
66+
public InsertOverwriteJob(long jobId, long targetDbId, long targetTableId,
67+
List<Long> sourcePartitionIds, boolean dynamicOverwrite) {
6468
this.jobId = jobId;
6569
this.targetDbId = targetDbId;
6670
this.targetTableId = targetTableId;
6771
this.sourcePartitionIds = sourcePartitionIds;
6872
this.jobState = InsertOverwriteJobState.OVERWRITE_PENDING;
73+
this.dynamicOverwrite = dynamicOverwrite;
6974
}
7075

7176
public long getJobId() {
@@ -128,4 +133,8 @@ public InsertStmt getInsertStmt() {
128133
public long getWarehouseId() {
129134
return warehouseId;
130135
}
136+
137+
public boolean isDynamicOverwrite() {
138+
return dynamicOverwrite;
139+
}
131140
}

fe/fe-core/src/main/java/com/starrocks/load/InsertOverwriteJobMgr.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ public boolean hasRunningOverwriteJob(long tableId) {
149149

150150
public void replayCreateInsertOverwrite(CreateInsertOverwriteJobLog jobInfo) {
151151
InsertOverwriteJob insertOverwriteJob = new InsertOverwriteJob(jobInfo.getJobId(),
152-
jobInfo.getDbId(), jobInfo.getTableId(), jobInfo.getTargetPartitionIds());
152+
jobInfo.getDbId(), jobInfo.getTableId(), jobInfo.getTargetPartitionIds(), jobInfo.isDynamicOverwrite());
153153
boolean registered = registerOverwriteJob(insertOverwriteJob);
154154
if (!registered) {
155155
LOG.warn("register insert overwrite job failed. jobId:{}", insertOverwriteJob.getJobId());

fe/fe-core/src/main/java/com/starrocks/load/InsertOverwriteJobRunner.java

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import com.starrocks.sql.ast.RangePartitionDesc;
5252
import com.starrocks.sql.common.DmlException;
5353
import com.starrocks.sql.plan.ExecPlan;
54+
import com.starrocks.transaction.TransactionState;
5455
import org.apache.logging.log4j.LogManager;
5556
import org.apache.logging.log4j.Logger;
5657

@@ -255,6 +256,9 @@ private void createPartitionByValue(InsertStmt insertStmt) {
255256
if (insertStmt.getTargetPartitionNames() == null) {
256257
return;
257258
}
259+
if (insertStmt.isDynamicOverwrite()) {
260+
return;
261+
}
258262
OlapTable olapTable = (OlapTable) insertStmt.getTargetTable();
259263
List<List<String>> partitionValues = Lists.newArrayList();
260264
if (!olapTable.getPartitionInfo().isAutomaticPartition()) {
@@ -300,7 +304,7 @@ private void createPartitionByValue(InsertStmt insertStmt) {
300304
}
301305

302306
try {
303-
addPartitionClause = AnalyzerUtils.getAddPartitionClauseFromPartitionValues(olapTable, partitionValues);
307+
addPartitionClause = AnalyzerUtils.getAddPartitionClauseFromPartitionValues(olapTable, partitionValues, false, null);
304308
} catch (AnalysisException ex) {
305309
LOG.warn(ex.getMessage(), ex);
306310
throw new RuntimeException(ex);
@@ -358,6 +362,9 @@ private void executeInsert() throws Exception {
358362
}
359363

360364
private void createTempPartitions() throws DdlException {
365+
if (job.isDynamicOverwrite()) {
366+
return;
367+
}
361368
long createPartitionStartTimestamp = System.currentTimeMillis();
362369
Database db = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb(dbId);
363370
if (db == null) {
@@ -414,6 +421,24 @@ private void gc(boolean isReplay) {
414421
}
415422
}
416423
}
424+
// if dynamic overwrite, drop all runtime created partitions
425+
if (job.isDynamicOverwrite()) {
426+
TransactionState txnState = GlobalStateMgr.getCurrentState().getGlobalTransactionMgr()
427+
.getTransactionState(dbId, insertStmt.getTxnId());
428+
if (txnState == null) {
429+
throw new DmlException("transaction state is null for dbId:%s, txnId:%s", dbId, insertStmt.getTxnId());
430+
}
431+
List<String> tmpPartitionNames = txnState.getCreatedPartitionNames();
432+
for (String partitionName : tmpPartitionNames) {
433+
Partition partition = targetTable.getPartition(partitionName, true);
434+
if (partition != null) {
435+
for (MaterializedIndex index : partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
436+
sourceTablets.addAll(index.getTablets());
437+
}
438+
targetTable.dropTempPartition(partitionName, true);
439+
}
440+
}
441+
}
417442
if (!isReplay) {
418443
// mark all source tablet ids force delete to drop it directly on BE,
419444
// not to move it to trash
@@ -470,7 +495,17 @@ private void doCommit(boolean isReplay) {
470495

471496
PartitionInfo partitionInfo = targetTable.getPartitionInfo();
472497
if (partitionInfo.isRangePartition() || partitionInfo.getType() == PartitionType.LIST) {
473-
targetTable.replaceTempPartitions(sourcePartitionNames, tmpPartitionNames, true, false);
498+
if (job.isDynamicOverwrite()) {
499+
TransactionState txnState = GlobalStateMgr.getCurrentState().getGlobalTransactionMgr()
500+
.getTransactionState(dbId, insertStmt.getTxnId());
501+
if (txnState == null) {
502+
throw new DmlException("transaction state is null for dbId:%s, txnId:%s", dbId, insertStmt.getTxnId());
503+
}
504+
tmpPartitionNames = txnState.getCreatedPartitionNames();
505+
targetTable.replaceMatchPartitions(tmpPartitionNames);
506+
} else {
507+
targetTable.replaceTempPartitions(sourcePartitionNames, tmpPartitionNames, true, false);
508+
}
474509
} else if (partitionInfo instanceof SinglePartitionInfo) {
475510
targetTable.replacePartition(sourcePartitionNames.get(0), tmpPartitionNames.get(0));
476511
} else {
@@ -535,8 +570,10 @@ private void prepareInsert() {
535570
if (insertStmt.getTargetPartitionNames() == null) {
536571
insertStmt.setPartitionNotSpecifiedInOverwrite(true);
537572
}
538-
insertStmt.setTargetPartitionNames(partitionNames);
539-
insertStmt.setTargetPartitionIds(job.getTmpPartitionIds());
573+
if (!job.isDynamicOverwrite()) {
574+
insertStmt.setTargetPartitionNames(partitionNames);
575+
insertStmt.setTargetPartitionIds(job.getTmpPartitionIds());
576+
}
540577
insertStmt.setOverwrite(false);
541578
insertStmt.setSystem(true);
542579

fe/fe-core/src/main/java/com/starrocks/persist/CreateInsertOverwriteJobLog.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,11 @@ public class CreateInsertOverwriteJobLog implements Writable {
3737
@SerializedName(value = "targetPartitionIds")
3838
private List<Long> targetPartitionIds;
3939

40-
public CreateInsertOverwriteJobLog(long jobId, long dbId, long tableId, List<Long> targetPartitionIds) {
40+
@SerializedName(value = "dynamicOverwrite")
41+
private boolean dynamicOverwrite = false;
42+
43+
public CreateInsertOverwriteJobLog(long jobId, long dbId, long tableId,
44+
List<Long> targetPartitionIds, boolean dynamicOverwrite) {
4145
this.jobId = jobId;
4246
this.dbId = dbId;
4347
this.tableId = tableId;
@@ -60,6 +64,10 @@ public List<Long> getTargetPartitionIds() {
6064
return targetPartitionIds;
6165
}
6266

67+
public boolean isDynamicOverwrite() {
68+
return dynamicOverwrite;
69+
}
70+
6371
@Override
6472
public String toString() {
6573
return "CreateInsertOverwriteJobInfo{" +

fe/fe-core/src/main/java/com/starrocks/planner/OlapTableSink.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ public class OlapTableSink extends DataSink {
143143
private TPartialUpdateMode partialUpdateMode;
144144
private long warehouseId = WarehouseManager.DEFAULT_WAREHOUSE_ID;
145145
private long automaticBucketSize = 0;
146+
private boolean enableDynamicOverwrite = false;
146147

147148
public OlapTableSink(OlapTable dstTable, TupleDescriptor tupleDescriptor, List<Long> partitionIds,
148149
TWriteQuorumType writeQuorum, boolean enableReplicatedStorage,
@@ -229,6 +230,10 @@ public void setPartialUpdateMode(TPartialUpdateMode mode) {
229230
this.partialUpdateMode = mode;
230231
}
231232

233+
public void setDynamicOverwrite(boolean enableDynamicOverwrite) {
234+
this.enableDynamicOverwrite = enableDynamicOverwrite;
235+
}
236+
232237
public void complete(String mergeCondition) throws UserException {
233238
TOlapTableSink tSink = tDataSink.getOlap_table_sink();
234239
if (mergeCondition != null && !mergeCondition.isEmpty()) {
@@ -238,6 +243,10 @@ public void complete(String mergeCondition) throws UserException {
238243
}
239244

240245
public List<Long> getOpenPartitions() {
246+
if (enableAutomaticPartition && enableDynamicOverwrite) {
247+
return new ArrayList<>(Collections.singletonList(
248+
dstTable.getPartition(ExpressionRangePartitionInfo.AUTOMATIC_SHADOW_PARTITION_NAME).getId()));
249+
}
241250
if (!enableAutomaticPartition || Config.max_load_initial_open_partition_number <= 0
242251
|| partitionIds.size() < Config.max_load_initial_open_partition_number) {
243252
return partitionIds;
@@ -267,6 +276,11 @@ public void complete() throws UserException {
267276
long partitionId = optionalPartition.get().getId();
268277
numReplicas = dstTable.getPartitionInfo().getReplicationNum(partitionId);
269278
}
279+
if (enableAutomaticPartition && enableDynamicOverwrite) {
280+
tSink.setDynamic_overwrite(true);
281+
}
282+
LOG.info("enableAutomaticPartition:{}, enableDynamicOverwrite:{}, dynamic_overwrite:{}",
283+
enableAutomaticPartition, enableDynamicOverwrite, tSink.isDynamic_overwrite());
270284
tSink.setNum_replicas(numReplicas);
271285
tSink.setNeed_gen_rollup(dstTable.shouldLoadToNewRollup());
272286
tSink.setSchema(createSchema(tSink.getDb_id(), dstTable, tupleDescriptor));

fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,7 @@ public class SessionVariable implements Serializable, Writable, Cloneable {
191191
public static final String MAX_PARALLEL_SCAN_INSTANCE_NUM = "max_parallel_scan_instance_num";
192192
public static final String ENABLE_INSERT_STRICT = "enable_insert_strict";
193193
public static final String INSERT_MAX_FILTER_RATIO = "insert_max_filter_ratio";
194+
public static final String DYNAMIC_OVERWRITE = "dynamic_overwrite";
194195
public static final String ENABLE_SPILL = "enable_spill";
195196
public static final String ENABLE_SPILL_TO_REMOTE_STORAGE = "enable_spill_to_remote_storage";
196197
public static final String DISABLE_SPILL_TO_LOCAL_DISK = "disable_spill_to_local_disk";
@@ -1139,6 +1140,9 @@ public static MaterializedViewRewriteMode parse(String str) {
11391140
@VariableMgr.VarAttr(name = ENABLE_INSERT_STRICT)
11401141
private boolean enableInsertStrict = true;
11411142

1143+
@VariableMgr.VarAttr(name = DYNAMIC_OVERWRITE)
1144+
private boolean dynamicOverwrite = false;
1145+
11421146
@VariableMgr.VarAttr(name = INSERT_MAX_FILTER_RATIO)
11431147
private double insertMaxFilterRatio = 0;
11441148

@@ -2769,6 +2773,14 @@ public void setEnableInsertStrict(boolean enableInsertStrict) {
27692773
this.enableInsertStrict = enableInsertStrict;
27702774
}
27712775

2776+
public boolean isDynamicOverwrite() {
2777+
return dynamicOverwrite;
2778+
}
2779+
2780+
public void setDynamicOverwrite(boolean dynamicOverwrite) {
2781+
this.dynamicOverwrite = dynamicOverwrite;
2782+
}
2783+
27722784
public double getInsertMaxFilterRatio() {
27732785
return insertMaxFilterRatio;
27742786
}

fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2035,14 +2035,16 @@ public void handleInsertOverwrite(InsertStmt insertStmt) throws Exception {
20352035
}
20362036
OlapTable olapTable = (OlapTable) insertStmt.getTargetTable();
20372037
InsertOverwriteJob job = new InsertOverwriteJob(GlobalStateMgr.getCurrentState().getNextId(),
2038-
insertStmt, db.getId(), olapTable.getId(), context.getCurrentWarehouseId());
2038+
insertStmt, db.getId(), olapTable.getId(), context.getCurrentWarehouseId(),
2039+
insertStmt.isDynamicOverwrite());
20392040
if (!locker.lockDatabaseAndCheckExist(db, LockType.WRITE)) {
20402041
throw new DmlException("database:%s does not exist.", db.getFullName());
20412042
}
20422043
try {
20432044
// add an edit log
20442045
CreateInsertOverwriteJobLog info = new CreateInsertOverwriteJobLog(job.getJobId(),
2045-
job.getTargetDbId(), job.getTargetTableId(), job.getSourcePartitionIds());
2046+
job.getTargetDbId(), job.getTargetTableId(), job.getSourcePartitionIds(),
2047+
job.isDynamicOverwrite());
20462048
GlobalStateMgr.getCurrentState().getEditLog().logCreateInsertOverwrite(info);
20472049
} finally {
20482050
locker.unLockDatabase(db.getId(), LockType.WRITE);

0 commit comments

Comments
 (0)