Skip to content

Commit

Permalink
Merge branch 'master' into sub-file
Browse files Browse the repository at this point in the history
  • Loading branch information
VGalaxies committed May 23, 2024
2 parents e9efcbb + 0ebac6b commit 715005c
Show file tree
Hide file tree
Showing 128 changed files with 6,062 additions and 772 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.pipe.it.autocreate;

import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.db.it.utils.TestUtils;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.rpc.TSStatusCode;

import org.junit.Assert;
import org.junit.Test;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class IoTDBPipeAutoDropIT extends AbstractPipeDualAutoIT {
@Test
public void testAutoDropInHistoricalTransfer() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);

final String receiverIp = receiverDataNode.getIp();
final int receiverPort = receiverDataNode.getPort();

try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {

if (!TestUtils.tryExecuteNonQueryWithRetry(
senderEnv, "insert into root.db.d1(time, s1) values (1, 1)")) {
return;
}

final Map<String, String> extractorAttributes = new HashMap<>();
final Map<String, String> processorAttributes = new HashMap<>();
final Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put("extractor.history.terminate-pipe-on-all-consumed", "true");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.batch.enable", "false");
connectorAttributes.put("connector.ip", receiverIp);
connectorAttributes.put("connector.port", Integer.toString(receiverPort));

final TSStatus status =
client.createPipe(
new TCreatePipeReq("p1", connectorAttributes)
.setExtractorAttributes(extractorAttributes)
.setProcessorAttributes(processorAttributes));

Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());

TestUtils.assertDataEventuallyOnEnv(
senderEnv,
"show pipes",
"ID,CreationTime,State,PipeSource,PipeProcessor,PipeSink,ExceptionMessage,",
Collections.emptySet());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -915,17 +915,17 @@ public void testNegativeTimestamp() throws Exception {
if (!TestUtils.tryExecuteNonQueriesWithRetry(
senderEnv,
Arrays.asList(
"insert into root.db.d1(time, s1) values (-123, 3)",
"insert into root.db.d1(time, s1) values (now(), 3)",
"flush"))) {
// Test the correctness of insertRowsNode transmission
"insert into root.db.d1(time, s1) values (-122, 3)",
"insert into root.db.d1(time, s1) values (-123, 3), (now(), 3)"))) {
return;
}

TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"select count(*) from root.**",
"count(root.db.d1.s1),",
Collections.singleton("5,"));
Collections.singleton("6,"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ public void testExtractorPatternMatch() throws Exception {
final Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put("extractor.pattern", null);
extractorAttributes.put("extractor.inclusion", "data");
extractorAttributes.put("extractor.inclusion", "data.insert");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.batch.enable", "false");
Expand Down Expand Up @@ -491,7 +491,7 @@ public void testMatchingMultipleDatabases() throws Exception {
final Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put("extractor.pattern", "root.db1");
extractorAttributes.put("extractor.inclusion", "data");
extractorAttributes.put("extractor.inclusion", "data.insert");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.batch.enable", "false");
Expand Down Expand Up @@ -589,7 +589,7 @@ public void testHistoryAndRealtime() throws Exception {
connectorAttributes.put("connector.ip", receiverIp);
connectorAttributes.put("connector.port", Integer.toString(receiverPort));

extractorAttributes.put("extractor.inclusion", "data");
extractorAttributes.put("extractor.inclusion", "data.insert");
extractorAttributes.put("extractor.pattern", "root.db.d2");
extractorAttributes.put("extractor.history.enable", "false");
extractorAttributes.put("extractor.realtime.enable", "true");
Expand Down Expand Up @@ -632,8 +632,7 @@ public void testHistoryAndRealtime() throws Exception {
"insert into root.db.d1 (time, at1) values (2, 11)",
"insert into root.db.d2 (time, at1) values (2, 21)",
"insert into root.db.d3 (time, at1) values (2, 31)",
"insert into root.db.d4 (time, at1) values (2, 41)",
"flush"))) {
"insert into root.db.d4 (time, at1) values (2, 41), (3, 51)"))) {
return;
}

Expand All @@ -646,7 +645,7 @@ public void testHistoryAndRealtime() throws Exception {
receiverEnv,
"select count(*) from root.** where time >= 2",
"count(root.db.d4.at1),count(root.db.d2.at1),count(root.db.d3.at1),",
Collections.singleton("1,1,0,"));
Collections.singleton("2,1,0,"));
}
}

