diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index fcadb9b2c9cda..3c98b49fa4ac1 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -138,6 +138,9 @@ Status OlapTableSink::init(const TDataSink& t_sink, RuntimeState* state) { _load_channel_timeout_s = config::streaming_load_rpc_max_alive_time_sec; } + if (table_sink.__isset.dynamic_overwrite) { + _dynamic_overwrite = table_sink.dynamic_overwrite; + } if (table_sink.__isset.ignore_out_of_partition) { _ignore_out_of_partition = table_sink.ignore_out_of_partition; } @@ -175,6 +178,7 @@ void OlapTableSink::_prepare_profile(RuntimeState* state) { _profile->add_info_string("ReplicatedStorage", fmt::format("{}", _enable_replicated_storage)); _profile->add_info_string("AutomaticPartition", fmt::format("{}", _enable_automatic_partition)); _profile->add_info_string("AutomaticBucketSize", fmt::format("{}", _automatic_bucket_size)); + _profile->add_info_string("DynamicOverwrite", fmt::format("{}", _dynamic_overwrite)); _ts_profile = state->obj_pool()->add(new TabletSinkProfile()); _ts_profile->runtime_profile = _profile; @@ -407,6 +411,9 @@ Status OlapTableSink::_automatic_create_partition() { request.__set_db_id(_vectorized_partition->db_id()); request.__set_table_id(_vectorized_partition->table_id()); request.__set_partition_values(_partition_not_exist_row_values); + if (_dynamic_overwrite) { + request.__set_is_temp(true); + } LOG(INFO) << "load_id=" << print_id(_load_id) << ", txn_id: " << std::to_string(_txn_id) << " automatic partition rpc begin request " << request; diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h index 936352b4cdcde..7b8ce5d0a0133 100644 --- a/be/src/exec/tablet_sink.h +++ b/be/src/exec/tablet_sink.h @@ -244,6 +244,7 @@ class OlapTableSink : public AsyncDataSink { std::unique_ptr _automatic_partition_token; std::vector> _partition_not_exist_row_values; bool _enable_automatic_partition = false; + bool _dynamic_overwrite = false; bool _has_automatic_partition = false; std::atomic _is_automatic_partition_running = false; Status _automatic_partition_status; diff --git a/fe/fe-core/src/main/java/com/starrocks/catalog/OlapTable.java b/fe/fe-core/src/main/java/com/starrocks/catalog/OlapTable.java index 4098250970e85..2667e2581fc7c 100644 --- a/fe/fe-core/src/main/java/com/starrocks/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/com/starrocks/catalog/OlapTable.java @@ -2721,6 +2721,33 @@ public void dropTempPartition(String partitionName, boolean needDropTablet) { } } + public void replaceMatchPartitions(List tempPartitionNames) { + for (String partitionName : tempPartitionNames) { + Partition partition = tempPartitions.getPartition(partitionName); + if (partition != null) { + String oldPartitionName = partitionName.substring( + partitionName.indexOf(AnalyzerUtils.PARTITION_NAME_PREFIX_SPLIT) + 1); + Partition oldPartition = nameToPartition.get(oldPartitionName); + if (oldPartition != null) { + // drop old partition + dropPartition(-1, oldPartitionName, true); + } + // add new partition + addPartition(partition); + // drop temp partition + tempPartitions.dropPartition(partitionName, false); + // move the range from idToTempRange to idToRange + partitionInfo.moveRangeFromTempToFormal(partition.getId()); + // rename partition + renamePartition(partitionName, oldPartitionName); + } + } + + for (Column column : getColumns()) { + IDictManager.getInstance().removeGlobalDict(this.getId(), column.getColumnId()); + } + } + /* * replace partitions in 'partitionNames' with partitions in * 'tempPartitionNames'. diff --git a/fe/fe-core/src/main/java/com/starrocks/load/InsertOverwriteJob.java b/fe/fe-core/src/main/java/com/starrocks/load/InsertOverwriteJob.java index e6715231debac..fda7dd8ffb8ad 100644 --- a/fe/fe-core/src/main/java/com/starrocks/load/InsertOverwriteJob.java +++ b/fe/fe-core/src/main/java/com/starrocks/load/InsertOverwriteJob.java @@ -45,11 +45,16 @@ public class InsertOverwriteJob { @SerializedName(value = "sourcePartitionNames") private List sourcePartitionNames; + @SerializedName(value = "dynamicOverwrite") + private boolean dynamicOverwrite = false; private transient InsertStmt insertStmt; + public InsertOverwriteJob() { + } + public InsertOverwriteJob(long jobId, InsertStmt insertStmt, long targetDbId, - long targetTableId, long warehouseId) { + long targetTableId, long warehouseId, boolean dynamicOverwrite) { this.jobId = jobId; this.insertStmt = insertStmt; this.sourcePartitionIds = insertStmt.getTargetPartitionIds(); @@ -57,15 +62,18 @@ public InsertOverwriteJob(long jobId, InsertStmt insertStmt, long targetDbId, this.targetDbId = targetDbId; this.targetTableId = targetTableId; this.warehouseId = warehouseId; + this.dynamicOverwrite = dynamicOverwrite; } // used to replay InsertOverwriteJob - public InsertOverwriteJob(long jobId, long targetDbId, long targetTableId, List sourcePartitionIds) { + public InsertOverwriteJob(long jobId, long targetDbId, long targetTableId, + List sourcePartitionIds, boolean dynamicOverwrite) { this.jobId = jobId; this.targetDbId = targetDbId; this.targetTableId = targetTableId; this.sourcePartitionIds = sourcePartitionIds; this.jobState = InsertOverwriteJobState.OVERWRITE_PENDING; + this.dynamicOverwrite = dynamicOverwrite; } public long getJobId() { @@ -128,4 +136,8 @@ public InsertStmt getInsertStmt() { public long getWarehouseId() { return warehouseId; } + + public boolean isDynamicOverwrite() { + return dynamicOverwrite; + } } diff --git a/fe/fe-core/src/main/java/com/starrocks/load/InsertOverwriteJobMgr.java b/fe/fe-core/src/main/java/com/starrocks/load/InsertOverwriteJobMgr.java index 87900b468ef32..bb017fd5a5962 100644 --- a/fe/fe-core/src/main/java/com/starrocks/load/InsertOverwriteJobMgr.java +++ b/fe/fe-core/src/main/java/com/starrocks/load/InsertOverwriteJobMgr.java @@ -149,7 +149,7 @@ public boolean hasRunningOverwriteJob(long tableId) { public void replayCreateInsertOverwrite(CreateInsertOverwriteJobLog jobInfo) { InsertOverwriteJob insertOverwriteJob = new InsertOverwriteJob(jobInfo.getJobId(), - jobInfo.getDbId(), jobInfo.getTableId(), jobInfo.getTargetPartitionIds()); + jobInfo.getDbId(), jobInfo.getTableId(), jobInfo.getTargetPartitionIds(), jobInfo.isDynamicOverwrite()); boolean registered = registerOverwriteJob(insertOverwriteJob); if (!registered) { LOG.warn("register insert overwrite job failed. jobId:{}", insertOverwriteJob.getJobId()); diff --git a/fe/fe-core/src/main/java/com/starrocks/load/InsertOverwriteJobRunner.java b/fe/fe-core/src/main/java/com/starrocks/load/InsertOverwriteJobRunner.java index f59fe35a912f8..40898400a0b18 100644 --- a/fe/fe-core/src/main/java/com/starrocks/load/InsertOverwriteJobRunner.java +++ b/fe/fe-core/src/main/java/com/starrocks/load/InsertOverwriteJobRunner.java @@ -260,6 +260,9 @@ private void createPartitionByValue(InsertStmt insertStmt) { if (insertStmt.getTargetPartitionNames() == null) { return; } + if (insertStmt.isDynamicOverwrite()) { + return; + } OlapTable olapTable = (OlapTable) insertStmt.getTargetTable(); List> partitionValues = Lists.newArrayList(); if (!olapTable.getPartitionInfo().isAutomaticPartition()) { @@ -305,7 +308,7 @@ private void createPartitionByValue(InsertStmt insertStmt) { } try { - addPartitionClause = AnalyzerUtils.getAddPartitionClauseFromPartitionValues(olapTable, partitionValues); + addPartitionClause = AnalyzerUtils.getAddPartitionClauseFromPartitionValues(olapTable, partitionValues, false, null); } catch (AnalysisException ex) { LOG.warn(ex.getMessage(), ex); throw new RuntimeException(ex); @@ -363,6 +366,9 @@ private void executeInsert() throws Exception { } private void createTempPartitions() throws DdlException { + if (job.isDynamicOverwrite()) { + return; + } long createPartitionStartTimestamp = System.currentTimeMillis(); Database db = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb(dbId); if (db == null) { @@ -421,6 +427,24 @@ private void gc(boolean isReplay) { } } } + // if dynamic overwrite, drop all runtime created partitions + if (job.isDynamicOverwrite()) { + TransactionState txnState = GlobalStateMgr.getCurrentState().getGlobalTransactionMgr() + .getTransactionState(dbId, insertStmt.getTxnId()); + if (txnState == null) { + throw new DmlException("transaction state is null for dbId:%s, txnId:%s", dbId, insertStmt.getTxnId()); + } + List tmpPartitionNames = txnState.getCreatedPartitionNames(); + for (String partitionName : tmpPartitionNames) { + Partition partition = targetTable.getPartition(partitionName, true); + if (partition != null) { + for (MaterializedIndex index : partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) { + sourceTablets.addAll(index.getTablets()); + } + targetTable.dropTempPartition(partitionName, true); + } + } + } if (!isReplay) { // mark all source tablet ids force delete to drop it directly on BE, // not to move it to trash @@ -486,7 +510,17 @@ private void doCommit(boolean isReplay) { PartitionInfo partitionInfo = targetTable.getPartitionInfo(); if (partitionInfo.isRangePartition() || partitionInfo.getType() == PartitionType.LIST) { - targetTable.replaceTempPartitions(sourcePartitionNames, tmpPartitionNames, true, false); + if (job.isDynamicOverwrite()) { + TransactionState txnState = GlobalStateMgr.getCurrentState().getGlobalTransactionMgr() + .getTransactionState(dbId, insertStmt.getTxnId()); + if (txnState == null) { + throw new DmlException("transaction state is null for dbId:%s, txnId:%s", dbId, insertStmt.getTxnId()); + } + tmpPartitionNames = txnState.getCreatedPartitionNames(); + targetTable.replaceMatchPartitions(tmpPartitionNames); + } else { + targetTable.replaceTempPartitions(sourcePartitionNames, tmpPartitionNames, true, false); + } } else if (partitionInfo instanceof SinglePartitionInfo) { targetTable.replacePartition(sourcePartitionNames.get(0), tmpPartitionNames.get(0)); } else { @@ -559,8 +593,10 @@ private void prepareInsert() { if (insertStmt.getTargetPartitionNames() == null) { insertStmt.setPartitionNotSpecifiedInOverwrite(true); } - insertStmt.setTargetPartitionNames(partitionNames); - insertStmt.setTargetPartitionIds(job.getTmpPartitionIds()); + if (!job.isDynamicOverwrite()) { + insertStmt.setTargetPartitionNames(partitionNames); + insertStmt.setTargetPartitionIds(job.getTmpPartitionIds()); + } insertStmt.setOverwrite(false); insertStmt.setSystem(true); diff --git a/fe/fe-core/src/main/java/com/starrocks/persist/CreateInsertOverwriteJobLog.java b/fe/fe-core/src/main/java/com/starrocks/persist/CreateInsertOverwriteJobLog.java index 373c98be983d3..eb3ce9a361b2c 100644 --- a/fe/fe-core/src/main/java/com/starrocks/persist/CreateInsertOverwriteJobLog.java +++ b/fe/fe-core/src/main/java/com/starrocks/persist/CreateInsertOverwriteJobLog.java @@ -37,7 +37,14 @@ public class CreateInsertOverwriteJobLog implements Writable { @SerializedName(value = "targetPartitionIds") private List targetPartitionIds; - public CreateInsertOverwriteJobLog(long jobId, long dbId, long tableId, List targetPartitionIds) { + @SerializedName(value = "dynamicOverwrite") + private boolean dynamicOverwrite = false; + + public CreateInsertOverwriteJobLog() { + } + + public CreateInsertOverwriteJobLog(long jobId, long dbId, long tableId, + List targetPartitionIds, boolean dynamicOverwrite) { this.jobId = jobId; this.dbId = dbId; this.tableId = tableId; @@ -60,6 +67,10 @@ public List getTargetPartitionIds() { return targetPartitionIds; } + public boolean isDynamicOverwrite() { + return dynamicOverwrite; + } + @Override public String toString() { return "CreateInsertOverwriteJobInfo{" + diff --git a/fe/fe-core/src/main/java/com/starrocks/planner/OlapTableSink.java b/fe/fe-core/src/main/java/com/starrocks/planner/OlapTableSink.java index ccd70356c4b8b..9fc26fe7139f8 100644 --- a/fe/fe-core/src/main/java/com/starrocks/planner/OlapTableSink.java +++ b/fe/fe-core/src/main/java/com/starrocks/planner/OlapTableSink.java @@ -143,6 +143,7 @@ public class OlapTableSink extends DataSink { private TPartialUpdateMode partialUpdateMode; private long warehouseId = WarehouseManager.DEFAULT_WAREHOUSE_ID; private long automaticBucketSize = 0; + private boolean enableDynamicOverwrite = false; public OlapTableSink(OlapTable dstTable, TupleDescriptor tupleDescriptor, List partitionIds, TWriteQuorumType writeQuorum, boolean enableReplicatedStorage, @@ -229,6 +230,10 @@ public void setPartialUpdateMode(TPartialUpdateMode mode) { this.partialUpdateMode = mode; } + public void setDynamicOverwrite(boolean enableDynamicOverwrite) { + this.enableDynamicOverwrite = enableDynamicOverwrite; + } + public void complete(String mergeCondition) throws UserException { TOlapTableSink tSink = tDataSink.getOlap_table_sink(); if (mergeCondition != null && !mergeCondition.isEmpty()) { @@ -238,6 +243,10 @@ public void complete(String mergeCondition) throws UserException { } public List getOpenPartitions() { + if (enableAutomaticPartition && enableDynamicOverwrite) { + return new ArrayList<>(Collections.singletonList( + dstTable.getPartition(ExpressionRangePartitionInfo.AUTOMATIC_SHADOW_PARTITION_NAME).getId())); + } if (!enableAutomaticPartition || Config.max_load_initial_open_partition_number <= 0 || partitionIds.size() < Config.max_load_initial_open_partition_number) { return partitionIds; @@ -267,6 +276,9 @@ public void complete() throws UserException { long partitionId = optionalPartition.get().getId(); numReplicas = dstTable.getPartitionInfo().getReplicationNum(partitionId); } + if (enableAutomaticPartition && enableDynamicOverwrite) { + tSink.setDynamic_overwrite(true); + } tSink.setNum_replicas(numReplicas); tSink.setNeed_gen_rollup(dstTable.shouldLoadToNewRollup()); tSink.setSchema(createSchema(tSink.getDb_id(), dstTable, tupleDescriptor)); diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java b/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java index 01e3c1e1f9d0c..af20dbc68a033 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java @@ -191,6 +191,7 @@ public class SessionVariable implements Serializable, Writable, Cloneable { public static final String MAX_PARALLEL_SCAN_INSTANCE_NUM = "max_parallel_scan_instance_num"; public static final String ENABLE_INSERT_STRICT = "enable_insert_strict"; public static final String INSERT_MAX_FILTER_RATIO = "insert_max_filter_ratio"; + public static final String DYNAMIC_OVERWRITE = "dynamic_overwrite"; public static final String ENABLE_SPILL = "enable_spill"; public static final String ENABLE_SPILL_TO_REMOTE_STORAGE = "enable_spill_to_remote_storage"; public static final String DISABLE_SPILL_TO_LOCAL_DISK = "disable_spill_to_local_disk"; @@ -1156,6 +1157,9 @@ public static MaterializedViewRewriteMode parse(String str) { @VariableMgr.VarAttr(name = ENABLE_INSERT_STRICT) private boolean enableInsertStrict = true; + @VariableMgr.VarAttr(name = DYNAMIC_OVERWRITE) + private boolean dynamicOverwrite = false; + @VariableMgr.VarAttr(name = INSERT_MAX_FILTER_RATIO) private double insertMaxFilterRatio = 0; @@ -2831,6 +2835,14 @@ public void setEnableInsertStrict(boolean enableInsertStrict) { this.enableInsertStrict = enableInsertStrict; } + public boolean isDynamicOverwrite() { + return dynamicOverwrite; + } + + public void setDynamicOverwrite(boolean dynamicOverwrite) { + this.dynamicOverwrite = dynamicOverwrite; + } + public double getInsertMaxFilterRatio() { return insertMaxFilterRatio; } diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java b/fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java index 550e122aefe05..01b383b301187 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java @@ -2083,14 +2083,16 @@ public void handleInsertOverwrite(InsertStmt insertStmt) throws Exception { } OlapTable olapTable = (OlapTable) insertStmt.getTargetTable(); InsertOverwriteJob job = new InsertOverwriteJob(GlobalStateMgr.getCurrentState().getNextId(), - insertStmt, db.getId(), olapTable.getId(), context.getCurrentWarehouseId()); + insertStmt, db.getId(), olapTable.getId(), context.getCurrentWarehouseId(), + insertStmt.isDynamicOverwrite()); if (!locker.lockDatabaseAndCheckExist(db, LockType.WRITE)) { throw new DmlException("database:%s does not exist.", db.getFullName()); } try { // add an edit log CreateInsertOverwriteJobLog info = new CreateInsertOverwriteJobLog(job.getJobId(), - job.getTargetDbId(), job.getTargetTableId(), job.getSourcePartitionIds()); + job.getTargetDbId(), job.getTargetTableId(), job.getSourcePartitionIds(), + job.isDynamicOverwrite()); GlobalStateMgr.getCurrentState().getEditLog().logCreateInsertOverwrite(info); } finally { locker.unLockDatabase(db.getId(), LockType.WRITE); diff --git a/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java index 2a404ffbad143..ae205cef4d291 100644 --- a/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java @@ -2197,6 +2197,12 @@ private static TCreatePartitionResult createPartitionProcess(TCreatePartitionReq long tableId = request.getTable_id(); TCreatePartitionResult result = new TCreatePartitionResult(); TStatus errorStatus = new TStatus(RUNTIME_ERROR); + String partitionNamePrefix = null; + boolean isTemp = false; + if (request.isSetIs_temp() && request.isIs_temp()) { + isTemp = true; + partitionNamePrefix = "txn" + request.getTxn_id(); + } Database db = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb(dbId); if (db == null) { @@ -2237,7 +2243,7 @@ private static TCreatePartitionResult createPartitionProcess(TCreatePartitionReq try (AutoCloseableLock ignore = new AutoCloseableLock(new Locker(), db.getId(), Lists.newArrayList(table.getId()), LockType.READ)) { addPartitionClause = AnalyzerUtils.getAddPartitionClauseFromPartitionValues(olapTable, - request.partition_values); + request.partition_values, isTemp, partitionNamePrefix); PartitionDesc partitionDesc = addPartitionClause.getPartitionDesc(); if (partitionDesc instanceof RangePartitionDesc) { partitionColNames = ((RangePartitionDesc) partitionDesc).getPartitionColNames(); @@ -2346,7 +2352,8 @@ private static TCreatePartitionResult createPartitionProcess(TCreatePartitionReq Locker locker = new Locker(); locker.lockDatabase(db.getId(), LockType.READ); try { - return buildCreatePartitionResponse(olapTable, txnState, partitions, tablets, partitionColNames); + return buildCreatePartitionResponse( + olapTable, txnState, partitions, tablets, partitionColNames, isTemp); } finally { locker.unLockDatabase(db.getId(), LockType.READ); } @@ -2357,7 +2364,8 @@ private static TCreatePartitionResult buildCreatePartitionResponse(OlapTable ola TransactionState txnState, List partitions, List tablets, - List partitionColNames) { + List partitionColNames, + boolean isTemp) { TCreatePartitionResult result = new TCreatePartitionResult(); TStatus errorStatus = new TStatus(RUNTIME_ERROR); for (String partitionName : partitionColNames) { @@ -2376,7 +2384,7 @@ private static TCreatePartitionResult buildCreatePartitionResponse(OlapTable ola continue; } - Partition partition = olapTable.getPartition(partitionName); + Partition partition = olapTable.getPartition(partitionName, isTemp); tPartition = new TOlapTablePartition(); tPartition.setId(partition.getId()); buildPartitionInfo(olapTable, partitions, partition, tPartition, txnState); diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/InsertPlanner.java b/fe/fe-core/src/main/java/com/starrocks/sql/InsertPlanner.java index e7479695661c0..8b8df97436bab 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/InsertPlanner.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/InsertPlanner.java @@ -345,6 +345,9 @@ public ExecPlan plan(InsertStmt insertStmt, ConnectContext session) { if (insertStmt.isSystem() && insertStmt.isPartitionNotSpecifiedInOverwrite()) { Preconditions.checkState(!CollectionUtils.isEmpty(targetPartitionIds)); enableAutomaticPartition = olapTable.supportedAutomaticPartition(); + } else if (insertStmt.isDynamicOverwrite()) { + Preconditions.checkState(CollectionUtils.isEmpty(targetPartitionIds)); + enableAutomaticPartition = olapTable.supportedAutomaticPartition(); } else if (insertStmt.isSpecifyPartitionNames()) { Preconditions.checkState(!CollectionUtils.isEmpty(targetPartitionIds)); enableAutomaticPartition = false; @@ -385,6 +388,9 @@ public ExecPlan plan(InsertStmt insertStmt, ConnectContext session) { if (olapTable.getAutomaticBucketSize() > 0) { ((OlapTableSink) dataSink).setAutomaticBucketSize(olapTable.getAutomaticBucketSize()); } + if (insertStmt.isDynamicOverwrite()) { + ((OlapTableSink) dataSink).setDynamicOverwrite(true); + } // if sink is OlapTableSink Assigned to Be execute this sql [cn execute OlapTableSink will crash] session.getSessionVariable().setPreferComputeNode(false); diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/AnalyzerUtils.java b/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/AnalyzerUtils.java index c79c48509aaab..5074617228acf 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/AnalyzerUtils.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/AnalyzerUtils.java @@ -154,6 +154,10 @@ public class AnalyzerUtils { public static final Set MV_DATE_TRUNC_SUPPORTED_PARTITION_FORMAT = ImmutableSet.of("hour", "day", "week", "month", "year"); + public static final String DEFAULT_PARTITION_NAME_PREFIX = "p"; + + public static final String PARTITION_NAME_PREFIX_SPLIT = "_"; + public static String getOrDefaultDatabase(String dbName, ConnectContext context) { if (Strings.isNullOrEmpty(dbName)) { dbName = context.getDatabase(); @@ -1308,13 +1312,15 @@ public static PartitionMeasure checkAndGetPartitionMeasure( } public static AddPartitionClause getAddPartitionClauseFromPartitionValues(OlapTable olapTable, - List> partitionValues) + List> partitionValues, + boolean isTemp, + String partitionNamePrefix) throws AnalysisException { PartitionInfo partitionInfo = olapTable.getPartitionInfo(); if (partitionInfo instanceof ExpressionRangePartitionInfo) { PartitionMeasure measure = checkAndGetPartitionMeasure(olapTable.getIdToColumn(), (ExpressionRangePartitionInfo) partitionInfo); - return getAddPartitionClauseForRangePartition(olapTable, partitionValues, measure, + return getAddPartitionClauseForRangePartition(olapTable, partitionValues, isTemp, partitionNamePrefix, measure, (ExpressionRangePartitionInfo) partitionInfo); } else if (partitionInfo instanceof ListPartitionInfo) { Short replicationNum = olapTable.getTableProperty().getReplicationNum(); @@ -1322,7 +1328,6 @@ public static AddPartitionClause getAddPartitionClauseFromPartitionValues(OlapTa .toDistributionDesc(olapTable.getIdToColumn()); Map partitionProperties = ImmutableMap.of("replication_num", String.valueOf(replicationNum)); - String partitionPrefix = "p"; List partitionColNames = Lists.newArrayList(); List partitionDescs = Lists.newArrayList(); @@ -1332,11 +1337,17 @@ public static AddPartitionClause getAddPartitionClauseFromPartitionValues(OlapTa String formatValue = getFormatPartitionValue(value); formattedPartitionValue.add(formatValue); } - String partitionName = partitionPrefix + Joiner.on("_").join(formattedPartitionValue); + String partitionName = DEFAULT_PARTITION_NAME_PREFIX + Joiner.on("_").join(formattedPartitionValue); if (partitionName.length() > FeConstants.MAX_LIST_PARTITION_NAME_LENGTH) { partitionName = partitionName.substring(0, FeConstants.MAX_LIST_PARTITION_NAME_LENGTH) + "_" + Integer.toHexString(partitionName.hashCode()); } + if (partitionNamePrefix != null) { + if (partitionNamePrefix.contains(PARTITION_NAME_PREFIX_SPLIT)) { + throw new AnalysisException("partition name prefix can not contain " + PARTITION_NAME_PREFIX_SPLIT); + } + partitionName = partitionNamePrefix + PARTITION_NAME_PREFIX_SPLIT + partitionName; + } if (!partitionColNames.contains(partitionName)) { MultiItemListPartitionDesc multiItemListPartitionDesc = new MultiItemListPartitionDesc(true, partitionName, Collections.singletonList(partitionValue), partitionProperties); @@ -1348,7 +1359,7 @@ public static AddPartitionClause getAddPartitionClauseFromPartitionValues(OlapTa ListPartitionDesc listPartitionDesc = new ListPartitionDesc(partitionColNames, partitionDescs); listPartitionDesc.setSystem(true); return new AddPartitionClause(listPartitionDesc, distributionDesc, - partitionProperties, false); + partitionProperties, isTemp); } else { throw new AnalysisException("automatic partition only support partition by value."); } @@ -1379,13 +1390,17 @@ public static String getFormatPartitionValue(String value) { private static AddPartitionClause getAddPartitionClauseForRangePartition( OlapTable olapTable, List> partitionValues, + boolean isTemp, + String partitionPrefix, PartitionMeasure measure, ExpressionRangePartitionInfo expressionRangePartitionInfo) throws AnalysisException { String granularity = measure.getGranularity(); long interval = measure.getInterval(); Type firstPartitionColumnType = expressionRangePartitionInfo.getPartitionColumns(olapTable.getIdToColumn()) .get(0).getType(); - String partitionPrefix = "p"; + if (partitionPrefix == null) { + partitionPrefix = DEFAULT_PARTITION_NAME_PREFIX; + } Short replicationNum = olapTable.getTableProperty().getReplicationNum(); DistributionDesc distributionDesc = olapTable.getDefaultDistributionInfo() .toDistributionDesc(olapTable.getIdToColumn()); @@ -1450,7 +1465,7 @@ private static AddPartitionClause getAddPartitionClauseForRangePartition( } RangePartitionDesc rangePartitionDesc = new RangePartitionDesc(partitionColNames, partitionDescs); rangePartitionDesc.setSystem(true); - return new AddPartitionClause(rangePartitionDesc, distributionDesc, partitionProperties, false); + return new AddPartitionClause(rangePartitionDesc, distributionDesc, partitionProperties, isTemp); } private static PartitionKeyDesc createPartitionKeyDesc(Type partitionType, LocalDateTime beginTime, diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/InsertAnalyzer.java b/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/InsertAnalyzer.java index 092b700885105..2fa0e60347961 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/InsertAnalyzer.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/InsertAnalyzer.java @@ -57,6 +57,8 @@ import com.starrocks.sql.ast.ValuesRelation; import com.starrocks.sql.common.MetaUtils; import org.apache.iceberg.SnapshotRef; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.util.ArrayList; import java.util.List; @@ -71,6 +73,7 @@ import static com.starrocks.sql.common.UnsupportedException.unsupportedException; public class InsertAnalyzer { + private static final Logger LOG = LogManager.getLogger(InsertAnalyzer.class); private static final ImmutableSet PUSH_DOWN_PROPERTIES_SET = new ImmutableSet.Builder() .add(LoadStmt.STRICT_MODE) .build(); @@ -160,13 +163,17 @@ public static void analyzeWithDeferredLock(InsertStmt insertStmt, ConnectContext } else if (insertStmt.isStaticKeyPartitionInsert()) { checkStaticKeyPartitionInsert(insertStmt, table, targetPartitionNames); } else { - for (Partition partition : olapTable.getPartitions()) { - targetPartitionIds.add(partition.getId()); - } - if (targetPartitionIds.isEmpty()) { - throw new SemanticException("data cannot be inserted into table with empty partition." + - "Use `SHOW PARTITIONS FROM %s` to see the currently partitions of this table. ", - olapTable.getName()); + if ((insertStmt.isOverwrite() && session.getSessionVariable().isDynamicOverwrite())) { + insertStmt.setIsDynamicOverwrite(true); + } else { + for (Partition partition : olapTable.getPartitions()) { + targetPartitionIds.add(partition.getId()); + } + if (targetPartitionIds.isEmpty()) { + throw new SemanticException("data cannot be inserted into table with empty partition." + + "Use `SHOW PARTITIONS FROM %s` to see the currently partitions of this table. ", + olapTable.getName()); + } } } insertStmt.setTargetPartitionIds(targetPartitionIds); diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/ast/InsertStmt.java b/fe/fe-core/src/main/java/com/starrocks/sql/ast/InsertStmt.java index ed2560daee038..1c6fd3ed65368 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/ast/InsertStmt.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/ast/InsertStmt.java @@ -101,6 +101,9 @@ public class InsertStmt extends DmlStmt { // column match by position or name private ColumnMatchPolicy columnMatchPolicy = ColumnMatchPolicy.POSITION; + // create partition if not exists + private boolean isDynamicOverwrite = false; + public InsertStmt(TableName tblName, PartitionNames targetPartitionNames, String label, List cols, QueryStatement queryStatement, boolean isOverwrite, Map insertProperties, NodePosition pos) { @@ -187,6 +190,14 @@ public boolean isVersionOverwrite() { return isVersionOverwrite; } + public void setIsDynamicOverwrite(boolean isDynamicOverwrite) { + this.isDynamicOverwrite = isDynamicOverwrite; + } + + public boolean isDynamicOverwrite() { + return isDynamicOverwrite; + } + public QueryStatement getQueryStatement() { return queryStatement; } diff --git a/fe/fe-core/src/main/java/com/starrocks/transaction/TransactionState.java b/fe/fe-core/src/main/java/com/starrocks/transaction/TransactionState.java index 5b91fbb335034..b56fc2b7989b9 100644 --- a/fe/fe-core/src/main/java/com/starrocks/transaction/TransactionState.java +++ b/fe/fe-core/src/main/java/com/starrocks/transaction/TransactionState.java @@ -77,6 +77,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; import javax.annotation.Nullable; import javax.validation.constraints.NotNull; @@ -339,6 +340,8 @@ public String toString() { private ConcurrentMap partitionNameToTPartition = Maps.newConcurrentMap(); private ConcurrentMap tabletIdToTTabletLocation = Maps.newConcurrentMap(); + private List createdPartitionNames = Lists.newArrayList(); + private final ReentrantReadWriteLock txnLock = new ReentrantReadWriteLock(true); public void writeLock() { @@ -1035,7 +1038,12 @@ public ConcurrentMap getTabletIdToTTabletLocation() { return tabletIdToTTabletLocation; } + public List getCreatedPartitionNames() { + return createdPartitionNames; + } + public void clearAutomaticPartitionSnapshot() { + createdPartitionNames = partitionNameToTPartition.keySet().stream().collect(Collectors.toList()); partitionNameToTPartition.clear(); tabletIdToTTabletLocation.clear(); } diff --git a/fe/fe-core/src/test/java/com/starrocks/load/InsertOverwriteJobManagerTest.java b/fe/fe-core/src/test/java/com/starrocks/load/InsertOverwriteJobManagerTest.java index c84e4eabe08bd..5563704101b33 100644 --- a/fe/fe-core/src/test/java/com/starrocks/load/InsertOverwriteJobManagerTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/load/InsertOverwriteJobManagerTest.java @@ -75,7 +75,7 @@ public void setUp() { @Test public void testBasic() throws Exception { - InsertOverwriteJob insertOverwriteJob = new InsertOverwriteJob(1100L, 100L, 110L, targetPartitionIds); + InsertOverwriteJob insertOverwriteJob = new InsertOverwriteJob(1100L, 100L, 110L, targetPartitionIds, false); insertOverwriteJobManager.registerOverwriteJob(insertOverwriteJob); Assert.assertEquals(1, insertOverwriteJobManager.getJobNum()); @@ -114,7 +114,7 @@ public void testReplay() throws Exception { }; CreateInsertOverwriteJobLog jobInfo = new CreateInsertOverwriteJobLog( - 1100L, 100L, 110L, targetPartitionIds); + 1100L, 100L, 110L, targetPartitionIds, false); Assert.assertEquals(1100L, jobInfo.getJobId()); Assert.assertEquals(100L, jobInfo.getDbId()); Assert.assertEquals(110L, jobInfo.getTableId()); @@ -143,7 +143,7 @@ public void testReplay() throws Exception { @Test public void testSerialization() throws IOException { - InsertOverwriteJob insertOverwriteJob1 = new InsertOverwriteJob(1000L, 100L, 110L, targetPartitionIds); + InsertOverwriteJob insertOverwriteJob1 = new InsertOverwriteJob(1000L, 100L, 110L, targetPartitionIds, false); insertOverwriteJobManager.registerOverwriteJob(insertOverwriteJob1); Assert.assertEquals(1, insertOverwriteJobManager.getJobNum()); diff --git a/fe/fe-core/src/test/java/com/starrocks/load/InsertOverwriteJobRunnerTest.java b/fe/fe-core/src/test/java/com/starrocks/load/InsertOverwriteJobRunnerTest.java index d57f47c65f1b7..d46e466d00e33 100644 --- a/fe/fe-core/src/test/java/com/starrocks/load/InsertOverwriteJobRunnerTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/load/InsertOverwriteJobRunnerTest.java @@ -96,13 +96,13 @@ public void testReplayInsertOverwrite() { Assert.assertTrue(table instanceof OlapTable); OlapTable olapTable = (OlapTable) table; InsertOverwriteJob insertOverwriteJob = new InsertOverwriteJob(100L, database.getId(), olapTable.getId(), - Lists.newArrayList(olapTable.getPartition("t1").getId())); + Lists.newArrayList(olapTable.getPartition("t1").getId()), false); InsertOverwriteJobRunner runner = new InsertOverwriteJobRunner(insertOverwriteJob); runner.cancel(); Assert.assertEquals(InsertOverwriteJobState.OVERWRITE_FAILED, insertOverwriteJob.getJobState()); InsertOverwriteJob insertOverwriteJob2 = new InsertOverwriteJob(100L, database.getId(), olapTable.getId(), - Lists.newArrayList(olapTable.getPartition("t1").getId())); + Lists.newArrayList(olapTable.getPartition("t1").getId()), false); InsertOverwriteStateChangeInfo stateChangeInfo = new InsertOverwriteStateChangeInfo(100L, InsertOverwriteJobState.OVERWRITE_PENDING, InsertOverwriteJobState.OVERWRITE_RUNNING, Lists.newArrayList(2000L), null, Lists.newArrayList(2001L)); @@ -136,7 +136,7 @@ public void testInsertOverwrite() throws Exception { Assert.assertTrue(table instanceof OlapTable); OlapTable olapTable = (OlapTable) table; InsertOverwriteJob insertOverwriteJob = new InsertOverwriteJob(100L, insertStmt, database.getId(), olapTable.getId(), - WarehouseManager.DEFAULT_WAREHOUSE_ID); + WarehouseManager.DEFAULT_WAREHOUSE_ID, false); InsertOverwriteJobRunner runner = new InsertOverwriteJobRunner(insertOverwriteJob, connectContext, executor); Assert.assertFalse(runner.isFinished()); } @@ -155,7 +155,7 @@ public void testInsertOverwriteConcurrencyWithSamePartitions() throws Exception Assert.assertTrue(table instanceof OlapTable); OlapTable olapTable = (OlapTable) table; InsertOverwriteJob insertOverwriteJob = new InsertOverwriteJob(100L, database.getId(), olapTable.getId(), - Lists.newArrayList(olapTable.getPartition("t1").getId())); + Lists.newArrayList(olapTable.getPartition("t1").getId()), false); InsertOverwriteJobRunner runner = new InsertOverwriteJobRunner(insertOverwriteJob); connectContext.getSessionVariable().setOptimizerExecuteTimeout(300000000); diff --git a/fe/fe-core/src/test/java/com/starrocks/load/InsertOverwriteJobTest.java b/fe/fe-core/src/test/java/com/starrocks/load/InsertOverwriteJobTest.java index f254260d74ff0..410e963a32ef8 100644 --- a/fe/fe-core/src/test/java/com/starrocks/load/InsertOverwriteJobTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/load/InsertOverwriteJobTest.java @@ -35,7 +35,7 @@ public void testBasic(@Injectable InsertStmt insertStmt) { } }; InsertOverwriteJob insertOverwriteJob1 = new InsertOverwriteJob(100L, insertStmt, 110L, 120L, - WarehouseManager.DEFAULT_WAREHOUSE_ID); + WarehouseManager.DEFAULT_WAREHOUSE_ID, false); Assert.assertEquals(100L, insertOverwriteJob1.getJobId()); Assert.assertEquals(110L, insertOverwriteJob1.getTargetDbId()); Assert.assertEquals(120L, insertOverwriteJob1.getTargetTableId()); @@ -46,7 +46,7 @@ public void testBasic(@Injectable InsertStmt insertStmt) { Assert.assertTrue(insertOverwriteJob1.isFinished()); List targetPartitionIds = Lists.newArrayList(10L, 20L, 30L); - InsertOverwriteJob insertOverwriteJob2 = new InsertOverwriteJob(100L, 110L, 120L, targetPartitionIds); + InsertOverwriteJob insertOverwriteJob2 = new InsertOverwriteJob(100L, 110L, 120L, targetPartitionIds, false); Assert.assertEquals(100L, insertOverwriteJob2.getJobId()); Assert.assertEquals(110L, insertOverwriteJob2.getTargetDbId()); Assert.assertEquals(120L, insertOverwriteJob2.getTargetTableId()); diff --git a/fe/fe-core/src/test/java/com/starrocks/persist/CreateInsertOverwriteJobLogTest.java b/fe/fe-core/src/test/java/com/starrocks/persist/CreateInsertOverwriteJobLogTest.java index c430592406dc0..5900aa54eee22 100644 --- a/fe/fe-core/src/test/java/com/starrocks/persist/CreateInsertOverwriteJobLogTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/persist/CreateInsertOverwriteJobLogTest.java @@ -33,7 +33,7 @@ public class CreateInsertOverwriteJobLogTest { @Test public void testBasic() throws IOException { List targetPartitionIds = Lists.newArrayList(10L, 20L); - CreateInsertOverwriteJobLog jobInfo = new CreateInsertOverwriteJobLog(100L, 101L, 102L, targetPartitionIds); + CreateInsertOverwriteJobLog jobInfo = new CreateInsertOverwriteJobLog(100L, 101L, 102L, targetPartitionIds, false); ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); DataOutputStream dataOutputStream = new DataOutputStream(outputStream); jobInfo.write(dataOutputStream); diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift index 3beafa2840483..d3f41e842ce1b 100644 --- a/gensrc/thrift/DataSinks.thrift +++ b/gensrc/thrift/DataSinks.thrift @@ -233,6 +233,7 @@ struct TOlapTableSink { 29: optional bool write_txn_log 30: optional bool ignore_out_of_partition 31: optional binary encryption_meta; + 32: optional bool dynamic_overwrite } struct TSchemaTableSink { diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 07f3f47aed733..537bddc3cce3b 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -1399,6 +1399,7 @@ struct TCreatePartitionRequest { 3: optional i64 table_id // for each partition column's partition values 4: optional list> partition_values + 5: optional bool is_temp } struct TCreatePartitionResult { diff --git a/test/sql/test_dynamic_overwrite/R/test_overwrite b/test/sql/test_dynamic_overwrite/R/test_overwrite new file mode 100644 index 0000000000000..c2f40b411f22c --- /dev/null +++ b/test/sql/test_dynamic_overwrite/R/test_overwrite @@ -0,0 +1,47 @@ +-- name: test_variable +show variables like '%dynamic_overwrite%'; +-- result: +dynamic_overwrite false +-- !result +set dynamic_overwrite=true; +-- result: +-- !result +show variables like '%dynamic_overwrite%'; +-- result: +dynamic_overwrite true +-- !result +-- name: test_dynamic_overwrite +create table t(k int, v int) partition by (k); +-- result: +-- !result +insert into t values(1,1); +-- result: +-- !result +insert into t values(2,1); +-- result: +-- !result +select * from t; +-- result: +2 1 +1 1 +-- !result +insert overwrite t values(2,2),(3,1); +-- result: +-- !result +select * from t; +-- result: +3 1 +2 2 +-- !result +set dynamic_overwrite=true; +-- result: +-- !result +insert overwrite t values(4,1),(3,2); +-- result: +-- !result +select * from t; +-- result: +4 1 +2 2 +3 2 +-- !result \ No newline at end of file diff --git a/test/sql/test_dynamic_overwrite/T/test_overwrite b/test/sql/test_dynamic_overwrite/T/test_overwrite new file mode 100644 index 0000000000000..443eb553e8a8a --- /dev/null +++ b/test/sql/test_dynamic_overwrite/T/test_overwrite @@ -0,0 +1,15 @@ +-- name: test_variable +show variables like '%dynamic_overwrite%'; +set dynamic_overwrite=true; +show variables like '%dynamic_overwrite%'; + +-- name: test_dynamic_overwrite +create table t(k int, v int) partition by (k); +insert into t values(1,1); +insert into t values(2,1); +select * from t; +insert overwrite t values(2,2),(3,1); +select * from t; +set dynamic_overwrite=true; +insert overwrite t values(4,1),(3,2); +select * from t;