Feature/add headers and metadata #117

Open
wants to merge 43 commits into master
Commits
43 commits
6509ba5
Rebase changes
ag-ramachandran Apr 4, 2024
e27733e
* Fix conflicts
ag-ramachandran Apr 4, 2024
ece33e1
* Changes for Avro
ag-ramachandran Apr 7, 2024
8adf338
* Changes for Generic format writer
ag-ramachandran Apr 16, 2024
b5a5c36
* Fix tests for format tests
ag-ramachandran Apr 17, 2024
e2b8c6d
* Reformat code
ag-ramachandran Apr 17, 2024
f6952db
* Remove System out logs
ag-ramachandran Apr 17, 2024
836090d
* Remove System out logs
ag-ramachandran Apr 17, 2024
2c6922d
* Remove System out logs
ag-ramachandran Apr 17, 2024
445470b
* Fix conflicts
ag-ramachandran Sep 17, 2024
c0b675f
* Fix version
ag-ramachandran May 5, 2024
b93c4e7
* Fix conflicts
ag-ramachandran Sep 17, 2024
4228c0e
* Fix conflicts
ag-ramachandran Sep 17, 2024
8664702
* Fix conflicts
ag-ramachandran Sep 17, 2024
dbaae7d
* Fix test
ag-ramachandran May 6, 2024
904e72e
* Fix conflicts
ag-ramachandran Sep 17, 2024
7b1d5e8
* Fix e2e tests
ag-ramachandran May 7, 2024
6b19a67
* Fix e2e tests
ag-ramachandran May 7, 2024
f39d23c
* Fix e2e tests
ag-ramachandran May 7, 2024
618c583
Feature/add headers and metadata v2 (#118)
ag-ramachandran May 10, 2024
c4593fb
* Fix tests and add additional scenarios
ag-ramachandran May 10, 2024
76fb4d6
Feature/add headers and metadata v4 (#119)
ag-ramachandran May 13, 2024
c08a47c
* Remove logs, Bump version
ag-ramachandran May 14, 2024
11db348
* Add additional conditional for preventing NPE on the struct record
ag-ramachandran May 31, 2024
e819f9f
* Add additional conditional for preventing NPE on the struct record
ag-ramachandran May 31, 2024
0b0b8f5
* Minor fixes in tests
ag-ramachandran Jun 3, 2024
42579c4
* Minor fixes in tests
ag-ramachandran Jun 3, 2024
346d747
* Additional fixes
ag-ramachandran Jun 4, 2024
b1b182f
* Additional fixes
ag-ramachandran Jun 4, 2024
249e51a
* Additional fixes
ag-ramachandran Jun 4, 2024
9aa4b85
* Fix conflicts
ag-ramachandran Sep 17, 2024
b1159ef
* Fix issues
ag-ramachandran Sep 17, 2024
6110e59
* Fix issues
ag-ramachandran Sep 17, 2024
b51f3e4
* Fix issues
ag-ramachandran Sep 17, 2024
fb182d2
* Fix issues
ag-ramachandran Sep 17, 2024
b03640a
* Fix issues
ag-ramachandran Sep 17, 2024
0d1962d
* Fix issues
ag-ramachandran Sep 17, 2024
f9be0e5
* Fix issues
ag-ramachandran Sep 17, 2024
206be89
* Add cleanup
ag-ramachandran Sep 17, 2024
b244578
* Add tests
ag-ramachandran Sep 17, 2024
6ac26a2
* Add tests
ag-ramachandran Sep 17, 2024
d042794
* Add more log traces for troubleshooting issues
ag-ramachandran Sep 18, 2024
6e430e0
* Add more log traces for troubleshooting issues
ag-ramachandran Sep 18, 2024
16 changes: 14 additions & 2 deletions pom.xml
@@ -8,7 +8,7 @@
<artifactId>kafka-sink-azure-kusto</artifactId>
<packaging>jar</packaging>
<description>A Kafka Connect plugin for Azure Data Explorer (Kusto) Database</description>
<version>4.1.3</version>
<version>5.0.0</version>
<properties>
<avro.random.generator.version>0.4.1</avro.random.generator.version>
<awaitility.version>4.2.2</awaitility.version>
@@ -286,6 +286,12 @@
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
@@ -298,6 +304,12 @@
<version>${avro.random.generator.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>tech.allegro.schema.json2avro</groupId>
<artifactId>converter</artifactId>
<version>0.2.15</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
@@ -332,7 +344,7 @@
<repositories>
<repository>
<id>jitpack</id>
<url>https://jitpack.io</url>
<url>https://jitpack.io</url>
</repository>
<repository>
<id>confluent</id>
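
The two test-scoped dependencies added above back the new format tests: junit-jupiter-params enables parameterized JUnit 5 tests across ingestion formats, and the tech.allegro.schema.json2avro converter builds Avro records from JSON fixtures. A minimal sketch of the kind of test these enable (class name, schema, and payloads are illustrative, not taken from this PR):

import java.nio.charset.StandardCharsets;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import tech.allegro.schema.json2avro.converter.JsonAvroConverter;

import static org.junit.jupiter.api.Assertions.assertNotNull;

// Illustrative sketch only; not part of this PR.
class FormatDependenciesSketchTest {

    private static final Schema SCHEMA = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Event\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}");

    @ParameterizedTest
    @ValueSource(strings = {"{\"id\":\"a\"}", "{\"id\":\"b\"}"})
    void convertsJsonFixtureToAvro(String json) {
        // json2avro parses the JSON payload into an Avro record for the given schema.
        JsonAvroConverter converter = new JsonAvroConverter();
        GenericData.Record record = converter.convertToGenericDataRecord(
                json.getBytes(StandardCharsets.UTF_8), SCHEMA);
        assertNotNull(record.get("id"));
    }
}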

Large diffs are not rendered by default.

@@ -374,7 +374,6 @@ public String getAuthAppId() {
return this.getString(KUSTO_AUTH_APPID_CONF);
}


public String getAuthAppKey() {
return this.getPassword(KUSTO_AUTH_APPKEY_CONF).value();
}
@@ -129,7 +129,7 @@ private static boolean isStreamingEnabled(@NotNull KustoSinkConfig config) throw
return kcsb;
}

public static Client createKustoEngineClient(KustoSinkConfig config) {
public static @NotNull Client createKustoEngineClient(KustoSinkConfig config) {
try {
return ClientFactory.createClient(createKustoEngineConnectionString(config, config.getKustoEngineUrl()));
} catch (Exception e) {
@@ -142,30 +142,23 @@ public static Map<String, TopicIngestionProperties> getTopicsToIngestionProps(Ku

try {
TopicToTableMapping[] mappings = config.getTopicToTableMapping();

for (TopicToTableMapping mapping : mappings) {
IngestionProperties props = new IngestionProperties(mapping.getDb(), mapping.getTable());

String format = mapping.getFormat();
if (StringUtils.isNotEmpty(format)) {
if (isDataFormatAnyTypeOfJson(format)) {
props.setDataFormat(IngestionProperties.DataFormat.MULTIJSON);
} else {
props.setDataFormat(format);
}
props.setDataFormat(format);
// if (isDataFormatAnyTypeOfJson(format)) {
// props.setDataFormat(IngestionProperties.DataFormat.MULTIJSON);
// } else {
// props.setDataFormat(format);
// }
}

String mappingRef = mapping.getMapping();
if (StringUtils.isNotEmpty(mappingRef) && format != null) {
if (isDataFormatAnyTypeOfJson(format)) {
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.JSON);
} else if (format.equalsIgnoreCase(IngestionProperties.DataFormat.AVRO.toString())) {
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.AVRO);
} else if (format.equalsIgnoreCase(IngestionProperties.DataFormat.APACHEAVRO.toString())) {
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.APACHEAVRO);
} else {
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.CSV);
}
if (StringUtils.isNotEmpty(mappingRef) && StringUtils.isNotEmpty(format)) {
props.setIngestionMapping(mappingRef,
IngestionMapping.IngestionMappingKind.valueOf(format.toUpperCase(Locale.ROOT)));
}
TopicIngestionProperties topicIngestionProperties = new TopicIngestionProperties();
topicIngestionProperties.ingestionProperties = props;
@@ -177,8 +170,7 @@ public static Map<String, TopicIngestionProperties> getTopicsToIngestionProps(Ku
throw new ConfigException("Error while parsing kusto ingestion properties.", ex);
}
}

private static boolean isDataFormatAnyTypeOfJson(String format) {
private static boolean isDataFormatAnyTypeOfJson(@NotNull String format) {
return format.equalsIgnoreCase(IngestionProperties.DataFormat.JSON.name())
|| format.equalsIgnoreCase(IngestionProperties.DataFormat.SINGLEJSON.name())
|| format.equalsIgnoreCase(IngestionProperties.DataFormat.MULTIJSON.name());
@@ -200,9 +192,6 @@ private static void validateTableAccess(Client engineClient, TopicToTableMapping
String format = mapping.getFormat();
String mappingName = mapping.getMapping();
boolean streamingEnabled = mapping.isStreaming();
if (isDataFormatAnyTypeOfJson(format)) {
format = IngestionProperties.DataFormat.JSON.name();
}
boolean hasAccess = false;
boolean shouldCheckStreaming = streamingEnabled;

@@ -307,7 +296,7 @@ public TopicIngestionProperties getIngestionProps(String topic) {
return topicsToIngestionProps.get(topic);
}

void validateTableMappings(KustoSinkConfig config) {
void validateTableMappings(@NotNull KustoSinkConfig config) {
List<String> databaseTableErrorList = new ArrayList<>();
List<String> accessErrorList = new ArrayList<>();
boolean enableTableValidation = config.getEnableTableValidation();
@@ -342,7 +331,7 @@ void validateTableMappings(KustoSinkConfig config) {
}
}

private boolean isIngestorRole(TopicToTableMapping testMapping, Client engineClient) {
private boolean isIngestorRole(@NotNull TopicToTableMapping testMapping, @NotNull Client engineClient) {
try {
engineClient.execute(testMapping.getDb(), String.format(FETCH_TABLE_COMMAND, testMapping.getTable()), validateOnlyClientRequestProperties);
} catch (DataServiceException | DataClientException err) {
@@ -379,7 +368,7 @@ public void open(Collection<TopicPartition> partitions) {
}

@Override
public void close(Collection<TopicPartition> partitions) {
public void close(@NotNull Collection<TopicPartition> partitions) {
log.warn("Closing writers in KustoSinkTask");
CountDownLatch countDownLatch = new CountDownLatch(partitions.size());
// First stop so that no more ingestions trigger from timer flushes
@@ -404,7 +393,6 @@ public void close(Collection<TopicPartition> partitions) {
public void start(Map<String, String> props) {
config = new KustoSinkConfig(props);
String url = config.getKustoIngestUrl();

validateTableMappings(config);
if (config.isDlqEnabled()) {
isDlqEnabled = true;
@@ -417,18 +405,14 @@ public void start(Map<String, String> props) {
} catch (Exception e) {
throw new ConnectException("Failed to initialize producer for miscellaneous dead-letter queue", e);
}

} else {
dlqProducer = null;
isDlqEnabled = false;
dlqTopicName = null;
}

topicsToIngestionProps = getTopicsToIngestionProps(config);

// this should be read properly from settings
createKustoIngestClient(config);

log.info("Started KustoSinkTask with target cluster: ({}), source topics: ({})", url,
topicsToIngestionProps.keySet());
// Adding this check to make code testable
@@ -458,13 +442,12 @@ public void stop() {
}

@Override
public void put(Collection<SinkRecord> records) {
public void put(@NotNull Collection<SinkRecord> records) {
SinkRecord lastRecord = null;
for (SinkRecord sinkRecord : records) {
lastRecord = sinkRecord;
TopicPartition tp = new TopicPartition(sinkRecord.topic(), sinkRecord.kafkaPartition());
TopicPartitionWriter writer = writers.get(tp);

if (writer == null) {
NotFoundException e = new NotFoundException(String.format("Received a record without " +
"a mapped writer for topic:partition(%s:%d), dropping record.", tp.topic(), tp.partition()));
@@ -497,15 +480,13 @@ public Map<TopicPartition, OffsetAndMetadata> preCommit(
"verify your `topics` and `kusto.tables.topics.mapping` configurations");
}
Long lastCommittedOffset = writers.get(tp).lastCommittedOffset;

if (lastCommittedOffset != null) {
long offset = lastCommittedOffset + 1L;
log.debug("Forwarding to framework request to commit offset: {} for {} while the offset is {}", offset,
tp, offsets.get(tp));
offsetsToCommit.put(tp, new OffsetAndMetadata(offset));
}
}

return offsetsToCommit;
}

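
The rewritten getTopicsToIngestionProps above no longer branches per format when attaching a mapping reference; it resolves the IngestionMappingKind directly from the configured format name. A standalone sketch of that resolution rule (the sample format strings are illustrative; a format name with no matching enum constant now fails fast instead of falling back to CSV as before):

import java.util.Locale;

import com.microsoft.azure.kusto.ingest.IngestionMapping;

public class MappingKindResolutionSketch {
    public static void main(String[] args) {
        // "csv" -> CSV, "avro" -> AVRO, "apacheavro" -> APACHEAVRO; an unknown
        // name makes valueOf throw IllegalArgumentException.
        for (String format : new String[] {"csv", "avro", "apacheavro"}) {
            IngestionMapping.IngestionMappingKind kind =
                    IngestionMapping.IngestionMappingKind.valueOf(format.toUpperCase(Locale.ROOT));
            System.out.println(format + " -> " + kind);
        }
    }
}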
@@ -19,12 +19,15 @@
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.microsoft.azure.kusto.data.exceptions.KustoDataExceptionBase;
import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestionMapping;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.ManagedStreamingIngestClient;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException;
@@ -33,13 +36,16 @@
import com.microsoft.azure.kusto.ingest.result.IngestionStatusResult;
import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
import com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConfig.BehaviorOnError;
import com.microsoft.azure.kusto.kafka.connect.sink.formatwriter.FormatWriterHelper;

import static com.microsoft.azure.kusto.ingest.IngestionProperties.DataFormat.*;

public class TopicPartitionWriter {

private static final Logger log = LoggerFactory.getLogger(TopicPartitionWriter.class);
private static final String COMPRESSION_EXTENSION = ".gz";
private static final String FILE_EXCEPTION_MESSAGE = "Failed to create file or write record into file for ingestion.";

private final FormatWriterHelper formatWriterHelper = FormatWriterHelper.getInstance();
private final TopicPartition tp;
private final IngestClient client;
private final TopicIngestionProperties ingestionProps;
@@ -52,10 +58,10 @@ public class TopicPartitionWriter {
private final String dlqTopicName;
private final Producer<byte[], byte[]> dlqProducer;
private final BehaviorOnError behaviorOnError;
private final ReentrantReadWriteLock reentrantReadWriteLock;
FileWriter fileWriter;
long currentOffset;
Long lastCommittedOffset;
private final ReentrantReadWriteLock reentrantReadWriteLock;

TopicPartitionWriter(TopicPartition tp, IngestClient client, TopicIngestionProperties ingestionProps,
KustoSinkConfig config, boolean isDlqEnabled, String dlqTopicName, Producer<byte[], byte[]> dlqProducer) {
@@ -75,15 +81,14 @@ public class TopicPartitionWriter {
this.dlqProducer = dlqProducer;
}

static String getTempDirectoryName(String tempDirPath) {
static @NotNull String getTempDirectoryName(String tempDirPath) {
String tempDir = String.format("kusto-sink-connector-%s", UUID.randomUUID());
Path path = Paths.get(tempDirPath, tempDir).toAbsolutePath();
return path.toString();
}

public void handleRollFile(SourceFile fileDescriptor) {
public void handleRollFile(@NotNull SourceFile fileDescriptor) {
FileSourceInfo fileSourceInfo = new FileSourceInfo(fileDescriptor.path, fileDescriptor.rawBytes);

/*
* Since retries can be for a longer duration the Kafka Consumer may leave the group. This will result in a new Consumer reading records from the last
* committed offset leading to duplication of records in KustoDB. Also, if the error persists, it might also result in duplicate records being written
@@ -92,7 +97,7 @@ public void handleRollFile(SourceFile fileDescriptor) {
*/
for (int retryAttempts = 0; true; retryAttempts++) {
try {
IngestionResult ingestionResult = client.ingestFromFile(fileSourceInfo, ingestionProps.ingestionProperties);
IngestionResult ingestionResult = client.ingestFromFile(fileSourceInfo, updateIngestionPropertiesWithTargetFormat());
if (ingestionProps.streaming && ingestionResult instanceof IngestionStatusResult) {
// If IngestionStatusResult returned then the ingestion status is from streaming ingest
IngestionStatus ingestionStatus = ingestionResult.getIngestionStatusCollection().get(0);
@@ -123,14 +128,22 @@
}
// TODO : improve handling of specific transient exceptions once the client supports them.
// retrying transient exceptions
log.error("IngestionServiceException when ingesting data into KustoDB, file: {}, database: {}, table: {}, operationId: {}",
fileDescriptor.path, ingestionProps.ingestionProperties.getDatabaseName(),
ingestionProps.ingestionProperties.getTableName(),
ingestionProps.ingestionProperties.getIngestionMapping().getIngestionMappingReference(),exception);
backOffForRemainingAttempts(retryAttempts, exception, fileDescriptor);
} catch (IngestionClientException | URISyntaxException exception) {
log.error("IngestionClientException when ingesting data into KustoDB, file: {}, database: {}, table: {}, operationId: {}",
fileDescriptor.path, ingestionProps.ingestionProperties.getDatabaseName(),
ingestionProps.ingestionProperties.getTableName(),
ingestionProps.ingestionProperties.getIngestionMapping().getIngestionMappingReference(),exception);
throw new ConnectException(exception);
}
}
}

private boolean hasStreamingSucceeded(IngestionStatus status) {
private boolean hasStreamingSucceeded(@NotNull IngestionStatus status) {
switch (status.status) {
case Succeeded:
case Queued:
@@ -157,32 +170,38 @@ private boolean hasStreamingSucceeded(IngestionStatus status) {
return false;
}

private void backOffForRemainingAttempts(int retryAttempts, Exception exception, SourceFile fileDescriptor) {
private void backOffForRemainingAttempts(int retryAttempts, Exception exception, @NotNull SourceFile fileDescriptor) {
if (retryAttempts < maxRetryAttempts) {
// RetryUtil can be deleted if exponential backOff is not required, currently using constant backOff.
// long sleepTimeMs = RetryUtil.computeExponentialBackOffWithJitter(retryAttempts, TimeUnit.SECONDS.toMillis(5));
long sleepTimeMs = retryBackOffTime;
log.error("Failed to ingest records into Kusto, backing off and retrying ingesting records after {} milliseconds.", sleepTimeMs);
if(exception!=null) {
log.error("Failed to ingest records into Kusto, backing off and retrying ingesting records " +
"after {} milliseconds.", sleepTimeMs, exception);
}
try {
TimeUnit.MILLISECONDS.sleep(sleepTimeMs);
} catch (InterruptedException interruptedErr) {
if (isDlqEnabled && behaviorOnError != BehaviorOnError.FAIL) {
log.warn("Writing {} failed records to miscellaneous dead-letter queue topic={}", fileDescriptor.records.size(), dlqTopicName);
log.warn("InterruptedException:: Writing {} failed records to miscellaneous dead-letter queue topic={}",
fileDescriptor.records.size(), dlqTopicName);
fileDescriptor.records.forEach(this::sendFailedRecordToDlq);
}
throw new ConnectException(String.format("Retrying ingesting records into KustoDB was interrupted after retryAttempts=%s", retryAttempts + 1),
throw new ConnectException(String.format("Retrying ingesting records into KustoDB was interrupted " +
"after retryAttempts=%s", retryAttempts + 1),
exception);
}
} else {
if (isDlqEnabled && behaviorOnError != BehaviorOnError.FAIL) {
log.warn("Writing {} failed records to miscellaneous dead-letter queue topic={}", fileDescriptor.records.size(), dlqTopicName);
log.warn(String.format("Retries exhausted, writing {%s} failed records to miscellaneous dead-letter queue topic={%s}",
fileDescriptor.records.size(), dlqTopicName));
fileDescriptor.records.forEach(this::sendFailedRecordToDlq);
}
throw new ConnectException("Retry attempts exhausted, failed to ingest records into KustoDB.", exception);
}
}

public void sendFailedRecordToDlq(SinkRecord sinkRecord) {
public void sendFailedRecordToDlq(@NotNull SinkRecord sinkRecord) {
byte[] recordKey = String.format("Failed to write sinkRecord to KustoDB with the following kafka coordinates, "
+ "topic=%s, partition=%s, offset=%s.",
sinkRecord.topic(),
@@ -274,4 +293,24 @@ void close() {
void stop() {
fileWriter.stop();
}

private @NotNull IngestionProperties updateIngestionPropertiesWithTargetFormat() {
IngestionProperties updatedIngestionProperties = new IngestionProperties(this.ingestionProps.ingestionProperties);
IngestionProperties.DataFormat sourceFormat = ingestionProps.ingestionProperties.getDataFormat();
if (formatWriterHelper.isSchemaFormat(sourceFormat)) {
log.debug("Incoming dataformat {}, setting target format to MULTIJSON", sourceFormat);
updatedIngestionProperties.setDataFormat(MULTIJSON);
} else {
updatedIngestionProperties.setDataFormat(ingestionProps.ingestionProperties.getDataFormat());
}
// Just to make it clear , split the conditional
if (formatWriterHelper.isSchemaFormat(sourceFormat)) {
IngestionMapping mappingReference = ingestionProps.ingestionProperties.getIngestionMapping();
if (mappingReference != null && StringUtils.isNotEmpty(mappingReference.getIngestionMappingReference())) {
String ingestionMappingReferenceName = mappingReference.getIngestionMappingReference();
updatedIngestionProperties.setIngestionMapping(ingestionMappingReferenceName, IngestionMapping.IngestionMappingKind.JSON);
}
}
return updatedIngestionProperties;
}
}
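
updateIngestionPropertiesWithTargetFormat above rewrites the per-topic ingestion properties just before ingestFromFile: schema-carrying source formats are ingested as MULTIJSON, and any configured mapping reference is reattached as a JSON mapping. A standalone sketch of that remapping rule; the set of schema formats is an assumption here, since the connector's actual check lives in FormatWriterHelper.isSchemaFormat:

import java.util.EnumSet;
import java.util.Set;

import com.microsoft.azure.kusto.ingest.IngestionMapping;
import com.microsoft.azure.kusto.ingest.IngestionProperties;

import static com.microsoft.azure.kusto.ingest.IngestionProperties.DataFormat.*;

public class TargetFormatSketch {
    // Assumed schema formats; the real set is defined by FormatWriterHelper.isSchemaFormat.
    private static final Set<IngestionProperties.DataFormat> SCHEMA_FORMATS =
            EnumSet.of(JSON, SINGLEJSON, MULTIJSON, AVRO);

    static IngestionProperties withTargetFormat(IngestionProperties source) {
        IngestionProperties updated = new IngestionProperties(source);
        if (SCHEMA_FORMATS.contains(source.getDataFormat())) {
            // Schema-bearing records are written out as multi-line JSON before ingestion,
            // so the target format becomes MULTIJSON and the mapping is applied as a JSON mapping.
            updated.setDataFormat(MULTIJSON);
            IngestionMapping mapping = source.getIngestionMapping();
            if (mapping != null && mapping.getIngestionMappingReference() != null) {
                updated.setIngestionMapping(mapping.getIngestionMappingReference(),
                        IngestionMapping.IngestionMappingKind.JSON);
            }
        } else {
            updated.setDataFormat(source.getDataFormat());
        }
        return updated;
    }
}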