Expand Down Expand Up @@ -675,7 +674,7 @@ public void testHistoryStartTimeAndEndTimeWorkingWithOrWithoutPattern() throws E
final Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put("extractor.pattern", "root.db.d1");
extractorAttributes.put("extractor.inclusion", "data");
extractorAttributes.put("extractor.inclusion", "data.insert");
extractorAttributes.put("extractor.history.enable", "true");
// 1970-01-01T08:00:02+08:00
extractorAttributes.put("extractor.history.start-time", "2000");
Expand Down Expand Up @@ -825,7 +824,7 @@ public void testSourceStartTimeAndEndTimeWorkingWithOrWithoutPattern() throws Ex
final Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put("source.pattern", "root.db.d1");
extractorAttributes.put("source.inclusion", "data");
extractorAttributes.put("source.inclusion", "data.insert");
extractorAttributes.put("source.start-time", "1970-01-01T08:00:02+08:00");
// 1970-01-01T08:00:04+08:00
extractorAttributes.put("source.end-time", "4000");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;

@RunWith(IoTDBTestRunner.class)
Expand Down Expand Up @@ -144,6 +145,54 @@ public void testAuthExclusion() throws Exception {
}
}

@Test
public void testAuthInclusionWithPattern() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);

final String receiverIp = receiverDataNode.getIp();
final int receiverPort = receiverDataNode.getPort();

try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
final Map<String, String> extractorAttributes = new HashMap<>();
final Map<String, String> processorAttributes = new HashMap<>();
final Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put("extractor.inclusion", "auth");
extractorAttributes.put("path", "root.ln.**");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.ip", receiverIp);
connectorAttributes.put("connector.port", Integer.toString(receiverPort));

final TSStatus status =
client.createPipe(
new TCreatePipeReq("testPipe", connectorAttributes)
.setExtractorAttributes(extractorAttributes)
.setProcessorAttributes(processorAttributes));

Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());

Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode());

if (!TestUtils.tryExecuteNonQueriesWithRetry(
senderEnv,
Arrays.asList(
"create user `ln_write_user` 'write_pwd'",
"GRANT READ_DATA, WRITE_DATA ON root.** TO USER ln_write_user;"))) {
return;
}

TestUtils.assertDataAlwaysOnEnv(
receiverEnv,
"LIST PRIVILEGES OF USER ln_write_user",
"ROLE,PATH,PRIVILEGES,GRANT OPTION,",
new HashSet<>(
Arrays.asList(",root.ln.**,READ_DATA,false,", ",root.ln.**,WRITE_DATA,false,")));
}
}

