Skip to content

Commit

Permalink
Pipe: strict check for synonym pipe parameters to avoid ambiguity (#1…
Browse files Browse the repository at this point in the history
VGalaxies authored Jan 14, 2025
1 parent b51f63c commit c65b826
Showing 5 changed files with 121 additions and 95 deletions.
Original file line number Diff line number Diff line change
@@ -435,7 +435,6 @@ private void doTestUseNodeUrls(String connectorName) throws Exception {
extractorAttributes.put("database-name", "test.*");
extractorAttributes.put("table-name", "test.*");
extractorAttributes.put("inclusion", "data.insert");
extractorAttributes.put("mode.streaming", "true");
extractorAttributes.put("mode.snapshot", "false");
extractorAttributes.put("mode.strict", "true");

Original file line number Diff line number Diff line change
@@ -23,6 +23,10 @@
import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class PipeParameterValidator {

@@ -36,6 +40,40 @@ public PipeParameters getParameters() {
return parameters;
}

/**
* Validates whether the attributes entered by the user contain at least one attribute from
* lhsAttributes or rhsAttributes (if required), but not both.
*
* @param lhsAttributes list of left-hand side synonym attributes
* @param rhsAttributes list of right-hand side synonym attributes
* @param isRequired specifies whether at least one attribute from lhsAttributes or rhsAttributes
* must be provided
* @throws PipeParameterNotValidException if both lhsAttributes and rhsAttributes are provided
* @throws PipeAttributeNotProvidedException if isRequired is true and neither lhsAttributes nor
* rhsAttributes are provided
* @return the instance of PipeParameterValidator for method chaining
*/
public PipeParameterValidator validateSynonymAttributes(
final List<String> lhsAttributes,
final List<String> rhsAttributes,
final boolean isRequired) {
final boolean lhsExistence = lhsAttributes.stream().anyMatch(parameters::hasAttribute);
final boolean rhsExistence = rhsAttributes.stream().anyMatch(parameters::hasAttribute);
if (lhsExistence && rhsExistence) {
throw new PipeParameterNotValidException(
String.format(
"Cannot specify both %s and %s at the same time", lhsAttributes, rhsAttributes));
}
if (isRequired && !lhsExistence && !rhsExistence) {
throw new PipeAttributeNotProvidedException(
Stream.concat(lhsAttributes.stream(), rhsAttributes.stream())
.collect(
Collectors.collectingAndThen(Collectors.toList(), Collections::unmodifiableList))
.toString());
}
return this;
}

/**
* Validates whether the attributes entered by the user contain an attribute whose key is
* attributeKey.
@@ -83,7 +121,7 @@ public PipeParameterValidator validateAttributeValueRange(
* @throws PipeParameterNotValidException if the given argument is not valid
*/
public PipeParameterValidator validate(
final PipeParameterValidator.SingleObjectValidationRule validationRule,
final SingleObjectValidationRule validationRule,
final String messageToThrow,
final Object argument)
throws PipeParameterNotValidException {
@@ -107,7 +145,7 @@ public interface SingleObjectValidationRule {
* @throws PipeParameterNotValidException if the given arguments are not valid
*/
public PipeParameterValidator validate(
final PipeParameterValidator.MultipleObjectsValidationRule validationRule,
final MultipleObjectsValidationRule validationRule,
final String messageToThrow,
final Object... arguments)
throws PipeParameterNotValidException {
Original file line number Diff line number Diff line change
@@ -60,6 +60,8 @@
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_DATABASE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_DATABASE_NAME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_END_TIME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_KEY;
@@ -77,9 +79,11 @@
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_STRICT_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATH_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_FORMAT_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_FORMAT_PREFIX_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_LOOSE_RANGE_KEY;
@@ -91,8 +95,12 @@
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_LOG_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_STREAM_MODE_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_START_TIME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_TABLE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_TABLE_NAME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_WATERMARK_INTERVAL_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_WATERMARK_INTERVAL_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_DATABASE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_DATABASE_NAME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_END_TIME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_ENABLE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_END_TIME_KEY;
@@ -104,11 +112,15 @@
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODE_STRICT_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODS_ENABLE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODS_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_PATH_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_PATTERN_FORMAT_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_PATTERN_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_ENABLE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_LOOSE_RANGE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_MODE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_START_TIME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_TABLE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_TABLE_NAME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_WATERMARK_INTERVAL_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant._EXTRACTOR_WATERMARK_INTERVAL_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant._SOURCE_WATERMARK_INTERVAL_KEY;
@@ -174,25 +186,22 @@ public void validate(final PipeParameterValidator validator) throws Exception {
&& validator
.getParameters()
.hasAnyAttributes(
PipeExtractorConstant.EXTRACTOR_PATH_KEY,
PipeExtractorConstant.SOURCE_PATH_KEY,
PipeExtractorConstant.EXTRACTOR_PATTERN_KEY,
PipeExtractorConstant.SOURCE_PATTERN_KEY)) {
EXTRACTOR_PATH_KEY, SOURCE_PATH_KEY, EXTRACTOR_PATTERN_KEY, SOURCE_PATTERN_KEY)) {
throw new PipeException(
"The pipe cannot extract tree model data when sql dialect is set to table.");
}
if (!isTableModelDataAllowedToBeCaptured
&& validator
.getParameters()
.hasAnyAttributes(
PipeExtractorConstant.EXTRACTOR_DATABASE_NAME_KEY,
PipeExtractorConstant.SOURCE_DATABASE_NAME_KEY,
PipeExtractorConstant.EXTRACTOR_TABLE_NAME_KEY,
PipeExtractorConstant.SOURCE_TABLE_NAME_KEY,
PipeExtractorConstant.EXTRACTOR_DATABASE_KEY,
PipeExtractorConstant.SOURCE_DATABASE_KEY,
PipeExtractorConstant.EXTRACTOR_TABLE_KEY,
PipeExtractorConstant.SOURCE_TABLE_KEY)) {
EXTRACTOR_DATABASE_NAME_KEY,
SOURCE_DATABASE_NAME_KEY,
EXTRACTOR_TABLE_NAME_KEY,
SOURCE_TABLE_NAME_KEY,
EXTRACTOR_DATABASE_KEY,
SOURCE_DATABASE_KEY,
EXTRACTOR_TABLE_KEY,
SOURCE_TABLE_KEY)) {
throw new PipeException(
"The pipe cannot extract table model data when sql dialect is set to tree.");
}
@@ -287,7 +296,7 @@ public void validate(final PipeParameterValidator validator) throws Exception {
EXTRACTOR_REALTIME_MODE_BATCH_MODE_VALUE);
}

checkInvalidParameters(validator.getParameters());
checkInvalidParameters(validator);

constructHistoricalExtractor();
constructRealtimeExtractor(validator.getParameters());
@@ -319,7 +328,9 @@ private void validatePattern(final TreePattern treePattern, final TablePattern t
}
}

private void checkInvalidParameters(final PipeParameters parameters) {
private void checkInvalidParameters(final PipeParameterValidator validator) {
final PipeParameters parameters = validator.getParameters();

// Enable history and realtime if specifying start-time or end-time
if (parameters.hasAnyAttributes(
SOURCE_START_TIME_KEY,
@@ -343,87 +354,66 @@ private void checkInvalidParameters(final PipeParameters parameters) {
EXTRACTOR_HISTORY_END_TIME_KEY);
}

// Check coexistence of database-name and database
validator.validateSynonymAttributes(
Arrays.asList(EXTRACTOR_DATABASE_NAME_KEY, SOURCE_DATABASE_NAME_KEY),
Arrays.asList(EXTRACTOR_DATABASE_KEY, SOURCE_DATABASE_KEY),
false);

// Check coexistence of table-name and table
validator.validateSynonymAttributes(
Arrays.asList(EXTRACTOR_TABLE_NAME_KEY, SOURCE_TABLE_NAME_KEY),
Arrays.asList(EXTRACTOR_TABLE_KEY, SOURCE_TABLE_KEY),
false);

// Check coexistence of mode.snapshot and mode
if (parameters.hasAnyAttributes(EXTRACTOR_MODE_SNAPSHOT_KEY, SOURCE_MODE_SNAPSHOT_KEY)
&& parameters.hasAnyAttributes(EXTRACTOR_MODE_KEY, SOURCE_MODE_KEY)) {
LOGGER.warn(
"When {} or {} is specified, specifying {} and {} is invalid.",
EXTRACTOR_MODE_SNAPSHOT_KEY,
SOURCE_MODE_SNAPSHOT_KEY,
EXTRACTOR_MODE_KEY,
SOURCE_MODE_KEY);
}
validator.validateSynonymAttributes(
Arrays.asList(EXTRACTOR_MODE_SNAPSHOT_KEY, SOURCE_MODE_SNAPSHOT_KEY),
Arrays.asList(EXTRACTOR_MODE_KEY, SOURCE_MODE_KEY),
false);

// Check coexistence of mode.streaming and realtime.mode
if (parameters.hasAnyAttributes(EXTRACTOR_MODE_STREAMING_KEY, SOURCE_MODE_STREAMING_KEY)
&& parameters.hasAnyAttributes(EXTRACTOR_REALTIME_MODE_KEY, SOURCE_REALTIME_MODE_KEY)) {
LOGGER.warn(
"When {} or {} is specified, specifying {} and {} is invalid.",
EXTRACTOR_MODE_STREAMING_KEY,
SOURCE_MODE_STREAMING_KEY,
EXTRACTOR_REALTIME_MODE_KEY,
SOURCE_REALTIME_MODE_KEY);
}
validator.validateSynonymAttributes(
Arrays.asList(EXTRACTOR_MODE_STREAMING_KEY, SOURCE_MODE_STREAMING_KEY),
Arrays.asList(EXTRACTOR_REALTIME_MODE_KEY, SOURCE_REALTIME_MODE_KEY),
false);

// Check coexistence of mode.strict, history.loose-range and realtime.loose-range
if (parameters.hasAnyAttributes(EXTRACTOR_MODE_STRICT_KEY, SOURCE_MODE_STRICT_KEY)) {
if (parameters.hasAnyAttributes(
EXTRACTOR_HISTORY_LOOSE_RANGE_KEY, SOURCE_HISTORY_LOOSE_RANGE_KEY)) {
LOGGER.warn(
"When {} or {} is specified, specifying {} and {} is invalid.",
EXTRACTOR_MODE_STRICT_KEY,
SOURCE_MODE_STRICT_KEY,
validator.validateSynonymAttributes(
Arrays.asList(EXTRACTOR_MODE_STRICT_KEY, SOURCE_MODE_STRICT_KEY),
Arrays.asList(
EXTRACTOR_HISTORY_LOOSE_RANGE_KEY,
SOURCE_HISTORY_LOOSE_RANGE_KEY);
}
if (parameters.hasAnyAttributes(
EXTRACTOR_REALTIME_LOOSE_RANGE_KEY, SOURCE_REALTIME_LOOSE_RANGE_KEY)) {
LOGGER.warn(
"When {} or {} is specified, specifying {} and {} is invalid.",
EXTRACTOR_MODE_STRICT_KEY,
SOURCE_MODE_STRICT_KEY,
SOURCE_HISTORY_LOOSE_RANGE_KEY,
EXTRACTOR_REALTIME_LOOSE_RANGE_KEY,
SOURCE_REALTIME_LOOSE_RANGE_KEY);
}
}
SOURCE_REALTIME_LOOSE_RANGE_KEY),
false);

// Check coexistence of mods and mods.enable
if (parameters.hasAnyAttributes(EXTRACTOR_MODS_ENABLE_KEY, SOURCE_MODS_ENABLE_KEY)
&& parameters.hasAnyAttributes(EXTRACTOR_MODS_KEY, SOURCE_MODS_KEY)) {
LOGGER.warn(
"When {} or {} is specified, specifying {} and {} is invalid.",
EXTRACTOR_MODS_KEY,
SOURCE_MODS_KEY,
EXTRACTOR_MODS_ENABLE_KEY,
SOURCE_MODS_ENABLE_KEY);
}
validator.validateSynonymAttributes(
Arrays.asList(EXTRACTOR_MODS_ENABLE_KEY, SOURCE_MODS_ENABLE_KEY),
Arrays.asList(EXTRACTOR_MODS_KEY, SOURCE_MODS_KEY),
false);

// Check coexistence of watermark.interval-ms and watermark-interval-ms
if (parameters.hasAnyAttributes(EXTRACTOR_WATERMARK_INTERVAL_KEY, SOURCE_WATERMARK_INTERVAL_KEY)
&& parameters.hasAnyAttributes(
_EXTRACTOR_WATERMARK_INTERVAL_KEY, _SOURCE_WATERMARK_INTERVAL_KEY)) {
LOGGER.warn(
"When {} or {} is specified, specifying {} and {} is invalid.",
EXTRACTOR_WATERMARK_INTERVAL_KEY,
SOURCE_WATERMARK_INTERVAL_KEY,
_EXTRACTOR_WATERMARK_INTERVAL_KEY,
_SOURCE_WATERMARK_INTERVAL_KEY);
}
validator.validateSynonymAttributes(
Arrays.asList(EXTRACTOR_WATERMARK_INTERVAL_KEY, SOURCE_WATERMARK_INTERVAL_KEY),
Arrays.asList(_EXTRACTOR_WATERMARK_INTERVAL_KEY, _SOURCE_WATERMARK_INTERVAL_KEY),
false);

// Check if specifying mode.snapshot or mode.streaming when disable realtime extractor
if (!parameters.getBooleanOrDefault(
Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, SOURCE_REALTIME_ENABLE_KEY),
EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE)) {
if (parameters.hasAnyAttributes(EXTRACTOR_MODE_SNAPSHOT_KEY, SOURCE_MODE_SNAPSHOT_KEY)) {
LOGGER.info(
LOGGER.warn(
"When '{}' ('{}') is set to false, specifying {} and {} is invalid.",
EXTRACTOR_REALTIME_ENABLE_KEY,
SOURCE_REALTIME_ENABLE_KEY,
EXTRACTOR_MODE_SNAPSHOT_KEY,
SOURCE_MODE_SNAPSHOT_KEY);
}
if (parameters.hasAnyAttributes(EXTRACTOR_MODE_STREAMING_KEY, SOURCE_MODE_STREAMING_KEY)) {
LOGGER.info(
LOGGER.warn(
"When '{}' ('{}') is set to false, specifying {} and {} is invalid.",
EXTRACTOR_REALTIME_ENABLE_KEY,
SOURCE_REALTIME_ENABLE_KEY,
Original file line number Diff line number Diff line change
@@ -22,7 +22,6 @@
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.StateProgressIndex;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskProcessorRuntimeEnvironment;
@@ -50,6 +49,7 @@
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;

@@ -102,32 +102,25 @@ public class TwoStageCountProcessor implements PipeProcessor {

@Override
public void validate(PipeParameterValidator validator) throws Exception {
checkInvalidParameters(validator.getParameters());

final String rawOutputSeries;
if (!validator.getParameters().hasAttribute(PROCESSOR_OUTPUT_SERIES_KEY)) {
validator.validateRequiredAttribute(_PROCESSOR_OUTPUT_SERIES_KEY);
rawOutputSeries = validator.getParameters().getString(_PROCESSOR_OUTPUT_SERIES_KEY);
} else {
rawOutputSeries = validator.getParameters().getString(PROCESSOR_OUTPUT_SERIES_KEY);
}
checkInvalidParameters(validator);

final String rawOutputSeries =
validator
.getParameters()
.getStringByKeys(PROCESSOR_OUTPUT_SERIES_KEY, _PROCESSOR_OUTPUT_SERIES_KEY);
try {
PathUtils.isLegalPath(rawOutputSeries);
} catch (IllegalPathException e) {
throw new IllegalArgumentException("Illegal output series path: " + rawOutputSeries);
PathUtils.isLegalPath(Objects.requireNonNull(rawOutputSeries));
} catch (Exception e) {
throw new PipeParameterNotValidException("Illegal output series path: " + rawOutputSeries);
}
}

private void checkInvalidParameters(final PipeParameters parameters) {
private void checkInvalidParameters(final PipeParameterValidator validator) {
// Check coexistence of output.series and output-series
if (parameters.hasAttribute(PROCESSOR_OUTPUT_SERIES_KEY)
&& parameters.hasAttribute(_PROCESSOR_OUTPUT_SERIES_KEY)) {
LOGGER.warn(
"When {} is specified, specifying {} is invalid.",
PROCESSOR_OUTPUT_SERIES_KEY,
_PROCESSOR_OUTPUT_SERIES_KEY);
}
validator.validateSynonymAttributes(
Collections.singletonList(PROCESSOR_OUTPUT_SERIES_KEY),
Collections.singletonList(_PROCESSOR_OUTPUT_SERIES_KEY),
true);
}

@Override
Original file line number Diff line number Diff line change
@@ -190,6 +190,12 @@ public void validate(final PipeParameterValidator validator) throws Exception {
Arrays.asList(CONNECTOR_IOTDB_BATCH_SIZE_KEY, SINK_IOTDB_BATCH_SIZE_KEY),
CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE));

// Check coexistence of user and username
validator.validateSynonymAttributes(
Arrays.asList(CONNECTOR_IOTDB_USER_KEY, SINK_IOTDB_USER_KEY),
Arrays.asList(CONNECTOR_IOTDB_USERNAME_KEY, SINK_IOTDB_USERNAME_KEY),
false);

username =
parameters.getStringOrDefault(
Arrays.asList(

0 comments on commit c65b826

Please sign in to comment.