diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java index 44562aa8bbb6..34399998d774 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java @@ -594,6 +594,13 @@ private TSStatus handleLeaderChangeInternal(final PipeHandleLeaderChangePlan pla return; // pipe consensus pipe task will not change } + if (pipeMeta.getStaticMeta().isSourceExternal()) { + // external source pipe tasks are not balanced here since non-leaders + // don't know about RegionLeader Map and will be balanced in the meta + // sync procedure + return; + } + final Map consensusGroupIdToTaskMetaMap = pipeMeta.getRuntimeMeta().getConsensusGroupId2TaskMetaMap(); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java index 0e5ba5bf9dd1..d8b03991a4ed 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java @@ -520,6 +520,10 @@ public static PipeMeta copyAndFilterOutNonWorkingDataRegionPipeTasks(PipeMeta or .entrySet() .removeIf( consensusGroupId2TaskMeta -> { + if (originalPipeMeta.getStaticMeta().isSourceExternal()) { + // should keep the external source tasks + return false; + } final String database; try { database = diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java index c8a734a9e35a..e44eeeb1ae25 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java @@ -20,15 +20,20 @@ package org.apache.iotdb.confignode.procedure.impl.pipe.runtime; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.config.PipeConfig; +import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant; import org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleMetaChangePlan; import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo; import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; import org.apache.iotdb.confignode.procedure.impl.pipe.AbstractOperatePipeProcedureV2; import org.apache.iotdb.confignode.procedure.impl.pipe.PipeTaskOperation; +import org.apache.iotdb.confignode.procedure.impl.pipe.util.ExternalLoadBalancer; import org.apache.iotdb.confignode.procedure.state.ProcedureLockState; import org.apache.iotdb.confignode.procedure.store.ProcedureType; +import org.apache.iotdb.confignode.service.ConfigNode; import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp; import org.apache.iotdb.pipe.api.exception.PipeException; @@ -40,11 +45,16 @@ import java.io.DataOutputStream; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; import 
java.util.concurrent.atomic.AtomicReference; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTERNAL_EXTRACTOR_PARALLELISM_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTERNAL_EXTRACTOR_PARALLELISM_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTERNAL_SOURCE_PARALLELISM_KEY; + public class PipeMetaSyncProcedure extends AbstractOperatePipeProcedureV2 { private static final Logger LOGGER = LoggerFactory.getLogger(PipeMetaSyncProcedure.class); @@ -100,8 +110,53 @@ public boolean executeFromValidateTask(ConfigNodeProcedureEnv env) { @Override public void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) { LOGGER.info("PipeMetaSyncProcedure: executeFromCalculateInfoForTask"); - - // Do nothing + // Re-balance the external source tasks here in case of any changes in the dataRegion + pipeTaskInfo + .get() + .getPipeMetaList() + .forEach( + pipeMeta -> { + if (pipeMeta.getStaticMeta().isSourceExternal()) { + final ExternalLoadBalancer loadBalancer = + new ExternalLoadBalancer( + pipeMeta + .getStaticMeta() + .getExtractorParameters() + .getStringOrDefault( + Arrays.asList( + PipeExtractorConstant.EXTERNAL_EXTRACTOR_BALANCE_STRATEGY_KEY, + PipeExtractorConstant.EXTERNAL_SOURCE_BALANCE_STRATEGY_KEY), + PipeExtractorConstant + .EXTERNAL_EXTRACTOR_BALANCE_PROPORTION_STRATEGY)); + + final int parallelism = + pipeMeta + .getStaticMeta() + .getExtractorParameters() + .getIntOrDefault( + Arrays.asList( + EXTERNAL_EXTRACTOR_PARALLELISM_KEY, + EXTERNAL_SOURCE_PARALLELISM_KEY), + EXTERNAL_EXTRACTOR_PARALLELISM_DEFAULT_VALUE); + final Map consensusGroupIdToTaskMetaMap = + pipeMeta.getRuntimeMeta().getConsensusGroupId2TaskMetaMap(); + loadBalancer + .balance( + parallelism, + pipeMeta.getStaticMeta(), + ConfigNode.getInstance().getConfigManager()) + .forEach( + (taskIndex, newLeader) -> { + if 
(consensusGroupIdToTaskMetaMap.containsKey(taskIndex)) { + consensusGroupIdToTaskMetaMap.get(taskIndex).setLeaderNodeId(newLeader); + } else { + consensusGroupIdToTaskMetaMap.put( + taskIndex, + new PipeTaskMeta(MinimumProgressIndex.INSTANCE, newLeader)); + } + }); + } + }); } @Override diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java index 8b59759e38bb..0c1386d0dda4 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java @@ -162,40 +162,50 @@ public void executeFromCalculateInfoForTask(final ConfigNodeProcedureEnv env) { final ConcurrentMap updatedConsensusGroupIdToTaskMetaMap = new ConcurrentHashMap<>(); - // data regions & schema regions - env.getConfigManager() - .getLoadManager() - .getRegionLeaderMap() - .forEach( - (regionGroupId, regionLeaderNodeId) -> { - final String databaseName = - env.getConfigManager().getPartitionManager().getRegionDatabase(regionGroupId); - final PipeTaskMeta currentPipeTaskMeta = - currentConsensusGroupId2PipeTaskMeta.get(regionGroupId.getId()); - if (databaseName != null - && !databaseName.equals(SchemaConstant.SYSTEM_DATABASE) - && !databaseName.startsWith(SchemaConstant.SYSTEM_DATABASE + ".") - && currentPipeTaskMeta != null - && currentPipeTaskMeta.getLeaderNodeId() == regionLeaderNodeId) { - // Pipe only collect user's data, filter metric database here. 
- updatedConsensusGroupIdToTaskMetaMap.put( - regionGroupId.getId(), - new PipeTaskMeta(currentPipeTaskMeta.getProgressIndex(), regionLeaderNodeId)); - } - }); - - final PipeTaskMeta configRegionTaskMeta = - currentConsensusGroupId2PipeTaskMeta.get(Integer.MIN_VALUE); - if (Objects.nonNull(configRegionTaskMeta)) { - // config region - updatedConsensusGroupIdToTaskMetaMap.put( - // 0 is the consensus group id of the config region, but data region id and schema region - // id also start from 0, so we use Integer.MIN_VALUE to represent the config region - Integer.MIN_VALUE, - new PipeTaskMeta( - configRegionTaskMeta.getProgressIndex(), - // The leader of the config region is the config node itself - ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId())); + if (currentPipeStaticMeta.isSourceExternal()) { + currentConsensusGroupId2PipeTaskMeta.forEach( + (regionGroupId, pipeTaskMeta) -> { + updatedConsensusGroupIdToTaskMetaMap.put( + regionGroupId, + new PipeTaskMeta(pipeTaskMeta.getProgressIndex(), pipeTaskMeta.getLeaderNodeId())); + }); + } else { + // data regions & schema regions + env.getConfigManager() + .getLoadManager() + .getRegionLeaderMap() + .forEach( + (regionGroupId, regionLeaderNodeId) -> { + final String databaseName = + env.getConfigManager().getPartitionManager().getRegionDatabase(regionGroupId); + final PipeTaskMeta currentPipeTaskMeta = + currentConsensusGroupId2PipeTaskMeta.get(regionGroupId.getId()); + if (databaseName != null + && !databaseName.equals(SchemaConstant.SYSTEM_DATABASE) + && !databaseName.startsWith(SchemaConstant.SYSTEM_DATABASE + ".") + && currentPipeTaskMeta != null + && currentPipeTaskMeta.getLeaderNodeId() == regionLeaderNodeId) { + // Pipe only collect user's data, filter metric database here. 
+ updatedConsensusGroupIdToTaskMetaMap.put( + regionGroupId.getId(), + new PipeTaskMeta(currentPipeTaskMeta.getProgressIndex(), regionLeaderNodeId)); + } + }); + + final PipeTaskMeta configRegionTaskMeta = + currentConsensusGroupId2PipeTaskMeta.get(Integer.MIN_VALUE); + if (Objects.nonNull(configRegionTaskMeta)) { + // config region + updatedConsensusGroupIdToTaskMetaMap.put( + // 0 is the consensus group id of the config region, but data region id and schema + // region + // id also start from 0, so we use Integer.MIN_VALUE to represent the config region + Integer.MIN_VALUE, + new PipeTaskMeta( + configRegionTaskMeta.getProgressIndex(), + // The leader of the config region is the config node itself + ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId())); + } } updatedPipeRuntimeMeta = new PipeRuntimeMeta(updatedConsensusGroupIdToTaskMetaMap); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java index 5b4dcb0e1c12..3f8c1055b96b 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java @@ -42,6 +42,7 @@ import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; import org.apache.iotdb.confignode.procedure.impl.pipe.AbstractOperatePipeProcedureV2; import org.apache.iotdb.confignode.procedure.impl.pipe.PipeTaskOperation; +import org.apache.iotdb.confignode.procedure.impl.pipe.util.ExternalLoadBalancer; import org.apache.iotdb.confignode.procedure.store.ProcedureType; import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq; import org.apache.iotdb.consensus.exception.ConsensusException; @@ -66,6 +67,9 @@ import java.util.concurrent.ConcurrentMap; import 
java.util.concurrent.atomic.AtomicReference; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTERNAL_EXTRACTOR_PARALLELISM_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTERNAL_EXTRACTOR_PARALLELISM_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTERNAL_SOURCE_PARALLELISM_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_CONSENSUS_GROUP_ID_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_CONSENSUS_SENDER_DATANODE_ID_KEY; @@ -269,6 +273,31 @@ public void executeFromCalculateInfoForTask(final ConfigNodeProcedureEnv env) { new PipeTaskMeta( new RecoverProgressIndex(senderDataNodeId, new SimpleProgressIndex(0, 0)), senderDataNodeId)); + } else if (pipeStaticMeta.isSourceExternal()) { + // external source + final ExternalLoadBalancer loadBalancer = + new ExternalLoadBalancer( + pipeStaticMeta + .getExtractorParameters() + .getStringOrDefault( + Arrays.asList( + PipeExtractorConstant.EXTERNAL_EXTRACTOR_BALANCE_STRATEGY_KEY, + PipeExtractorConstant.EXTERNAL_SOURCE_BALANCE_STRATEGY_KEY), + PipeExtractorConstant.EXTERNAL_EXTRACTOR_BALANCE_PROPORTION_STRATEGY)); + final int parallelism = + pipeStaticMeta + .getExtractorParameters() + .getIntOrDefault( + Arrays.asList( + EXTERNAL_EXTRACTOR_PARALLELISM_KEY, EXTERNAL_SOURCE_PARALLELISM_KEY), + EXTERNAL_EXTRACTOR_PARALLELISM_DEFAULT_VALUE); + loadBalancer + .balance(parallelism, pipeStaticMeta, env.getConfigManager()) + .forEach( + (taskIndex, leaderNodeId) -> { + consensusGroupIdToTaskMetaMap.put( + taskIndex, new PipeTaskMeta(MinimumProgressIndex.INSTANCE, leaderNodeId)); + }); } else { // data regions & schema regions env.getConfigManager() diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/util/ExternalLoadBalancer.java 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/util/ExternalLoadBalancer.java new file mode 100644 index 000000000000..6eaffd935c61 --- /dev/null +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/util/ExternalLoadBalancer.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.procedure.impl.pipe.util; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; +import org.apache.iotdb.commons.cluster.NodeStatus; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta; +import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant; +import org.apache.iotdb.confignode.manager.ConfigManager; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * The ExternalLoadBalancer is responsible for assigning parallel extraction tasks from an external + * source to available DataNodes in the cluster. 
+ */ +public class ExternalLoadBalancer { + private final BalanceStrategy strategy; + + public ExternalLoadBalancer(final String balanceStrategy) { + switch (balanceStrategy) { + case PipeExtractorConstant.EXTERNAL_EXTRACTOR_BALANCE_PROPORTION_STRATEGY: + this.strategy = new ProportionalBalanceStrategy(); + break; + default: + throw new IllegalArgumentException("Unknown load balance strategy: " + balanceStrategy); + } + } + + /** + * Balances the given number of parallel tasks across available nodes. + * + * @param parallelCount number of external source tasks to distribute + * @param pipeStaticMeta metadata about the pipe extractor + * @param configManager reference to ConfigManager for cluster information + * @return a mapping from task index to leader node id + */ + public Map balance( + final int parallelCount, + final PipeStaticMeta pipeStaticMeta, + final ConfigManager configManager) { + return strategy.balance(parallelCount, pipeStaticMeta, configManager); + } + + public interface BalanceStrategy { + Map balance( + final int parallelCount, + final PipeStaticMeta pipeStaticMeta, + final ConfigManager configManager); + } + + public static class ProportionalBalanceStrategy implements BalanceStrategy { + @Override + public Map balance( + final int parallelCount, + final PipeStaticMeta pipeStaticMeta, + final ConfigManager configManager) { + final Map regionLeaderMap = + configManager.getLoadManager().getRegionLeaderMap(); + final Map parallelAssignment = new HashMap<>(); + + // Check if the external extractor is single instance per node + if (pipeStaticMeta + .getExtractorParameters() + .getBooleanOrDefault( + Arrays.asList( + PipeExtractorConstant.EXTERNAL_EXTRACTOR_SINGLE_INSTANCE_PER_NODE_KEY, + PipeExtractorConstant.EXTERNAL_SOURCE_SINGLE_INSTANCE_PER_NODE_KEY), + PipeExtractorConstant.EXTERNAL_EXTRACTOR_SINGLE_INSTANCE_PER_NODE_DEFAULT_VALUE)) { + final List runningDataNodes = + 
configManager.getLoadManager().filterDataNodeThroughStatus(NodeStatus.Running).stream() + .sorted() + .collect(Collectors.toList()); + if (runningDataNodes.isEmpty()) { + throw new RuntimeException("No available datanode to assign tasks"); + } + final int numNodes = runningDataNodes.size(); + for (int i = 1; i <= Math.min(numNodes, parallelCount); i++) { + final int datanodeId = runningDataNodes.get(i - 1); + parallelAssignment.put(-i, datanodeId); + } + return parallelAssignment; + } + + // Count how many DataRegions each DataNode leads + final Map leaderRegionId2DataRegionCountMap = new HashMap<>(); + regionLeaderMap.entrySet().stream() + .filter(e -> e.getKey().getType() == TConsensusGroupType.DataRegion && e.getValue() != -1) + .forEach( + e -> { + final int leaderId = e.getValue(); + leaderRegionId2DataRegionCountMap.put( + leaderId, leaderRegionId2DataRegionCountMap.getOrDefault(leaderId, 0) + 1); + }); + + // distribute evenly if no dataRegion exists + if (leaderRegionId2DataRegionCountMap.isEmpty()) { + List runningDataNodes = + configManager.getLoadManager().filterDataNodeThroughStatus(NodeStatus.Running).stream() + .sorted() + .collect(Collectors.toList()); + if (runningDataNodes.isEmpty()) { + throw new RuntimeException("No available datanode to assign tasks"); + } + final int numNodes = runningDataNodes.size(); + final int quotient = parallelCount / numNodes; + final int remainder = parallelCount % numNodes; + int taskIndex = 1; + for (int i = 0; i < numNodes; i++) { + int tasksForNode = quotient + (i < remainder ? 
1 : 0); + int datanodeId = runningDataNodes.get(i); + for (int j = 0; j < tasksForNode; j++) { + parallelAssignment.put(-taskIndex, datanodeId); + taskIndex++; + } + } + return parallelAssignment; + } + + final int totalRegions = + leaderRegionId2DataRegionCountMap.values().stream().mapToInt(Integer::intValue).sum(); + + // Calculate exact and floor share of each leader + final Map leaderRegionId2ExactShareMap = new HashMap<>(); + final Map leaderRegionId2AssignedCountMap = new HashMap<>(); + for (Map.Entry entry : leaderRegionId2DataRegionCountMap.entrySet()) { + final double share = (parallelCount * entry.getValue()) / (double) totalRegions; + leaderRegionId2ExactShareMap.put(entry.getKey(), share); + leaderRegionId2AssignedCountMap.put(entry.getKey(), (int) Math.floor(share)); + } + + // Distribute remainder tasks based on largest fractional parts + final int remainder = + parallelCount + - leaderRegionId2AssignedCountMap.values().stream().mapToInt(Integer::intValue).sum(); + + final List sortedLeaders = + leaderRegionId2ExactShareMap.keySet().stream() + .sorted( + (l1, l2) -> { + final double diff = + (leaderRegionId2ExactShareMap.get(l2) + - Math.floor(leaderRegionId2ExactShareMap.get(l2))) + - (leaderRegionId2ExactShareMap.get(l1) + - Math.floor(leaderRegionId2ExactShareMap.get(l1))); + return diff > 0 ? 1 : (diff < 0 ? 
-1 : Integer.compare(l1, l2)); + }) + .collect(Collectors.toList()); + for (int i = 0; i < remainder; i++) { + final int leaderId = sortedLeaders.get(i % sortedLeaders.size()); + leaderRegionId2AssignedCountMap.put( + leaderId, leaderRegionId2AssignedCountMap.get(leaderId) + 1); + } + + final List stableLeaders = new ArrayList<>(leaderRegionId2AssignedCountMap.keySet()); + Collections.sort(stableLeaders); + int taskIndex = 1; + for (final Integer leader : stableLeaders) { + final int count = leaderRegionId2AssignedCountMap.get(leader); + for (int i = 0; i < count; i++) { + parallelAssignment.put(-taskIndex, leader); + taskIndex++; + } + } + return parallelAssignment; + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java index 088d19966b47..24efdd7b63ec 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java @@ -215,4 +215,16 @@ public void validate( schemaRegionAgent.validate( pipeName, extractorAttributes, processorAttributes, connectorAttributes); } + + public boolean checkIfPluginSameType(final String newPluginName, final String oldPluginName) { + PipePluginMeta newPipePluginMeta = pipePluginMetaKeeper.getPipePluginMeta(newPluginName); + PipePluginMeta oldPipePluginMeta = pipePluginMetaKeeper.getPipePluginMeta(oldPluginName); + if (newPipePluginMeta == null) { + throw new PipeException(String.format("plugin %s is not registered.", newPluginName)); + } + if (oldPipePluginMeta == null) { + throw new PipeException(String.format("plugin %s is not registered.", oldPluginName)); + } + return newPipePluginMeta.getClassName().equals(oldPipePluginMeta.getClassName()); + } } diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionExtractorConstructor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionExtractorConstructor.java index ddf6cdb6b8b8..2fbc9935a25b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionExtractorConstructor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionExtractorConstructor.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.pipe.agent.plugin.constructor.PipeExtractorConstructor; import org.apache.iotdb.commons.pipe.agent.plugin.meta.DataNodePipePluginMetaKeeper; import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor; +import org.apache.iotdb.db.pipe.extractor.mqtt.MQTTExtractor; class PipeDataRegionExtractorConstructor extends PipeExtractorConstructor { @@ -42,5 +43,9 @@ protected void initConstructors() { BuiltinPipePlugin.DO_NOTHING_SOURCE.getPipePluginName(), DoNothingExtractor::new); pluginConstructors.put( BuiltinPipePlugin.IOTDB_SOURCE.getPipePluginName(), IoTDBDataRegionExtractor::new); + + pluginConstructors.put( + BuiltinPipePlugin.MQTT_EXTRACTOR.getPipePluginName(), MQTTExtractor::new); + pluginConstructors.put(BuiltinPipePlugin.MQTT_SOURCE.getPipePluginName(), MQTTExtractor::new); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java index 7e2191f8293b..5d2d0e62e02f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java @@ -118,6 +118,9 @@ protected boolean isShutdown() { @Override protected Map buildPipeTasks(final PipeMeta pipeMetaFromConfigNode) 
throws IllegalPathException { + if (pipeMetaFromConfigNode.getStaticMeta().isSourceExternal()) { + return new PipeDataNodeBuilder(pipeMetaFromConfigNode).buildExternalPipeTasks(); + } return new PipeDataNodeBuilder(pipeMetaFromConfigNode).build(); } @@ -192,7 +195,8 @@ protected void createPipeTask( .isEmpty(); // Advance the extractor parameters parsing logic to avoid creating un-relevant pipeTasks - if (needConstructDataRegionTask || needConstructSchemaRegionTask) { + // consensusGroupId < 0 means an external source task, should create it + if (needConstructDataRegionTask || needConstructSchemaRegionTask || consensusGroupId < 0) { final PipeDataNodeTask pipeTask = new PipeDataNodeTaskBuilder(pipeStaticMeta, consensusGroupId, pipeTaskMeta).build(); pipeTask.create(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeBuilder.java index 3d639215258b..f87c878db8b6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeBuilder.java @@ -84,4 +84,22 @@ public Map build() throws IllegalPathException { } return consensusGroupIdToPipeTaskMap; } + + public Map buildExternalPipeTasks() { + final Map taskIdToPipeTaskMap = new HashMap<>(); + final PipeStaticMeta pipeStaticMeta = pipeMeta.getStaticMeta(); + final PipeRuntimeMeta pipeRuntimeMeta = pipeMeta.getRuntimeMeta(); + + for (Map.Entry taskIdToPipeTaskMeta : + pipeRuntimeMeta.getConsensusGroupId2TaskMetaMap().entrySet()) { + final int taskId = taskIdToPipeTaskMeta.getKey(); + final PipeTaskMeta pipeTaskMeta = taskIdToPipeTaskMeta.getValue(); + if (pipeTaskMeta.getLeaderNodeId() == CONFIG.getDataNodeId()) { + taskIdToPipeTaskMap.put( + taskId, new PipeDataNodeTaskBuilder(pipeStaticMeta, taskId, 
pipeTaskMeta).build()); + } + } + + return taskIdToPipeTaskMap; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java index 1b517419c985..c9bdeacd1526 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java @@ -21,10 +21,12 @@ import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex; import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeType; import org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant; +import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant; import org.apache.iotdb.commons.pipe.config.constant.SystemConstant; import org.apache.iotdb.db.pipe.agent.task.PipeDataNodeTask; import org.apache.iotdb.db.pipe.agent.task.execution.PipeConnectorSubtaskExecutor; @@ -172,6 +174,31 @@ private PipeParameters blendUserAndSystemParameters(final PipeParameters userPar private void checkConflict( final PipeParameters extractorParameters, final PipeParameters connectorParameters) { + final boolean isExternalSource = + !BuiltinPipePlugin.BUILTIN_SOURCES.contains( + extractorParameters + .getStringOrDefault( + Arrays.asList( + PipeExtractorConstant.EXTRACTOR_KEY, PipeExtractorConstant.SOURCE_KEY), + BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName()) + .toLowerCase()); + final String pluginName = + connectorParameters + .getStringOrDefault( + 
Arrays.asList(PipeConnectorConstant.CONNECTOR_KEY, PipeConnectorConstant.SINK_KEY), + BuiltinPipePlugin.IOTDB_THRIFT_SINK.getPipePluginName()) + .toLowerCase(); + final boolean isWriteBackSink = + BuiltinPipePlugin.WRITE_BACK_CONNECTOR.getPipePluginName().equals(pluginName) + || BuiltinPipePlugin.WRITE_BACK_SINK.getPipePluginName().equals(pluginName); + + if (isExternalSource && isWriteBackSink) { + connectorParameters.addAttribute( + PipeConnectorConstant.CONNECTOR_USE_EVENT_USER_NAME_KEY, "true"); + LOGGER.info( + "PipeDataNodeTaskBuilder: When the extractor is an external source, the write-back sink will use the user name in the enriched event by default."); + } + try { final Pair insertionDeletionListeningOptionPair = DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(extractorParameters); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskExtractorStage.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskExtractorStage.java index f6a2030e5fa7..eb81827968fb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskExtractorStage.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskExtractorStage.java @@ -49,6 +49,9 @@ public PipeTaskExtractorStage( PipeTaskMeta pipeTaskMeta) { pipeExtractor = StorageEngine.getInstance().getAllDataRegionIds().contains(new DataRegionId(regionId)) + // regionId that is less than 0 means an external pipe source, use + // dataRegionExtractor + || regionId < 0 ? 
PipeDataNodeAgent.plugin().dataRegion().reflectExtractor(extractorParameters) : PipeDataNodeAgent.plugin().schemaRegion().reflectExtractor(extractorParameters); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java index 3be40341f030..184e6daad266 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java @@ -77,6 +77,9 @@ public PipeTaskProcessorStage( pipeName, creationTime, regionId, pipeTaskMeta)); final PipeProcessor pipeProcessor = StorageEngine.getInstance().getAllDataRegionIds().contains(new DataRegionId(regionId)) + // regionId that is less than 0 means an external pipe source, use + // dataRegionProcessor + || regionId < 0 ? PipeDataNodeAgent.plugin() .dataRegion() .getConfiguredProcessor( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskManager.java index 86ba75c37fc0..c7c7f115dfd9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskManager.java @@ -78,8 +78,10 @@ public synchronized String register( final boolean isDataRegionConnector = StorageEngine.getInstance() - .getAllDataRegionIds() - .contains(new DataRegionId(environment.getRegionId())); + .getAllDataRegionIds() + .contains(new DataRegionId(environment.getRegionId())) + // regionId that is less than 0 means an external pipe source, use dataRegionConnector + || 
environment.getRegionId() < 0; final int connectorNum; boolean realTimeFirst = false; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java index 403527666304..93bc00740f55 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java @@ -89,7 +89,9 @@ public PipeProcessorSubtask( this.subtaskCreationTime = System.currentTimeMillis(); // Only register dataRegions - if (StorageEngine.getInstance().getAllDataRegionIds().contains(new DataRegionId(regionId))) { + if (StorageEngine.getInstance().getAllDataRegionIds().contains(new DataRegionId(regionId)) + // regionId that is less than 0 means an external pipe source, should register it + || regionId < 0) { PipeProcessorMetrics.getInstance().register(this); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java index 0bc4f76d25ce..49995a001703 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java @@ -29,6 +29,7 @@ import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReqV2; import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReqV2; import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReqV2; +import 
org.apache.iotdb.db.pipe.event.common.statement.PipeStatementInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.protocol.session.IClientSession; @@ -79,9 +80,12 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USERNAME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_SKIP_IF_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_USE_EVENT_USER_NAME_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_USE_EVENT_USER_NAME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_USERNAME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_USER_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_SKIP_IF_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_USE_EVENT_USER_NAME_KEY; import static org.apache.iotdb.db.exception.metadata.DatabaseNotSetException.DATABASE_NOT_SET; import static org.apache.iotdb.db.utils.ErrorHandlingUtils.getRootCause; @@ -100,6 +104,7 @@ public class WriteBackConnector implements PipeConnector { // Temporary, used to separate private IClientSession treeSession; private boolean skipIfNoPrivileges; + private boolean useEventUserName; private static final String TREE_MODEL_DATABASE_NAME_IDENTIFIER = null; @@ -168,6 +173,11 @@ public void customize( throw new PipeParameterNotValidException( String.format("Parameters in set %s are not allowed in 'skipif'", skipIfOptionSet)); } + + useEventUserName = + 
parameters.getBooleanOrDefault( + Arrays.asList(CONNECTOR_USE_EVENT_USER_NAME_KEY, SINK_USE_EVENT_USER_NAME_KEY), + CONNECTOR_USE_EVENT_USER_NAME_DEFAULT_VALUE); } @Override @@ -240,8 +250,10 @@ private void doTransfer( final TSStatus status = insertBaseStatement.isWriteToTable() - ? executeStatementForTableModel(insertBaseStatement, dataBaseName) - : executeStatementForTreeModel(insertBaseStatement); + ? executeStatementForTableModel( + insertBaseStatement, dataBaseName, pipeInsertNodeTabletInsertionEvent.getUserName()) + : executeStatementForTreeModel( + insertBaseStatement, pipeInsertNodeTabletInsertionEvent.getUserName()); if (status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode() && status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { @@ -281,9 +293,10 @@ private void doTransfer(final PipeRawTabletInsertionEvent pipeRawTabletInsertion final TSStatus status = insertTabletStatement.isWriteToTable() - ? executeStatementForTableModel(insertTabletStatement, dataBaseName) - : executeStatementForTreeModel(insertTabletStatement); - + ? 
executeStatementForTableModel( + insertTabletStatement, dataBaseName, pipeRawTabletInsertionEvent.getUserName()) + : executeStatementForTreeModel( + insertTabletStatement, pipeRawTabletInsertionEvent.getUserName()); if (status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode() && status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() && !(skipIfNoPrivileges @@ -297,7 +310,45 @@ private void doTransfer(final PipeRawTabletInsertionEvent pipeRawTabletInsertion @Override public void transfer(final Event event) throws Exception { - // Ignore the event except TabletInsertionEvent + // only transfer PipeStatementInsertionEvent + if (event instanceof PipeStatementInsertionEvent) { + doTransferWrapper((PipeStatementInsertionEvent) event); + } + } + + private void doTransferWrapper(final PipeStatementInsertionEvent pipeStatementInsertionEvent) + throws PipeException { + // We increase the reference count for this event to determine if the event may be released. + if (!pipeStatementInsertionEvent.increaseReferenceCount(WriteBackConnector.class.getName())) { + return; + } + try { + doTransfer(pipeStatementInsertionEvent); + } finally { + pipeStatementInsertionEvent.decreaseReferenceCount(WriteBackConnector.class.getName(), false); + } + } + + private void doTransfer(final PipeStatementInsertionEvent pipeStatementInsertionEvent) + throws PipeException { + + final TSStatus status = + pipeStatementInsertionEvent.isTableModelEvent() + ? 
executeStatementForTableModel( + pipeStatementInsertionEvent.getStatement(), + pipeStatementInsertionEvent.getTableModelDatabaseName(), + pipeStatementInsertionEvent.getUserName()) + : executeStatementForTreeModel( + pipeStatementInsertionEvent.getStatement(), + pipeStatementInsertionEvent.getUserName()); + + if (status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode() + && status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + throw new PipeException( + String.format( + "Write back PipeStatementInsertionEvent %s error, result status %s", + pipeStatementInsertionEvent, status)); + } } @Override @@ -310,9 +361,14 @@ public void close() throws Exception { } } - private TSStatus executeStatementForTableModel(Statement statement, String dataBaseName) { + private TSStatus executeStatementForTableModel( + Statement statement, String dataBaseName, final String userName) { session.setDatabaseName(dataBaseName); session.setSqlDialect(IClientSession.SqlDialect.TABLE); + // Remember the session's own user name (not the database name) so that the finally block + // restores the correct identity after temporarily switching to the event's user. + final String originalUerName = session.getUsername(); + if (useEventUserName && userName != null) { + session.setUsername(userName); + } SESSION_MANAGER.registerSession(session); try { autoCreateDatabaseIfNecessary(dataBaseName); @@ -366,6 +422,9 @@ private TSStatus executeStatementForTableModel(Statement statement, String dataB throw e; } finally { SESSION_MANAGER.removeCurrSession(); + if (useEventUserName) { + session.setUsername(originalUerName); + } } } @@ -410,9 +469,13 @@ private void autoCreateDatabaseIfNecessary(final String database) { ALREADY_CREATED_DATABASES.add(database); } - private TSStatus executeStatementForTreeModel(final Statement statement) { + private TSStatus executeStatementForTreeModel(final Statement statement, final String userName) { treeSession.setDatabaseName(null); treeSession.setSqlDialect(IClientSession.SqlDialect.TREE); + final String originalUerName = treeSession.getUsername(); + if (useEventUserName && userName != null) { +
treeSession.setUsername(userName); + } SESSION_MANAGER.registerSession(treeSession); try { return Coordinator.getInstance() @@ -428,6 +491,9 @@ private TSStatus executeStatementForTreeModel(final Statement statement) { .status; } finally { SESSION_MANAGER.removeCurrSession(); + if (useEventUserName) { + treeSession.setUsername(originalUerName); + } } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/statement/PipeStatementInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/statement/PipeStatementInsertionEvent.java new file mode 100644 index 000000000000..e7a3cf1d6dd5 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/statement/PipeStatementInsertionEvent.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.pipe.event.common.statement; + +import org.apache.iotdb.commons.consensus.index.ProgressIndex; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; +import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; +import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; +import org.apache.iotdb.commons.pipe.event.EnrichedEvent; +import org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager; +import org.apache.iotdb.db.pipe.event.ReferenceTrackableEvent; +import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent; +import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics; +import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; +import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil; +import org.apache.iotdb.db.pipe.resource.memory.PipeTabletMemoryBlock; +import org.apache.iotdb.db.queryengine.plan.statement.Statement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement; + +import org.apache.tsfile.utils.RamUsageEstimator; + +import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +public class PipeStatementInsertionEvent extends PipeInsertionEvent + implements ReferenceTrackableEvent, AutoCloseable { + + // For better calculation + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(PipeStatementInsertionEvent.class); + private InsertBaseStatement statement; + private boolean needToReport; + private final PipeTabletMemoryBlock allocatedMemoryBlock; + private volatile ProgressIndex progressIndex; + + public PipeStatementInsertionEvent( + String pipeName, + long creationTime, + PipeTaskMeta pipeTaskMeta, + TreePattern treePattern, + TablePattern tablePattern, + String userName, + boolean skipIfNoPrivileges, + Boolean isTableModelEvent, + String databaseNameFromDataRegion, + 
InsertBaseStatement statement) { + super( + pipeName, + creationTime, + pipeTaskMeta, + treePattern, + tablePattern, + userName, + skipIfNoPrivileges, + Long.MIN_VALUE, + Long.MAX_VALUE, + isTableModelEvent, + databaseNameFromDataRegion, + null, + null); + this.statement = statement; + // Allocate empty memory block, will be resized later. + this.allocatedMemoryBlock = + PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0); + } + + @Override + public boolean internallyIncreaseResourceReferenceCount(String holderMessage) { + PipeDataNodeResourceManager.memory() + .forceResize( + allocatedMemoryBlock, + PipeMemoryWeightUtil.calculateInsertBaseStatementSizeInBytes(statement) + + INSTANCE_SIZE); + if (Objects.nonNull(pipeName)) { + PipeDataNodeRemainingEventAndTimeMetrics.getInstance() + .increaseTabletEventCount(pipeName, creationTime); + } + return true; + } + + @Override + public boolean internallyDecreaseResourceReferenceCount(String holderMessage) { + if (Objects.nonNull(pipeName)) { + PipeDataNodeRemainingEventAndTimeMetrics.getInstance() + .decreaseTabletEventCount(pipeName, creationTime); + } + allocatedMemoryBlock.close(); + + statement = null; + return true; + } + + @Override + public void bindProgressIndex(final ProgressIndex progressIndex) { + // Normally not all events need to report progress, but if the progressIndex + // is given, indicating that the progress needs to be reported. + if (Objects.nonNull(progressIndex)) { + markAsNeedToReport(); + } + + this.progressIndex = progressIndex; + } + + @Override + public ProgressIndex getProgressIndex() { + return progressIndex == null ? 
null : progressIndex; + } + + @Override + public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( + String pipeName, + long creationTime, + PipeTaskMeta pipeTaskMeta, + TreePattern treePattern, + TablePattern tablePattern, + String userName, + boolean skipIfNoPrivileges, + long startTime, + long endTime) { + return null; + } + + @Override + public boolean isGeneratedByPipe() { + return false; + } + + @Override + public boolean mayEventTimeOverlappedWithTimeRange() { + throw new UnsupportedOperationException( + "mayEventTimeOverlappedWithTimeRange() is not supported!"); + } + + @Override + public boolean mayEventPathsOverlappedWithPattern() { + return false; + } + + public void markAsNeedToReport() { + this.needToReport = true; + } + + public Statement getStatement() { + return statement; + } + + /////////////////////////// Object /////////////////////////// + + @Override + public String toString() { + return String.format( + "PipeStatementInsertionEvent{statement=%s, needToReport=%s, allocatedMemoryBlock=%s}", + statement, needToReport, allocatedMemoryBlock) + + " - " + + super.toString(); + } + + @Override + public String coreReportMessage() { + return String.format( + "PipeStatementInsertionEvent{statement=%s, needToReport=%s, allocatedMemoryBlock=%s}", + statement, needToReport, allocatedMemoryBlock) + + " - " + + super.coreReportMessage(); + } + + /////////////////////////// ReferenceTrackableEvent /////////////////////////// + @Override + protected void trackResource() { + PipeDataNodeResourceManager.ref().trackPipeEventResource(this, eventResourceBuilder()); + } + + @Override + public PipePhantomReferenceManager.PipeEventResource eventResourceBuilder() { + return new PipeStatementInsertionEventResource( + this.isReleased, this.referenceCount, this.allocatedMemoryBlock); + } + + private static class PipeStatementInsertionEventResource + extends PipePhantomReferenceManager.PipeEventResource { + + private final PipeTabletMemoryBlock allocatedMemoryBlock; + +
private PipeStatementInsertionEventResource( + final AtomicBoolean isReleased, + final AtomicInteger referenceCount, + final PipeTabletMemoryBlock allocatedMemoryBlock) { + super(isReleased, referenceCount); + this.allocatedMemoryBlock = allocatedMemoryBlock; + } + + @Override + protected void finalizeResource() { + allocatedMemoryBlock.close(); + } + } + + /////////////////////////// AutoCloseable /////////////////////////// + + @Override + public void close() throws Exception {} +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java index 10b664cf4b57..d998eb5ea8e8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java @@ -142,6 +142,39 @@ public PipeRawTabletInsertionEvent( Long.MAX_VALUE); } + public PipeRawTabletInsertionEvent( + final Boolean isTableModelEvent, + final String databaseName, + final String tableModelDataBaseName, + final String treeModelDataBaseName, + final Tablet tablet, + final boolean isAligned, + final String pipeName, + final long creationTime, + final PipeTaskMeta pipeTaskMeta, + final EnrichedEvent sourceEvent, + final boolean needToReport, + final String userName) { + this( + isTableModelEvent, + databaseName, + tableModelDataBaseName, + treeModelDataBaseName, + tablet, + isAligned, + sourceEvent, + needToReport, + pipeName, + creationTime, + pipeTaskMeta, + null, + null, + userName, + true, + Long.MIN_VALUE, + Long.MAX_VALUE); + } + @TestOnly public PipeRawTabletInsertionEvent(final Tablet tablet, final boolean isAligned) { this( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/mqtt/MQTTExtractor.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/mqtt/MQTTExtractor.java new file mode 100644 index 000000000000..41f6ad1f2f23 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/mqtt/MQTTExtractor.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.pipe.extractor.mqtt; + +import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; +import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant; +import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment; +import org.apache.iotdb.commons.pipe.event.EnrichedEvent; +import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionEventCounter; +import org.apache.iotdb.db.protocol.mqtt.BrokerAuthenticator; +import org.apache.iotdb.pipe.api.PipeExtractor; +import org.apache.iotdb.pipe.api.annotation.TableModel; +import org.apache.iotdb.pipe.api.annotation.TreeModel; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; +import org.apache.iotdb.pipe.api.event.Event; +import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException; + +import io.moquette.BrokerConstants; +import io.moquette.broker.Server; +import io.moquette.broker.config.IConfig; +import io.moquette.broker.config.MemoryConfig; +import io.moquette.broker.security.IAuthenticator; +import io.moquette.interception.InterceptHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * MQTTExtractor is an external Extractor that uses the MQTT protocol to receive data. It starts an + * MQTT broker and listens for incoming messages, which are then processed and passed to the pending + * queue. 
+ */ +@TreeModel +@TableModel +public class MQTTExtractor implements PipeExtractor { + + private static final Logger LOGGER = LoggerFactory.getLogger(MQTTExtractor.class); + + protected String pipeName; + protected long creationTime; + protected PipeTaskMeta pipeTaskMeta; + protected final UnboundedBlockingPendingQueue pendingQueue = + new UnboundedBlockingPendingQueue<>(new PipeDataRegionEventCounter()); + + protected IConfig config; + protected List handlers; + protected IAuthenticator authenticator; + private final Server server = new Server(); + + protected final AtomicBoolean isClosed = new AtomicBoolean(false); + + @Override + public void validate(final PipeParameterValidator validator) throws Exception { + if (!validator + .getParameters() + .getBooleanOrDefault( + Arrays.asList( + PipeExtractorConstant.EXTERNAL_EXTRACTOR_SINGLE_INSTANCE_PER_NODE_KEY, + PipeExtractorConstant.EXTERNAL_SOURCE_SINGLE_INSTANCE_PER_NODE_KEY), + PipeExtractorConstant.EXTERNAL_EXTRACTOR_SINGLE_INSTANCE_PER_NODE_DEFAULT_VALUE)) { + throw new PipeParameterNotValidException("single mode should be true in MQTT extractor"); + } + } + + @Override + public void customize( + final PipeParameters parameters, final PipeExtractorRuntimeConfiguration configuration) + throws Exception { + final PipeTaskExtractorRuntimeEnvironment environment = + (PipeTaskExtractorRuntimeEnvironment) configuration.getRuntimeEnvironment(); + pipeName = environment.getPipeName(); + creationTime = environment.getCreationTime(); + pipeTaskMeta = environment.getPipeTaskMeta(); + config = createBrokerConfig(parameters); + handlers = new ArrayList<>(1); + handlers.add(new MQTTPublishHandler(parameters, environment, pendingQueue)); + authenticator = new BrokerAuthenticator(); + } + + @Override + public void start() throws Exception { + try { + server.startServer(config, handlers, null, authenticator, null); + } catch (IOException e) { + throw new RuntimeException("Exception while starting server", e); + } + + 
LOGGER.info( + "Start MQTT Extractor successfully,listening on ip {} port {}", + config.getProperty(BrokerConstants.HOST_PROPERTY_NAME), + config.getProperty(BrokerConstants.PORT_PROPERTY_NAME)); + + Runtime.getRuntime() + .addShutdownHook( + new Thread( + () -> { + LOGGER.info("Stopping IoTDB MQTT Extractor..."); + shutdown(); + LOGGER.info("MQTT Extractor stopped."); + })); + } + + @Override + public Event supply() throws Exception { + if (isClosed.get()) { + return null; + } + EnrichedEvent event = pendingQueue.directPoll(); + return event; + } + + @Override + public void close() throws Exception { + // compareAndSet flips the flag atomically, so concurrent close() calls stop the broker once. + if (isClosed.compareAndSet(false, true)) { + shutdown(); + } + } + + private IConfig createBrokerConfig(final PipeParameters pipeParameters) { + final Properties properties = new Properties(); + properties.setProperty( + BrokerConstants.HOST_PROPERTY_NAME, + pipeParameters.getStringOrDefault( + PipeExtractorConstant.MQTT_BROKER_HOST_KEY, + PipeExtractorConstant.MQTT_BROKER_HOST_DEFAULT_VALUE)); + properties.setProperty( + BrokerConstants.PORT_PROPERTY_NAME, + pipeParameters.getStringOrDefault( + PipeExtractorConstant.MQTT_BROKER_PORT_KEY, + PipeExtractorConstant.MQTT_BROKER_PORT_DEFAULT_VALUE)); + properties.setProperty( + BrokerConstants.BROKER_INTERCEPTOR_THREAD_POOL_SIZE, + pipeParameters.getStringOrDefault( + PipeExtractorConstant.MQTT_BROKER_INTERCEPTOR_THREAD_POOL_SIZE_KEY, + String.valueOf( + PipeExtractorConstant.MQTT_BROKER_INTERCEPTOR_THREAD_POOL_SIZE_DEFAULT_VALUE))); + properties.setProperty( + BrokerConstants.DATA_PATH_PROPERTY_NAME, + pipeParameters.getStringOrDefault( + PipeExtractorConstant.MQTT_DATA_PATH_PROPERTY_NAME_KEY, + PipeExtractorConstant.MQTT_DATA_PATH_PROPERTY_NAME_DEFAULT_VALUE)); + properties.setProperty( + BrokerConstants.IMMEDIATE_BUFFER_FLUSH_PROPERTY_NAME, + pipeParameters.getStringOrDefault( + PipeExtractorConstant.MQTT_IMMEDIATE_BUFFER_FLUSH_PROPERTY_NAME_KEY, + String.valueOf( +
PipeExtractorConstant.MQTT_IMMEDIATE_BUFFER_FLUSH_PROPERTY_NAME_DEFAULT_VALUE))); + // The anonymous-access switch must be written under its own broker key; writing it under the + // zero-byte-client-id key would be overwritten by the next setProperty call. + properties.setProperty( + BrokerConstants.ALLOW_ANONYMOUS_PROPERTY_NAME, + pipeParameters.getStringOrDefault( + PipeExtractorConstant.MQTT_ALLOW_ANONYMOUS_PROPERTY_NAME_KEY, + String.valueOf( + PipeExtractorConstant.MQTT_ALLOW_ANONYMOUS_PROPERTY_NAME_DEFAULT_VALUE))); + properties.setProperty( + BrokerConstants.ALLOW_ZERO_BYTE_CLIENT_ID_PROPERTY_NAME, + pipeParameters.getStringOrDefault( + PipeExtractorConstant.MQTT_ALLOW_ZERO_BYTE_CLIENT_ID_PROPERTY_NAME_KEY, + String.valueOf( + PipeExtractorConstant.MQTT_ALLOW_ZERO_BYTE_CLIENT_ID_PROPERTY_NAME_DEFAULT_VALUE))); + properties.setProperty( + BrokerConstants.NETTY_MAX_BYTES_PROPERTY_NAME, + pipeParameters.getStringOrDefault( + PipeExtractorConstant.MQTT_NETTY_MAX_BYTES_PROPERTY_NAME_KEY, + String.valueOf( + PipeExtractorConstant.MQTT_NETTY_MAX_BYTES_PROPERTY_NAME_DEFAULT_VALUE))); + return new MemoryConfig(properties); + } + + public void shutdown() { + server.stopServer(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/mqtt/MQTTPublishHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/mqtt/MQTTPublishHandler.java new file mode 100644 index 000000000000..75b54c68b4ed --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/mqtt/MQTTPublishHandler.java @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.extractor.mqtt; + +import org.apache.iotdb.commons.conf.IoTDBConstant.ClientVersion; +import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; +import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant; +import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment; +import org.apache.iotdb.commons.pipe.event.EnrichedEvent; +import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; +import org.apache.iotdb.db.pipe.event.common.statement.PipeStatementInsertionEvent; +import org.apache.iotdb.db.protocol.mqtt.Message; +import org.apache.iotdb.db.protocol.mqtt.PayloadFormatManager; +import org.apache.iotdb.db.protocol.mqtt.PayloadFormatter; +import org.apache.iotdb.db.protocol.mqtt.TableMessage; +import org.apache.iotdb.db.protocol.mqtt.TreeMessage; +import org.apache.iotdb.db.protocol.session.IClientSession; +import org.apache.iotdb.db.protocol.session.MqttClientSession; +import org.apache.iotdb.db.protocol.session.SessionManager; +import org.apache.iotdb.db.queryengine.plan.Coordinator; +import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; +import org.apache.iotdb.db.utils.CommonUtils; 
+import org.apache.iotdb.db.utils.TimestampPrecisionUtils; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; +import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion; + +import io.moquette.interception.AbstractInterceptHandler; +import io.moquette.interception.messages.InterceptConnectMessage; +import io.moquette.interception.messages.InterceptDisconnectMessage; +import io.moquette.interception.messages.InterceptPublishMessage; +import io.netty.buffer.ByteBuf; +import io.netty.handler.codec.mqtt.MqttQoS; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.BitMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.ZoneId; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** PublishHandler handle the messages from MQTT clients. */ +public class MQTTPublishHandler extends AbstractInterceptHandler { + + private static final Logger LOGGER = LoggerFactory.getLogger(MQTTPublishHandler.class); + + private final SessionManager sessionManager = SessionManager.getInstance(); + + private final ConcurrentHashMap clientIdToSessionMap = + new ConcurrentHashMap<>(); + private final PayloadFormatter payloadFormat; + private final boolean useTableInsert; + private final UnboundedBlockingPendingQueue pendingQueue; + private final String pipeName; + private final long creationTime; + private final PipeTaskMeta pipeTaskMeta; + + public MQTTPublishHandler( + final PipeParameters pipeParameters, + final PipeTaskExtractorRuntimeEnvironment environment, + final UnboundedBlockingPendingQueue pendingQueue) { + this.payloadFormat = + PayloadFormatManager.getPayloadFormat( + pipeParameters.getStringOrDefault( + PipeExtractorConstant.MQTT_PAYLOAD_FORMATTER_KEY, + PipeExtractorConstant.MQTT_PAYLOAD_FORMATTER_DEFAULT_VALUE)); + useTableInsert = PayloadFormatter.TABLE_TYPE.equals(this.payloadFormat.getType()); + pipeName = 
environment.getPipeName(); + creationTime = environment.getCreationTime(); + pipeTaskMeta = environment.getPipeTaskMeta(); + this.pendingQueue = pendingQueue; + } + + @Override + public String getID() { + return "mqtt-source-broker-listener"; + } + + @Override + public void onConnect(InterceptConnectMessage msg) { + if (!clientIdToSessionMap.containsKey(msg.getClientID())) { + final MqttClientSession session = new MqttClientSession(msg.getClientID()); + sessionManager.login( + session, + msg.getUsername(), + new String(msg.getPassword()), + ZoneId.systemDefault().toString(), + TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3, + ClientVersion.V_1_0, + useTableInsert ? IClientSession.SqlDialect.TABLE : IClientSession.SqlDialect.TREE); + sessionManager.registerSession(session); + clientIdToSessionMap.put(msg.getClientID(), session); + } + } + + @Override + public void onDisconnect(InterceptDisconnectMessage msg) { + final MqttClientSession session = clientIdToSessionMap.remove(msg.getClientID()); + if (null != session) { + sessionManager.removeCurrSession(); + sessionManager.closeSession(session, Coordinator.getInstance()::cleanupQueryExecution); + } + } + + @Override + public void onPublish(InterceptPublishMessage msg) { + try { + final String clientId = msg.getClientID(); + if (!clientIdToSessionMap.containsKey(clientId)) { + return; + } + final MqttClientSession session = clientIdToSessionMap.get(msg.getClientID()); + final ByteBuf payload = msg.getPayload(); + final String topic = msg.getTopicName(); + final String username = msg.getUsername(); + final MqttQoS qos = msg.getQos(); + + LOGGER.debug( + "Receive publish message. 
clientId: {}, username: {}, qos: {}, topic: {}, payload: {}", + clientId, + username, + qos, + topic, + payload); + + final List messages = payloadFormat.format(payload); + if (messages == null) { + return; + } + + for (Message message : messages) { + if (message == null) { + continue; + } + if (useTableInsert) { + final TableMessage tableMessage = (TableMessage) message; + // '/' previously defined as a database name + final String database = + !msg.getTopicName().contains("/") + ? msg.getTopicName() + : msg.getTopicName().substring(0, msg.getTopicName().indexOf("/")); + tableMessage.setDatabase(database.toLowerCase()); + extractTable(tableMessage, session); + } else { + extractTree((TreeMessage) message, session); + } + } + } catch (Throwable t) { + LOGGER.warn("onPublish execution exception, msg is [{}], error is ", msg, t); + } finally { + // release the payload of the message + super.onPublish(msg); + } + } + + /** Inserting table using tablet */ + private void extractTable(final TableMessage message, final MqttClientSession session) { + try { + TimestampPrecisionUtils.checkTimestampPrecision(message.getTimestamp()); + InsertTabletStatement insertTabletStatement = constructInsertTabletStatement(message); + session.setDatabaseName(message.getDatabase().toLowerCase()); + session.setSqlDialect(IClientSession.SqlDialect.TABLE); + final EnrichedEvent event = + new PipeStatementInsertionEvent( + pipeName, + creationTime, + pipeTaskMeta, + null, + null, + session.getUsername(), + true, + true, + session.getDatabaseName(), + insertTabletStatement); + if (!event.increaseReferenceCount(MQTTPublishHandler.class.getName())) { + LOGGER.warn("The reference count of the event {} cannot be increased, skipping it.", event); + return; + } + pendingQueue.waitedOffer(event); + } catch (Exception e) { + LOGGER.warn( + "meet error when polling mqtt source message database {}, table {}, tags {}, attributes {}, fields {}, at time {}, because ", + message.getDatabase(), + 
message.getTable(), + message.getTagKeys(), + message.getAttributeKeys(), + message.getFields(), + message.getTimestamp(), + e); + } + } + + private InsertTabletStatement constructInsertTabletStatement(TableMessage message) + throws IllegalPathException { + InsertTabletStatement statement = new InsertTabletStatement(); + statement.setDevicePath( + DataNodeDevicePathCache.getInstance().getPartialPath(message.getTable())); + List measurements = + Stream.of(message.getFields(), message.getTagKeys(), message.getAttributeKeys()) + .flatMap(List::stream) + .collect(Collectors.toList()); + statement.setMeasurements(measurements.toArray(new String[0])); + long[] timestamps = new long[] {message.getTimestamp()}; + statement.setTimes(timestamps); + int columnSize = measurements.size(); + int rowSize = 1; + + BitMap[] bitMaps = new BitMap[columnSize]; + Object[] columns = + Stream.of(message.getValues(), message.getTagValues(), message.getAttributeValues()) + .flatMap(List::stream) + .toArray(Object[]::new); + statement.setColumns(columns); + statement.setBitMaps(bitMaps); + statement.setRowCount(rowSize); + statement.setAligned(false); + statement.setWriteToTable(true); + TSDataType[] dataTypes = new TSDataType[measurements.size()]; + TsTableColumnCategory[] columnCategories = new TsTableColumnCategory[measurements.size()]; + for (int i = 0; i < message.getFields().size(); i++) { + dataTypes[i] = message.getDataTypes().get(i); + columnCategories[i] = TsTableColumnCategory.FIELD; + } + for (int i = message.getFields().size(); + i < message.getFields().size() + message.getTagKeys().size(); + i++) { + dataTypes[i] = TSDataType.STRING; + columnCategories[i] = TsTableColumnCategory.TAG; + } + for (int i = message.getFields().size() + message.getTagKeys().size(); + i + < message.getFields().size() + + message.getTagKeys().size() + + message.getAttributeKeys().size(); + i++) { + dataTypes[i] = TSDataType.STRING; + columnCategories[i] = TsTableColumnCategory.ATTRIBUTE; + } + 
statement.setDataTypes(dataTypes); + statement.setColumnCategories(columnCategories); + + return statement; + } + + private void extractTree(final TreeMessage message, final MqttClientSession session) { + try { + InsertRowStatement statement = new InsertRowStatement(); + statement.setDevicePath( + DataNodeDevicePathCache.getInstance().getPartialPath(message.getDevice())); + TimestampPrecisionUtils.checkTimestampPrecision(message.getTimestamp()); + statement.setTime(message.getTimestamp()); + statement.setMeasurements(message.getMeasurements().toArray(new String[0])); + if (message.getDataTypes() == null) { + statement.setDataTypes(new TSDataType[message.getMeasurements().size()]); + statement.setValues(message.getValues().toArray(new Object[0])); + statement.setNeedInferType(true); + } else { + List dataTypes = message.getDataTypes(); + List values = message.getValues(); + Object[] inferredValues = new Object[values.size()]; + for (int i = 0; i < values.size(); ++i) { + inferredValues[i] = + values.get(i) == null + ? 
null + : CommonUtils.parseValue(dataTypes.get(i), values.get(i)); + } + statement.setDataTypes(dataTypes.toArray(new TSDataType[0])); + statement.setValues(inferredValues); + } + statement.setAligned(false); + final EnrichedEvent event = + new PipeStatementInsertionEvent( + pipeName, + creationTime, + pipeTaskMeta, + null, + null, + session.getUsername(), + true, + false, + message.getDevice(), + statement); + if (!event.increaseReferenceCount(MQTTPublishHandler.class.getName())) { + LOGGER.warn("The reference count of the event {} cannot be increased, skipping it.", event); + return; + } + pendingQueue.waitedOffer(event); + } catch (Exception e) { + LOGGER.warn( + "meet error when polling mqtt source device {}, measurements {}, at time {}, because ", + message.getDevice(), + message.getMeasurements(), + message.getTimestamp(), + e); + } + } + + @Override + public void onSessionLoopError(Throwable throwable) { + // TODO: Implement something sensible here ... + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java index 5f28fedf14a7..99ed066b7dbf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java @@ -21,6 +21,7 @@ import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.db.pipe.event.common.row.PipeRow; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement; import org.apache.iotdb.db.utils.MemUtils; import org.apache.tsfile.enums.TSDataType; @@ -350,6 +351,11 @@ public static long calculateAlignedChunkMetaBytesUsed( return size; } + public static long calculateInsertBaseStatementSizeInBytes( + InsertBaseStatement insertBaseStatement) { + return insertBaseStatement.ramBytesUsed(); + } + 
/** * Rounds up the given integer num to the nearest multiple of n. * diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java index bbc9e62f3890..258c98d63645 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java @@ -27,6 +27,7 @@ import com.google.gson.JsonParseException; import com.google.gson.reflect.TypeToken; import io.netty.buffer.ByteBuf; +import org.apache.tsfile.enums.TSDataType; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -45,6 +46,7 @@ public class JSONPayloadFormatter implements PayloadFormatter { private static final String JSON_KEY_TIMESTAMPS = "timestamps"; private static final String JSON_KEY_MEASUREMENTS = "measurements"; private static final String JSON_KEY_VALUES = "values"; + private static final String JSON_KEY_DATATYPE = "datatypes"; private static final Gson GSON = new GsonBuilder().create(); @Override @@ -88,6 +90,11 @@ private List formatJson(JsonObject jsonObject) { jsonObject.get(JSON_KEY_MEASUREMENTS), new TypeToken>() {}.getType())); message.setValues( GSON.fromJson(jsonObject.get(JSON_KEY_VALUES), new TypeToken>() {}.getType())); + if (jsonObject.has(JSON_KEY_DATATYPE)) { + message.setDataTypes( + GSON.fromJson( + jsonObject.get(JSON_KEY_DATATYPE), new TypeToken>() {}.getType())); + } return Lists.newArrayList(message); } @@ -103,6 +110,11 @@ private List formatBatchJson(JsonObject jsonObject) { List> values = GSON.fromJson( jsonObject.get(JSON_KEY_VALUES), new TypeToken>>() {}.getType()); + List types = + jsonObject.has(JSON_KEY_DATATYPE) + ? 
GSON.fromJson( + jsonObject.get(JSON_KEY_DATATYPE), new TypeToken>() {}.getType()) + : null; List ret = new ArrayList<>(timestamps.size()); for (int i = 0; i < timestamps.size(); i++) { @@ -111,6 +123,7 @@ private List formatBatchJson(JsonObject jsonObject) { message.setTimestamp(timestamps.get(i)); message.setMeasurements(measurements); message.setValues(values.get(i)); + message.setDataTypes(types); ret.add(message); } return ret; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java index 8b0966a66eb0..9240eac6271c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -2073,7 +2073,18 @@ public SettableFuture alterPipe(final AlterPipeStatement alter final Map processorAttributes; final Map connectorAttributes; try { + if (!alterPipeStatement.getExtractorAttributes().isEmpty()) { + // We don't allow to change the extractor type + if (alterPipeStatement + .getExtractorAttributes() + .containsKey(PipeExtractorConstant.EXTRACTOR_KEY) + || alterPipeStatement + .getExtractorAttributes() + .containsKey(PipeExtractorConstant.SOURCE_KEY)) + checkIfSameSourceType( + new PipeParameters(alterPipeStatement.getExtractorAttributes()), + pipeMetaFromCoordinator.getStaticMeta().getExtractorParameters()); if (alterPipeStatement.isReplaceAllExtractorAttributes()) { extractorAttributes = alterPipeStatement.getExtractorAttributes(); } else { @@ -2220,6 +2231,30 @@ private static void checkSinkType( } } + private static void checkIfSameSourceType( + PipeParameters newPipeParameters, PipeParameters oldPipeParameters) { + final String newPluginName = + 
newPipeParameters + .getStringOrDefault( + Arrays.asList( + PipeExtractorConstant.EXTRACTOR_KEY, PipeExtractorConstant.SOURCE_KEY), + BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName()) + .toLowerCase(); + final String oldPluginName = + oldPipeParameters + .getStringOrDefault( + Arrays.asList( + PipeExtractorConstant.EXTRACTOR_KEY, PipeExtractorConstant.SOURCE_KEY), + BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName()) + .toLowerCase(); + if (!PipeDataNodeAgent.plugin().checkIfPluginSameType(newPluginName, oldPluginName)) { + throw new SemanticException( + String.format( + "Failed to alter pipe, the source type of the pipe cannot be changed from %s to %s", + oldPluginName, newPluginName)); + } + } + @Override public SettableFuture startPipe(final StartPipeStatement startPipeStatement) { final SettableFuture future = SettableFuture.create(); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java index 0f2e39f24505..8540776541df 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java @@ -33,6 +33,7 @@ import org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.writeback.WriteBackConnector; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.extractor.donothing.DoNothingExtractor; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.extractor.iotdb.IoTDBExtractor; +import org.apache.iotdb.commons.pipe.agent.plugin.builtin.extractor.mqtt.MQTTExtractor; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.aggregate.AggregateProcessor; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.aggregate.StandardStatisticsProcessor; import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.aggregate.TumblingWindowingProcessor; @@ -59,6 +60,9 @@ public enum BuiltinPipePlugin { DO_NOTHING_SOURCE("do-nothing-source", DoNothingExtractor.class), IOTDB_SOURCE("iotdb-source", IoTDBExtractor.class), + MQTT_EXTRACTOR("mqtt-extractor", MQTTExtractor.class), + MQTT_SOURCE("mqtt-source", MQTTExtractor.class), + // processors DO_NOTHING_PROCESSOR("do-nothing-processor", DoNothingProcessor.class), TUMBLING_TIME_SAMPLING_PROCESSOR( @@ -129,6 +133,16 @@ public String getClassName() { return className; } + // used to distinguish between builtin and external sources + public static final Set BUILTIN_SOURCES = + Collections.unmodifiableSet( + new HashSet<>( + Arrays.asList( + DO_NOTHING_EXTRACTOR.getPipePluginName().toLowerCase(), + IOTDB_EXTRACTOR.getPipePluginName().toLowerCase(), + DO_NOTHING_SOURCE.getPipePluginName().toLowerCase(), + IOTDB_SOURCE.getPipePluginName().toLowerCase()))); + public static final Set SHOW_PIPE_PLUGINS_BLACKLIST = Collections.unmodifiableSet( new HashSet<>( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/extractor/mqtt/MQTTExtractor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/extractor/mqtt/MQTTExtractor.java new file mode 100644 index 000000000000..7bc261d13aa8 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/extractor/mqtt/MQTTExtractor.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.pipe.agent.plugin.builtin.extractor.mqtt; + +import org.apache.iotdb.pipe.api.PipeExtractor; +import org.apache.iotdb.pipe.api.annotation.TableModel; +import org.apache.iotdb.pipe.api.annotation.TreeModel; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; +import org.apache.iotdb.pipe.api.event.Event; + +/** + * This class is a placeholder and should not be initialized. It represents an external Extractor + * that uses the MQTT protocol to receive data. There is a real implementation in the server module + * but cannot be imported here. The pipe agent in the server module will replace this class with the + * real implementation when initializing the MQTTExtractor. 
+ */ +@TreeModel +@TableModel +public class MQTTExtractor implements PipeExtractor { + + private static final String PLACEHOLDER_ERROR_MSG = + "This class is a placeholder and should not be used."; + + @Override + public void validate(PipeParameterValidator validator) throws Exception { + throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG); + } + + @Override + public void customize(PipeParameters parameters, PipeExtractorRuntimeConfiguration configuration) + throws Exception { + throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG); + } + + @Override + public void start() throws Exception { + throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG); + } + + @Override + public Event supply() throws Exception { + throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG); + } + + @Override + public void close() throws Exception { + throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG); + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeStaticMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeStaticMeta.java index 5178aeb26431..dcd984416c8a 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeStaticMeta.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeStaticMeta.java @@ -19,6 +19,8 @@ package org.apache.iotdb.commons.pipe.agent.task.meta; +import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin; +import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant; import org.apache.iotdb.commons.pipe.datastructure.visibility.Visibility; import org.apache.iotdb.commons.pipe.datastructure.visibility.VisibilityUtils; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; @@ -33,6 +35,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; +import 
java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.Objects; @@ -89,6 +92,16 @@ public PipeType getPipeType() { return PipeType.getPipeType(pipeName); } + public boolean isSourceExternal() { + return !BuiltinPipePlugin.BUILTIN_SOURCES.contains( + extractorParameters + .getStringOrDefault( + Arrays.asList( + PipeExtractorConstant.EXTRACTOR_KEY, PipeExtractorConstant.SOURCE_KEY), + BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName()) + .toLowerCase()); + } + public ByteBuffer serialize() throws IOException { PublicBAOS byteArrayOutputStream = new PublicBAOS(); DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java index 6bfc127e50bb..eec6f45f6801 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java @@ -261,6 +261,10 @@ public class PipeConnectorConstant { public static final String CONNECTOR_OPC_DA_PROGID_KEY = "connector.opcda.progid"; public static final String SINK_OPC_DA_PROGID_KEY = "sink.opcda.progid"; + public static final String CONNECTOR_USE_EVENT_USER_NAME_KEY = "connector.use-event-user-name"; + public static final String SINK_USE_EVENT_USER_NAME_KEY = "sink.use-event-user-name"; + public static final boolean CONNECTOR_USE_EVENT_USER_NAME_DEFAULT_VALUE = false; + private PipeConnectorConstant() { throw new IllegalStateException("Utility class"); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java index 
b1804b7e18b9..dd51b8d331b6 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java @@ -19,6 +19,11 @@ package org.apache.iotdb.commons.pipe.config.constant; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + public class PipeExtractorConstant { public static final String EXTRACTOR_KEY = "extractor"; @@ -147,6 +152,41 @@ public class PipeExtractorConstant { public static final String SOURCE_SKIP_IF_KEY = "source.skipif"; public static final String EXTRACTOR_IOTDB_SKIP_IF_NO_PRIVILEGES = "no-privileges"; + ////////////////// external sources //////////////// + public static final String EXTERNAL_EXTRACTOR_BALANCE_STRATEGY_KEY = "extractor.balance-strategy"; + public static final String EXTERNAL_SOURCE_BALANCE_STRATEGY_KEY = "source.balance-strategy"; + public static final String EXTERNAL_EXTRACTOR_BALANCE_PROPORTION_STRATEGY = "proportion"; + public static final Set EXTERNAL_EXTRACTOR_BALANCE_STRATEGY_SET = + Collections.unmodifiableSet( + new HashSet<>(Arrays.asList(EXTERNAL_EXTRACTOR_BALANCE_PROPORTION_STRATEGY))); + public static final String EXTERNAL_EXTRACTOR_PARALLELISM_KEY = "extractor.parallelism"; + public static final String EXTERNAL_SOURCE_PARALLELISM_KEY = "source.parallelism"; + public static final int EXTERNAL_EXTRACTOR_PARALLELISM_DEFAULT_VALUE = 1; + public static final String EXTERNAL_EXTRACTOR_SINGLE_INSTANCE_PER_NODE_KEY = + "extractor.single-mode"; + public static final String EXTERNAL_SOURCE_SINGLE_INSTANCE_PER_NODE_KEY = "source.single-mode"; + public static final boolean EXTERNAL_EXTRACTOR_SINGLE_INSTANCE_PER_NODE_DEFAULT_VALUE = true; + + public static final String MQTT_BROKER_HOST_KEY = "mqtt.host"; + public static final String MQTT_BROKER_HOST_DEFAULT_VALUE = "127.0.0.1"; + public static final String 
MQTT_BROKER_PORT_KEY = "mqtt.port"; + public static final String MQTT_BROKER_PORT_DEFAULT_VALUE = "1883"; + public static final String MQTT_BROKER_INTERCEPTOR_THREAD_POOL_SIZE_KEY = "mqtt.pool-size"; + public static final int MQTT_BROKER_INTERCEPTOR_THREAD_POOL_SIZE_DEFAULT_VALUE = 1; + public static final String MQTT_DATA_PATH_PROPERTY_NAME_KEY = "mqtt.data-path"; + public static final String MQTT_DATA_PATH_PROPERTY_NAME_DEFAULT_VALUE = "data/"; + public static final String MQTT_IMMEDIATE_BUFFER_FLUSH_PROPERTY_NAME_KEY = "mqtt.immediate-flush"; + public static final boolean MQTT_IMMEDIATE_BUFFER_FLUSH_PROPERTY_NAME_DEFAULT_VALUE = true; + public static final String MQTT_ALLOW_ANONYMOUS_PROPERTY_NAME_KEY = "mqtt.allow-anonymous"; + public static final boolean MQTT_ALLOW_ANONYMOUS_PROPERTY_NAME_DEFAULT_VALUE = false; + public static final String MQTT_ALLOW_ZERO_BYTE_CLIENT_ID_PROPERTY_NAME_KEY = + "mqtt.allow-zero-byte-client-id"; + public static final boolean MQTT_ALLOW_ZERO_BYTE_CLIENT_ID_PROPERTY_NAME_DEFAULT_VALUE = true; + public static final String MQTT_NETTY_MAX_BYTES_PROPERTY_NAME_KEY = "mqtt.max-message-size"; + public static final long MQTT_NETTY_MAX_BYTES_PROPERTY_NAME_DEFAULT_VALUE = 1048576; + public static final String MQTT_PAYLOAD_FORMATTER_KEY = "mqtt.payload-formatter"; + public static final String MQTT_PAYLOAD_FORMATTER_DEFAULT_VALUE = "json"; + ///////////////////// pipe consensus ///////////////////// public static final String EXTRACTOR_CONSENSUS_GROUP_ID_KEY = "extractor.consensus.group-id";