@Test
public void testPureDeleteInclusion() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
Expand Down Expand Up @@ -180,7 +229,7 @@ public void testPureDeleteInclusion() throws Exception {
senderEnv,
Arrays.asList(
"create timeseries root.ln.wf01.wt01.status with datatype=BOOLEAN,encoding=PLAIN",
"insert into root.ln.wf01.wt01(time, status) values(0, 1)",
"insert into root.ln.wf01.wt01(time, status) values(0, true)",
"flush"))) {
return;
}
Expand All @@ -191,7 +240,7 @@ public void testPureDeleteInclusion() throws Exception {
receiverEnv,
Arrays.asList(
"create timeseries root.ln.wf01.wt01.status1 with datatype=BOOLEAN,encoding=PLAIN",
"insert into root.ln.wf01.wt01(time, status1) values(0, 1)",
"insert into root.ln.wf01.wt01(time, status1) values(0, true)",
"flush"))) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,17 @@ public void testTemplateInclusion() throws Exception {
senderEnv,
Arrays.asList(
"create database root.ln",
"create database root.db",
"set ttl to root.ln 3600000",
"create user `thulab` 'passwd'",
"create role `admin`",
"grant role `admin` to `thulab`",
"grant read on root.** to role `admin`",
"create schema template t1 (temperature FLOAT encoding=RLE, status BOOLEAN encoding=PLAIN compression=SNAPPY)",
"set schema template t1 to root.ln.wf01",
"set schema template t1 to root.db.wf01",
"create timeseries using schema template on root.ln.wf01.wt01",
"create timeseries using schema template on root.db.wf01.wt01",
"create timeseries root.ln.wf02.wt01.status with datatype=BOOLEAN,encoding=PLAIN",
// Insert large timestamp to avoid deletion by ttl
"insert into root.ln.wf01.wt01(time, temperature, status) values (1800000000000, 23, true)"))) {
Expand All @@ -116,8 +119,8 @@ public void testTemplateInclusion() throws Exception {
final Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put("extractor.inclusion", "data, schema");
extractorAttributes.put(
"extractor.inclusion.exclusion", "schema.timeseries.ordinary, schema.ttl");
extractorAttributes.put("extractor.inclusion.exclusion", "schema.timeseries.ordinary");
extractorAttributes.put("extractor.path", "root.ln.**");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.ip", receiverIp);
Expand Down Expand Up @@ -150,7 +153,7 @@ public void testTemplateInclusion() throws Exception {
"Database,TTL,SchemaReplicationFactor,DataReplicationFactor,TimePartitionInterval,",
// Receiver's SchemaReplicationFactor/DataReplicationFactor shall be 3/2 regardless of the
// sender
Collections.singleton("root.ln,null,3,2,604800000,"));
Collections.singleton("root.ln,3600000,3,2,604800000,"));
TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"select * from root.**",
Expand Down
3 changes: 2 additions & 1 deletion iotdb-client/client-py/requirements_dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ flake8==3.9.0
black==24.3.0
# Testcontainer
testcontainers==3.4.2
requests<2.32.0
# For releases
twine==3.4.1
wheel==0.38.1
wheel==0.38.1
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ public void onComplete(TDataNodeHeartbeatResp heartbeatResp) {
regionDisk.putAll(heartbeatResp.getRegionDisk());
}
if (heartbeatResp.getPipeMetaList() != null) {
pipeRuntimeCoordinator.parseHeartbeat(nodeId, heartbeatResp.getPipeMetaList());
pipeRuntimeCoordinator.parseHeartbeat(
nodeId, heartbeatResp.getPipeMetaList(), heartbeatResp.getPipeCompletedList());
}
if (heartbeatResp.isSetConfirmedConfigNodeEndPoints()) {
loadManager
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeDeleteLogicalViewPlan;
import org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeDeleteTimeSeriesPlan;
import org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeEnrichedPlan;
import org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeSetTTLPlan;
import org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeUnsetSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.CreatePipePluginPlan;
import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan;
Expand Down Expand Up @@ -492,6 +493,9 @@ public static ConfigPhysicalPlan create(ByteBuffer buffer) throws IOException {
case PipeDeactivateTemplate:
plan = new PipeDeactivateTemplatePlan();
break;
case PipeSetTTL:
plan = new PipeSetTTLPlan();
break;
case GetRegionId:
plan = new GetRegionIdPlan();
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ public enum ConfigPhysicalPlanType {
PipeDeleteTimeSeries((short) 1702),
PipeDeleteLogicalView((short) 1703),
PipeDeactivateTemplate((short) 1704),
PipeSetTTL((short) 1705),

/** Subscription */
CreateTopic((short) 1800),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.apache.iotdb.confignode.consensus.request.auth;

import org.apache.iotdb.commons.auth.AuthException;
import org.apache.iotdb.commons.auth.entity.PrivilegeType;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
Expand Down Expand Up @@ -51,7 +50,7 @@ public class AuthorPlan extends ConfigPhysicalPlan {
private String userName;
private boolean grantOpt;

public AuthorPlan(ConfigPhysicalPlanType type) {
public AuthorPlan(final ConfigPhysicalPlanType type) {
super(type);
authorType = type;
}
Expand All @@ -67,18 +66,16 @@ public AuthorPlan(ConfigPhysicalPlanType type) {
* @param permissions permissions
* @param grantOpt with grant option, only grant statement can set grantOpt = true
* @param nodeNameList node name in Path structure
* @throws AuthException Authentication Exception
*/
public AuthorPlan(
ConfigPhysicalPlanType authorType,
String userName,
String roleName,
String password,
String newPassword,
Set<Integer> permissions,
boolean grantOpt,
List<PartialPath> nodeNameList)
throws AuthException {
final ConfigPhysicalPlanType authorType,
final String userName,
final String roleName,
final String password,
final String newPassword,
final Set<Integer> permissions,
final boolean grantOpt,
final List<PartialPath> nodeNameList) {
this(authorType);
this.authorType = authorType;
this.userName = userName;
Expand Down
Loading

0 comments on commit 715005c

Please sign in to comment.