Skip to content

Commit 7d87dfa

Browse files
committed
[Feature] Support dynamic overwrite for insert overwrite
Signed-off-by: meegoo <[email protected]>
1 parent 37858a0 commit 7d87dfa

File tree

20 files changed

+229
-26
lines changed

20 files changed

+229
-26
lines changed

be/src/exec/tablet_sink.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,9 @@ Status OlapTableSink::init(const TDataSink& t_sink, RuntimeState* state) {
138138
_load_channel_timeout_s = config::streaming_load_rpc_max_alive_time_sec;
139139
}
140140

141+
if (table_sink.__isset.dynamic_overwrite) {
142+
_dynamic_overwrite = table_sink.dynamic_overwrite;
143+
}
141144
if (table_sink.__isset.ignore_out_of_partition) {
142145
_ignore_out_of_partition = table_sink.ignore_out_of_partition;
143146
}
@@ -175,6 +178,7 @@ void OlapTableSink::_prepare_profile(RuntimeState* state) {
175178
_profile->add_info_string("ReplicatedStorage", fmt::format("{}", _enable_replicated_storage));
176179
_profile->add_info_string("AutomaticPartition", fmt::format("{}", _enable_automatic_partition));
177180
_profile->add_info_string("AutomaticBucketSize", fmt::format("{}", _automatic_bucket_size));
181+
_profile->add_info_string("DynamicOverwrite", fmt::format("{}", _dynamic_overwrite));
178182

179183
_ts_profile = state->obj_pool()->add(new TabletSinkProfile());
180184
_ts_profile->runtime_profile = _profile;
@@ -407,6 +411,9 @@ Status OlapTableSink::_automatic_create_partition() {
407411
request.__set_db_id(_vectorized_partition->db_id());
408412
request.__set_table_id(_vectorized_partition->table_id());
409413
request.__set_partition_values(_partition_not_exist_row_values);
414+
if (_dynamic_overwrite) {
415+
request.__set_is_temp(true);
416+
}
410417

411418
LOG(INFO) << "load_id=" << print_id(_load_id) << ", txn_id: " << std::to_string(_txn_id)
412419
<< " automatic partition rpc begin request " << request;

be/src/exec/tablet_sink.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,7 @@ class OlapTableSink : public AsyncDataSink {
244244
std::unique_ptr<ThreadPoolToken> _automatic_partition_token;
245245
std::vector<std::vector<std::string>> _partition_not_exist_row_values;
246246
bool _enable_automatic_partition = false;
247+
bool _dynamic_overwrite = false;
247248
bool _has_automatic_partition = false;
248249
std::atomic<bool> _is_automatic_partition_running = false;
249250
Status _automatic_partition_status;

fe/fe-core/src/main/java/com/starrocks/catalog/OlapTable.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2718,6 +2718,32 @@ public void dropTempPartition(String partitionName, boolean needDropTablet) {
27182718
}
27192719
}
27202720

2721+
public void replaceMatchPartitions(List<String> tempPartitionNames) {
2722+
for (String partitionName : tempPartitionNames) {
2723+
Partition partition = tempPartitions.getPartition(partitionName);
2724+
if (partition != null) {
2725+
String oldPartitionName = "p" + partitionName.substring(partitionName.indexOf("_") + 1);
2726+
Partition oldPartition = nameToPartition.get(oldPartitionName);
2727+
if (oldPartition != null) {
2728+
// drop old partition
2729+
dropPartition(-1, oldPartitionName, true);
2730+
}
2731+
// add new partition
2732+
addPartition(partition);
2733+
// drop temp partition
2734+
tempPartitions.dropPartition(partitionName, false);
2735+
// move the range from idToTempRange to idToRange
2736+
partitionInfo.moveRangeFromTempToFormal(partition.getId());
2737+
// rename partition
2738+
renamePartition(partitionName, oldPartitionName);
2739+
}
2740+
}
2741+
2742+
for (Column column : getColumns()) {
2743+
IDictManager.getInstance().removeGlobalDict(this.getId(), column.getColumnId());
2744+
}
2745+
}
2746+
27212747
/*
27222748
* replace partitions in 'partitionNames' with partitions in
27232749
* 'tempPartitionNames'.

fe/fe-core/src/main/java/com/starrocks/load/InsertOverwriteJob.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,18 +45,21 @@ public class InsertOverwriteJob {
4545
@SerializedName(value = "sourcePartitionNames")
4646
private List<String> sourcePartitionNames;
4747

48+
@SerializedName(value = "dynamicOverwrite")
49+
private boolean dynamicOverwrite = false;
4850

4951
private transient InsertStmt insertStmt;
5052

5153
public InsertOverwriteJob(long jobId, InsertStmt insertStmt, long targetDbId,
52-
long targetTableId, long warehouseId) {
54+
long targetTableId, long warehouseId, boolean dynamicOverwrite) {
5355
this.jobId = jobId;
5456
this.insertStmt = insertStmt;
5557
this.sourcePartitionIds = insertStmt.getTargetPartitionIds();
5658
this.jobState = InsertOverwriteJobState.OVERWRITE_PENDING;
5759
this.targetDbId = targetDbId;
5860
this.targetTableId = targetTableId;
5961
this.warehouseId = warehouseId;
62+
this.dynamicOverwrite = dynamicOverwrite;
6063
}
6164

6265
// used to replay InsertOverwriteJob
@@ -128,4 +131,8 @@ public InsertStmt getInsertStmt() {
128131
public long getWarehouseId() {
129132
return warehouseId;
130133
}
134+
135+
public boolean isDynamicOverwrite() {
136+
return dynamicOverwrite;
137+
}
131138
}

fe/fe-core/src/main/java/com/starrocks/load/InsertOverwriteJobRunner.java

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import com.starrocks.sql.ast.RangePartitionDesc;
5252
import com.starrocks.sql.common.DmlException;
5353
import com.starrocks.sql.plan.ExecPlan;
54+
import com.starrocks.transaction.TransactionState;
5455
import org.apache.logging.log4j.LogManager;
5556
import org.apache.logging.log4j.Logger;
5657

@@ -255,6 +256,9 @@ private void createPartitionByValue(InsertStmt insertStmt) {
255256
if (insertStmt.getTargetPartitionNames() == null) {
256257
return;
257258
}
259+
if (insertStmt.isDynamicOverwrite()) {
260+
return;
261+
}
258262
OlapTable olapTable = (OlapTable) insertStmt.getTargetTable();
259263
List<List<String>> partitionValues = Lists.newArrayList();
260264
if (!olapTable.getPartitionInfo().isAutomaticPartition()) {
@@ -300,7 +304,7 @@ private void createPartitionByValue(InsertStmt insertStmt) {
300304
}
301305

302306
try {
303-
addPartitionClause = AnalyzerUtils.getAddPartitionClauseFromPartitionValues(olapTable, partitionValues);
307+
addPartitionClause = AnalyzerUtils.getAddPartitionClauseFromPartitionValues(olapTable, partitionValues, false, null);
304308
} catch (AnalysisException ex) {
305309
LOG.warn(ex.getMessage(), ex);
306310
throw new RuntimeException(ex);
@@ -358,6 +362,9 @@ private void executeInsert() throws Exception {
358362
}
359363

360364
private void createTempPartitions() throws DdlException {
365+
if (job.isDynamicOverwrite()) {
366+
return;
367+
}
361368
long createPartitionStartTimestamp = System.currentTimeMillis();
362369
Database db = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb(dbId);
363370
if (db == null) {
@@ -470,7 +477,17 @@ private void doCommit(boolean isReplay) {
470477

471478
PartitionInfo partitionInfo = targetTable.getPartitionInfo();
472479
if (partitionInfo.isRangePartition() || partitionInfo.getType() == PartitionType.LIST) {
473-
targetTable.replaceTempPartitions(sourcePartitionNames, tmpPartitionNames, true, false);
480+
if (job.isDynamicOverwrite()) {
481+
TransactionState txnState = GlobalStateMgr.getCurrentState().getGlobalTransactionMgr()
482+
.getTransactionState(dbId, insertStmt.getTxnId());
483+
if (txnState == null) {
484+
throw new DmlException("transaction state is null for dbId:%s, txnId:%s", dbId, insertStmt.getTxnId());
485+
}
486+
tmpPartitionNames = txnState.getCreatedPartitionNames();
487+
targetTable.replaceMatchPartitions(tmpPartitionNames);
488+
} else {
489+
targetTable.replaceTempPartitions(sourcePartitionNames, tmpPartitionNames, true, false);
490+
}
474491
} else if (partitionInfo instanceof SinglePartitionInfo) {
475492
targetTable.replacePartition(sourcePartitionNames.get(0), tmpPartitionNames.get(0));
476493
} else {
@@ -535,8 +552,10 @@ private void prepareInsert() {
535552
if (insertStmt.getTargetPartitionNames() == null) {
536553
insertStmt.setPartitionNotSpecifiedInOverwrite(true);
537554
}
538-
insertStmt.setTargetPartitionNames(partitionNames);
539-
insertStmt.setTargetPartitionIds(job.getTmpPartitionIds());
555+
if (!job.isDynamicOverwrite()) {
556+
insertStmt.setTargetPartitionNames(partitionNames);
557+
insertStmt.setTargetPartitionIds(job.getTmpPartitionIds());
558+
}
540559
insertStmt.setOverwrite(false);
541560
insertStmt.setSystem(true);
542561

fe/fe-core/src/main/java/com/starrocks/planner/OlapTableSink.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ public class OlapTableSink extends DataSink {
143143
private TPartialUpdateMode partialUpdateMode;
144144
private long warehouseId = WarehouseManager.DEFAULT_WAREHOUSE_ID;
145145
private long automaticBucketSize = 0;
146+
private boolean enableDynamicOverwrite = false;
146147

147148
public OlapTableSink(OlapTable dstTable, TupleDescriptor tupleDescriptor, List<Long> partitionIds,
148149
TWriteQuorumType writeQuorum, boolean enableReplicatedStorage,
@@ -229,6 +230,10 @@ public void setPartialUpdateMode(TPartialUpdateMode mode) {
229230
this.partialUpdateMode = mode;
230231
}
231232

233+
public void setDynamicOverwrite(boolean enableDynamicOverwrite) {
234+
this.enableDynamicOverwrite = enableDynamicOverwrite;
235+
}
236+
232237
public void complete(String mergeCondition) throws UserException {
233238
TOlapTableSink tSink = tDataSink.getOlap_table_sink();
234239
if (mergeCondition != null && !mergeCondition.isEmpty()) {
@@ -238,6 +243,10 @@ public void complete(String mergeCondition) throws UserException {
238243
}
239244

240245
public List<Long> getOpenPartitions() {
246+
if (enableAutomaticPartition && enableDynamicOverwrite) {
247+
return new ArrayList<>(Collections.singletonList(
248+
dstTable.getPartition(ExpressionRangePartitionInfo.AUTOMATIC_SHADOW_PARTITION_NAME).getId()));
249+
}
241250
if (!enableAutomaticPartition || Config.max_load_initial_open_partition_number <= 0
242251
|| partitionIds.size() < Config.max_load_initial_open_partition_number) {
243252
return partitionIds;
@@ -267,6 +276,11 @@ public void complete() throws UserException {
267276
long partitionId = optionalPartition.get().getId();
268277
numReplicas = dstTable.getPartitionInfo().getReplicationNum(partitionId);
269278
}
279+
if (enableAutomaticPartition && enableDynamicOverwrite) {
280+
tSink.setDynamic_overwrite(true);
281+
}
282+
LOG.info("enableAutomaticPartition:{}, enableDynamicOverwrite:{}, dynamic_overwrite:{}",
283+
enableAutomaticPartition, enableDynamicOverwrite, tSink.isDynamic_overwrite());
270284
tSink.setNum_replicas(numReplicas);
271285
tSink.setNeed_gen_rollup(dstTable.shouldLoadToNewRollup());
272286
tSink.setSchema(createSchema(tSink.getDb_id(), dstTable, tupleDescriptor));

fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,7 @@ public class SessionVariable implements Serializable, Writable, Cloneable {
191191
public static final String MAX_PARALLEL_SCAN_INSTANCE_NUM = "max_parallel_scan_instance_num";
192192
public static final String ENABLE_INSERT_STRICT = "enable_insert_strict";
193193
public static final String INSERT_MAX_FILTER_RATIO = "insert_max_filter_ratio";
194+
public static final String DYNAMIC_OVERWRITE = "dynamic_overwrite";
194195
public static final String ENABLE_SPILL = "enable_spill";
195196
public static final String ENABLE_SPILL_TO_REMOTE_STORAGE = "enable_spill_to_remote_storage";
196197
public static final String DISABLE_SPILL_TO_LOCAL_DISK = "disable_spill_to_local_disk";
@@ -1139,6 +1140,9 @@ public static MaterializedViewRewriteMode parse(String str) {
11391140
@VariableMgr.VarAttr(name = ENABLE_INSERT_STRICT)
11401141
private boolean enableInsertStrict = true;
11411142

1143+
@VariableMgr.VarAttr(name = DYNAMIC_OVERWRITE)
1144+
private boolean dynamicOverwrite = false;
1145+
11421146
@VariableMgr.VarAttr(name = INSERT_MAX_FILTER_RATIO)
11431147
private double insertMaxFilterRatio = 0;
11441148

@@ -2769,6 +2773,14 @@ public void setEnableInsertStrict(boolean enableInsertStrict) {
27692773
this.enableInsertStrict = enableInsertStrict;
27702774
}
27712775

2776+
public boolean isDynamicOverwrite() {
2777+
return dynamicOverwrite;
2778+
}
2779+
2780+
public void setDynamicOverwrite(boolean dynamicOverwrite) {
2781+
this.dynamicOverwrite = dynamicOverwrite;
2782+
}
2783+
27722784
public double getInsertMaxFilterRatio() {
27732785
return insertMaxFilterRatio;
27742786
}

fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2035,7 +2035,8 @@ public void handleInsertOverwrite(InsertStmt insertStmt) throws Exception {
20352035
}
20362036
OlapTable olapTable = (OlapTable) insertStmt.getTargetTable();
20372037
InsertOverwriteJob job = new InsertOverwriteJob(GlobalStateMgr.getCurrentState().getNextId(),
2038-
insertStmt, db.getId(), olapTable.getId(), context.getCurrentWarehouseId());
2038+
insertStmt, db.getId(), olapTable.getId(), context.getCurrentWarehouseId(),
2039+
insertStmt.isDynamicOverwrite());
20392040
if (!locker.lockDatabaseAndCheckExist(db, LockType.WRITE)) {
20402041
throw new DmlException("database:%s does not exist.", db.getFullName());
20412042
}

fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2145,6 +2145,12 @@ private static TCreatePartitionResult createPartitionProcess(TCreatePartitionReq
21452145
long tableId = request.getTable_id();
21462146
TCreatePartitionResult result = new TCreatePartitionResult();
21472147
TStatus errorStatus = new TStatus(RUNTIME_ERROR);
2148+
String partitionNamePrefix = null;
2149+
boolean isTemp = false;
2150+
if (request.isSetIs_temp() && request.isIs_temp()) {
2151+
isTemp = true;
2152+
partitionNamePrefix = "txn" + request.getTxn_id() + "_";
2153+
}
21482154

21492155
Database db = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb(dbId);
21502156
if (db == null) {
@@ -2185,7 +2191,7 @@ private static TCreatePartitionResult createPartitionProcess(TCreatePartitionReq
21852191
try (AutoCloseableLock ignore = new AutoCloseableLock(new Locker(), db.getId(), Lists.newArrayList(table.getId()),
21862192
LockType.READ)) {
21872193
addPartitionClause = AnalyzerUtils.getAddPartitionClauseFromPartitionValues(olapTable,
2188-
request.partition_values);
2194+
request.partition_values, isTemp, partitionNamePrefix);
21892195
PartitionDesc partitionDesc = addPartitionClause.getPartitionDesc();
21902196
if (partitionDesc instanceof RangePartitionDesc) {
21912197
partitionColNames = ((RangePartitionDesc) partitionDesc).getPartitionColNames();
@@ -2294,7 +2300,8 @@ private static TCreatePartitionResult createPartitionProcess(TCreatePartitionReq
22942300
Locker locker = new Locker();
22952301
locker.lockDatabase(db.getId(), LockType.READ);
22962302
try {
2297-
return buildCreatePartitionResponse(olapTable, txnState, partitions, tablets, partitionColNames);
2303+
return buildCreatePartitionResponse(
2304+
olapTable, txnState, partitions, tablets, partitionColNames, isTemp);
22982305
} finally {
22992306
locker.unLockDatabase(db.getId(), LockType.READ);
23002307
}
@@ -2305,7 +2312,8 @@ private static TCreatePartitionResult buildCreatePartitionResponse(OlapTable ola
23052312
TransactionState txnState,
23062313
List<TOlapTablePartition> partitions,
23072314
List<TTabletLocation> tablets,
2308-
List<String> partitionColNames) {
2315+
List<String> partitionColNames,
2316+
boolean isTemp) {
23092317
TCreatePartitionResult result = new TCreatePartitionResult();
23102318
TStatus errorStatus = new TStatus(RUNTIME_ERROR);
23112319
for (String partitionName : partitionColNames) {
@@ -2324,7 +2332,7 @@ private static TCreatePartitionResult buildCreatePartitionResponse(OlapTable ola
23242332
continue;
23252333
}
23262334

2327-
Partition partition = olapTable.getPartition(partitionName);
2335+
Partition partition = olapTable.getPartition(partitionName, isTemp);
23282336
tPartition = new TOlapTablePartition();
23292337
tPartition.setId(partition.getId());
23302338
buildPartitionInfo(olapTable, partitions, partition, tPartition, txnState);

fe/fe-core/src/main/java/com/starrocks/sql/InsertPlanner.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,9 @@ public ExecPlan plan(InsertStmt insertStmt, ConnectContext session) {
344344
if (insertStmt.isSystem() && insertStmt.isPartitionNotSpecifiedInOverwrite()) {
345345
Preconditions.checkState(!CollectionUtils.isEmpty(targetPartitionIds));
346346
enableAutomaticPartition = olapTable.supportedAutomaticPartition();
347+
} else if (insertStmt.isDynamicOverwrite()) {
348+
Preconditions.checkState(CollectionUtils.isEmpty(targetPartitionIds));
349+
enableAutomaticPartition = olapTable.supportedAutomaticPartition();
347350
} else if (insertStmt.isSpecifyPartitionNames()) {
348351
Preconditions.checkState(!CollectionUtils.isEmpty(targetPartitionIds));
349352
enableAutomaticPartition = false;
@@ -384,6 +387,14 @@ public ExecPlan plan(InsertStmt insertStmt, ConnectContext session) {
384387
if (olapTable.getAutomaticBucketSize() > 0) {
385388
((OlapTableSink) dataSink).setAutomaticBucketSize(olapTable.getAutomaticBucketSize());
386389
}
390+
LOG.info("isSystem: {}, isPartitionNotSpecifiedInOverwrite: {}, isDynamicOverwrite: {}, " +
391+
"isSpecifyPartitionNames: {}, isStaticKeyPartitionInsert: {}, enableAutomaticPartition: {}",
392+
insertStmt.isSystem(), insertStmt.isPartitionNotSpecifiedInOverwrite(),
393+
insertStmt.isDynamicOverwrite(), insertStmt.isSpecifyPartitionNames(),
394+
insertStmt.isStaticKeyPartitionInsert(), enableAutomaticPartition);
395+
if (insertStmt.isDynamicOverwrite()) {
396+
((OlapTableSink) dataSink).setDynamicOverwrite(true);
397+
}
387398

388399
// if sink is OlapTableSink Assigned to Be execute this sql [cn execute OlapTableSink will crash]
389400
session.getSessionVariable().setPreferComputeNode(false);

0 commit comments

Comments
 (0)