@@ -85,6 +85,13 @@ public class DatastreamMetadataConstants {
public static final String DESTINATION_TOPIC_PREFIX = SYSTEM_DESTINATION_PREFIX + "destinationTopicPrefix";

/**
* If set to true, the datastream uses the message's source timestamp when producing the record to the
* destination.
*/
public static final String PRESERVE_EVENT_SOURCE_TIMESTAMP = SYSTEM_DESTINATION_PREFIX + "preserveEventSourceTimestamp";


/**
* Timestamp of datastream creation in epoch-millis
*/
public static final String CREATION_MS = "system.creation.ms";
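Usage sketch (illustrative, not part of this diff): a datastream opts in by setting this metadata key to "true" on the Datastream being mirrored, which is what the new tests below do:

// `datastream` is assumed to be an existing Datastream instance being configured for mirroring
datastream.getMetadata().put(DatastreamMetadataConstants.PRESERVE_EVENT_SOURCE_TIMESTAMP, Boolean.TRUE.toString());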
@@ -130,6 +130,9 @@ public class KafkaMirrorMakerConnectorTask extends AbstractKafkaBasedConnectorTa
private long _minInFlightMessagesThreshold;
private int _flowControlTriggerCount = 0;

// whether to preserve the source event timestamp when producing to the destination
private boolean _preserveEventSourceTimestamp = false;

/**
* Constructor for KafkaMirrorMakerConnectorTask
* @param config Task configuration properties
@@ -153,11 +156,13 @@ public KafkaMirrorMakerConnectorTask(KafkaBasedConnectorConfig config, Datastrea
_destinationTopicPrefix = task.getDatastreams().get(0).getMetadata()
.getOrDefault(DatastreamMetadataConstants.DESTINATION_TOPIC_PREFIX, DEFAULT_DESTINATION_TOPIC_PREFIX);
_dynamicMetricsManager = DynamicMetricsManager.getInstance();
_preserveEventSourceTimestamp = Boolean.parseBoolean(task.getDatastreams().get(0).getMetadata()
.getOrDefault(DatastreamMetadataConstants.PRESERVE_EVENT_SOURCE_TIMESTAMP, Boolean.FALSE.toString()));

if (_enablePartitionAssignment) {
LOG.info("Enable Brooklin partition assignment");
}

LOG.info("Preserve event source timestamp is set to {}", _preserveEventSourceTimestamp);
LOG.info("Destination topic prefix has been set to {}", _destinationTopicPrefix);

if (_isFlushlessModeEnabled) {
@@ -239,7 +244,7 @@ private Set<TopicPartition> getAssignedTopicPartitionFromTask() {

@Override
protected DatastreamProducerRecord translate(ConsumerRecord<?, ?> fromKafka, Instant readTime) {
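// Use the source record's own timestamp when preservation is enabled; otherwise fall back to the
// broker's log-append time for LogAppendTime topics, or to the read time for everything else.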
long eventsSourceTimestamp =
long eventsSourceTimestamp = _preserveEventSourceTimestamp ? fromKafka.timestamp() :
fromKafka.timestampType() == TimestampType.LOG_APPEND_TIME ? fromKafka.timestamp() : readTime.toEpochMilli();
HashMap<String, String> metadata = new HashMap<>();
metadata.put(KAFKA_ORIGIN_CLUSTER, _mirrorMakerSource.getBrokerListString());
@@ -6,6 +6,8 @@
package com.linkedin.datastream.connectors.kafka.mirrormaker;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
@@ -16,6 +18,7 @@
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.testng.Assert;

@@ -54,15 +57,16 @@ static Properties getKafkaProducerProperties(DatastreamEmbeddedZookeeperKafkaClu

static void produceEvents(String topic, int destinationPartition, int numEvents,
DatastreamEmbeddedZookeeperKafkaCluster kafkaCluster) {
produceEventsToPartition(topic, destinationPartition, numEvents, kafkaCluster);
produceEventsToPartitionAsync(topic, destinationPartition, numEvents, kafkaCluster);
}

static void produceEvents(String topic, int numEvents, DatastreamEmbeddedZookeeperKafkaCluster kafkaCluster) {
produceEventsToPartition(topic, null, numEvents, kafkaCluster);
produceEventsToPartitionAsync(topic, null, numEvents, kafkaCluster);
}

static void produceEventsToPartition(String topic, Integer destinationPartition, int numEvents,
DatastreamEmbeddedZookeeperKafkaCluster kafkaCluster) {
static void produceEventsToPartitionAsync(String topic, Integer destinationPartition, int numEvents,
DatastreamEmbeddedZookeeperKafkaCluster kafkaCluster) {

try (Producer<byte[], byte[]> producer = new KafkaProducer<>(getKafkaProducerProperties(kafkaCluster))) {
for (int i = 0; i < numEvents; i++) {
producer.send(new ProducerRecord<>(topic, destinationPartition, ("key-" + i).getBytes(Charsets.UTF_8),
@@ -76,6 +80,38 @@ static void produceEventsToPartition(String topic, Integer destinationPartition,
}
}

static List<RecordMetadata> produceEventsToPartitionSync(String topic, Integer destinationPartition, int numEvents,
List<Long> eventSourceTimeStamps, DatastreamEmbeddedZookeeperKafkaCluster kafkaCluster) {
if (eventSourceTimeStamps != null && numEvents != eventSourceTimeStamps.size()) {
Assert.fail("Number of source timestamps does not match the number of events. Required: " +
numEvents + ", supplied: " + eventSourceTimeStamps.size());
} else if (eventSourceTimeStamps == null) {
eventSourceTimeStamps = new ArrayList<>();
for (int i = 0; i < numEvents; i++) {
eventSourceTimeStamps.add(null);
}
}
List<RecordMetadata> recordMetadataList = new ArrayList<>();

Producer<byte[], byte[]> producer = new KafkaProducer<>(getKafkaProducerProperties(kafkaCluster));
try {
for (int i = 0; i < numEvents; i++) {
RecordMetadata metadata = producer.send(new ProducerRecord<>(topic, destinationPartition,
eventSourceTimeStamps.get(i), ("key-" + i).getBytes(Charsets.UTF_8),
("value-" + i).getBytes(Charsets.UTF_8))).get();
recordMetadataList.add(metadata);
}
} catch (InterruptedException interruptedException) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while sending message.", interruptedException);
} catch (Exception exception) {
throw new RuntimeException("Failed to send message.", exception);
} finally {
producer.flush();
producer.close();
}
return recordMetadataList;
}
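Design note: the synchronous send (producer.send(...).get()) is what makes the per-record RecordMetadata available to callers; the timestamp tests below rely on it to learn the broker-assigned timestamp on LogAppendTime topics.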

static Datastream createDatastream(String name, String broker, String sourceRegex, StringMap metadata) {
DatastreamSource source = new DatastreamSource();
source.setConnectionString("kafka://" + broker + "/" + sourceRegex);
@@ -6,6 +6,7 @@
package com.linkedin.datastream.connectors.kafka.mirrormaker;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -22,8 +23,10 @@
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.InterruptException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -91,6 +94,8 @@ public class TestKafkaMirrorMakerConnectorTask extends BaseKafkaZkTest {
private static final long CONNECTOR_AWAIT_STOP_TIMEOUT_MS = 30000;
private static final long DEBOUNCE_TIMER_MS = 1000;
private static final Logger LOG = LoggerFactory.getLogger(TestKafkaMirrorMakerConnectorTask.class);
private static final String MESSAGE_TIMESTAMP_TYPE_CREATE_TIME = "CreateTime";
private static final String MESSAGE_TIMESTAMP_TYPE_LOG_APPEND_TIME = "LogAppendTime";

@Test
public void testConsumeFromMultipleTopics() throws Exception {
@@ -148,6 +153,35 @@ public List<BrooklinMetricInfo> getMetricInfos() {
Assert.assertTrue(connectorTask.awaitStop(CONNECTOR_AWAIT_STOP_TIMEOUT_MS, TimeUnit.MILLISECONDS),
"did not shut down on time");
}

@Test
public void testPreserveSourceEventTimestampForSourceLogTimestampTypeAsCreateTime() throws Exception {
testEventSendWithTimestamp("topicWithCreateTime", 3, MESSAGE_TIMESTAMP_TYPE_CREATE_TIME, Boolean.TRUE);
}

@Test
public void testPreserveSourceEventTimestampForSourceLogTimestampTypeAsLogAppendTime() throws Exception {
testEventSendWithTimestamp("topicWithLogAppendTime", 3, MESSAGE_TIMESTAMP_TYPE_LOG_APPEND_TIME, Boolean.TRUE);
}

@Test
public void testPreserveSourceEventTimestampNotSetForSourceLogTimestampTypeAsLogAppendTime() throws Exception {
testEventSendWithTimestamp("preserveNotSet_1", 3, MESSAGE_TIMESTAMP_TYPE_LOG_APPEND_TIME, null);
}

@Test
public void testPreserveSourceEventTimestampSetToFalseForSourceLogTimestampTypeAsLogAppendTime() throws Exception {
testEventSendWithTimestamp("preserveNotSet_2", 3, MESSAGE_TIMESTAMP_TYPE_LOG_APPEND_TIME, Boolean.FALSE);
}

@Test
public void testPreserveSourceEventTimestampNotSetForSourceLogTimestampTypeAsCreateTime() throws Exception {
testEventSendWithTimestamp("preserveTsTest_3", 3, MESSAGE_TIMESTAMP_TYPE_CREATE_TIME, null);
}

@Test
public void testPreserveSourceEventTimestampSetToFalseForSourceLogTimestampTypeAsCreateTime() throws Exception {
testEventSendWithTimestamp("preserveTsTest_4", 3, MESSAGE_TIMESTAMP_TYPE_CREATE_TIME, Boolean.FALSE);
}

@Test
public void testConsumeFromMultipleTopicsWithDestinationTopicPrefixMetadata() throws Exception {
@@ -223,7 +257,7 @@ public void testIdentityPartitioningEnabled() throws Exception {
// produce an event half of the partitions
Set<Integer> expectedPartitionsWithData = new HashSet<>();
for (int i = 0; i < partitionCount; i += 2) {
KafkaMirrorMakerConnectorTestUtils.produceEventsToPartition(yummyTopic, i, 1, _kafkaCluster);
KafkaMirrorMakerConnectorTestUtils.produceEvents(yummyTopic, i, 1, _kafkaCluster);
expectedPartitionsWithData.add(i);
}

@@ -1301,4 +1335,65 @@ private void validatePausedPartitionsMetrics(String task, String stream, long nu
== numConfigPausedPartitions, POLL_PERIOD_MS, POLL_TIMEOUT_MS),
"numConfigPausedPartitions metric failed to update");
}
}
private void testEventSendWithTimestamp(String topicName, int numberOfEvents, String messageTimestampType,
Boolean preserveSourceEventTimestamp) throws Exception {
Assert.assertTrue(messageTimestampType.equals(MESSAGE_TIMESTAMP_TYPE_CREATE_TIME) ||
messageTimestampType.equals(MESSAGE_TIMESTAMP_TYPE_LOG_APPEND_TIME));
Properties topicProperties = new Properties();
topicProperties.put(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, messageTimestampType);
createTopic(_zkUtils, topicName, 1, topicProperties);

Datastream datastream = KafkaMirrorMakerConnectorTestUtils.createDatastream(topicName + "_Stream", _broker, "^" + topicName + "$");
if (preserveSourceEventTimestamp != null) {
datastream.getMetadata().put(DatastreamMetadataConstants.PRESERVE_EVENT_SOURCE_TIMESTAMP, preserveSourceEventTimestamp.toString());
}

DatastreamTaskImpl task = new DatastreamTaskImpl(Collections.singletonList(datastream));
MockDatastreamEventProducer datastreamProducer = new MockDatastreamEventProducer();
task.setEventProducer(datastreamProducer);

Long sourceTimestampBase = 1582766709000L;
List<Long> eventSourceTimestamps = new ArrayList<>();
for (int index = 0; index < numberOfEvents; ++index) {
eventSourceTimestamps.add(sourceTimestampBase + index);
}

KafkaMirrorMakerConnectorTask connectorTask =
KafkaMirrorMakerConnectorTestUtils.createKafkaMirrorMakerConnectorTask(task);

List<RecordMetadata> recordMetadataList = KafkaMirrorMakerConnectorTestUtils.produceEventsToPartitionSync(topicName,
null, numberOfEvents, eventSourceTimestamps, _kafkaCluster);

KafkaMirrorMakerConnectorTestUtils.runKafkaMirrorMakerConnectorTask(connectorTask);

if (!PollUtils.poll(() -> datastreamProducer.getEvents().size() == numberOfEvents, POLL_PERIOD_MS, POLL_TIMEOUT_MS)
|| recordMetadataList.size() != numberOfEvents) {
Assert.fail("did not transfer all messages within timeout. transferred " + datastreamProducer.getEvents().size());
}

// Get the broker appended timestamps if message timestamp type is LogAppendTime
if (MESSAGE_TIMESTAMP_TYPE_LOG_APPEND_TIME.equals(messageTimestampType)) {
for (int index = 0; index < numberOfEvents; ++index) {
eventSourceTimestamps.set(index, recordMetadataList.get(index).timestamp());
}
}
List<DatastreamProducerRecord> records = datastreamProducer.getEvents();

List<Long> readTimestamps = new ArrayList<>();

for (DatastreamProducerRecord record : records) {
readTimestamps.add(record.getEventsSourceTimestamp());
}

if (preserveSourceEventTimestamp != null && preserveSourceEventTimestamp) {
Assert.assertEquals(readTimestamps, eventSourceTimestamps, "source and destination timestamps don't match!");
} else if (MESSAGE_TIMESTAMP_TYPE_CREATE_TIME.equals(messageTimestampType)) {
Assert.assertNotEquals(readTimestamps, eventSourceTimestamps);
} else if (MESSAGE_TIMESTAMP_TYPE_LOG_APPEND_TIME.equals(messageTimestampType)) {
Assert.assertEquals(readTimestamps, eventSourceTimestamps);
}
connectorTask.stop();
Assert.assertTrue(connectorTask.awaitStop(CONNECTOR_AWAIT_STOP_TIMEOUT_MS, TimeUnit.MILLISECONDS),
"did not shut down on time");
}
}
@@ -43,6 +43,15 @@ public interface ReaderCallback {
*/
boolean onMessage(byte[] key, byte[] value) throws IOException;
}

/**
* Interface for the callback invoked with the full {@link ConsumerRecord} whenever a message is read
*/
public interface RecordReaderCallback {
/**
* Callback invoked whenever a message is read so it can be consumed
*/
boolean onMessage(ConsumerRecord<?, ?> record) throws IOException;
}

private KafkaTestUtils() {
}
@@ -102,6 +111,43 @@ public static void waitForTopicCreation(ZkUtils zkUtils, String topic, String br
throw new IllegalStateException("Topic was not ready within the timeout");
}

/**
* Consume messages from a given partition of a Kafka topic, using given RecordReaderCallback
*/
public static void readTopic(String topic, Integer partition, String brokerList, RecordReaderCallback callback)
throws Exception {
Validate.notNull(topic);
Validate.notNull(partition);
Validate.notNull(brokerList);
Validate.notNull(callback);

try (KafkaConsumer<byte[], byte[]> consumer = createConsumer(brokerList)) {
if (partition >= 0) {
List<TopicPartition> topicPartitions = Collections.singletonList(new TopicPartition(topic, partition));
consumer.assign(topicPartitions);
consumer.seekToBeginning(topicPartitions);
} else {
consumer.subscribe(Collections.singletonList(topic));
}

boolean keepGoing = true;
long now = System.currentTimeMillis();
do {
ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
for (ConsumerRecord<byte[], byte[]> record : records.records(topic)) {
if (!callback.onMessage(record)) {
keepGoing = false;
break;
}
}

// Guard against buggy test which can hang forever
if (System.currentTimeMillis() - now >= DEFAULT_TIMEOUT_MS) {
throw new TimeoutException("Timed out before reading all messages");
}
} while (keepGoing);
}
}
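A minimal usage sketch for this new overload (topic name and broker string are placeholders; assumes it runs inside a test method declared with throws Exception):

// Hypothetical example: read the first three records from partition 0 and capture their timestamps
List<Long> timestamps = new ArrayList<>();
KafkaTestUtils.readTopic("myTopic", 0, "localhost:9092", record -> {
timestamps.add(record.timestamp());
return timestamps.size() < 3; // returning false stops consumption
});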

/**
* Consume messages from a given partition of a Kafka topic, using given ReaderCallback
*/
@@ -24,6 +24,7 @@
import com.codahale.metrics.Meter;

import com.linkedin.datastream.common.BrooklinEnvelope;
import com.linkedin.datastream.common.BrooklinEnvelopeMetadataConstants;
import com.linkedin.datastream.common.DatastreamMetadataConstants;
import com.linkedin.datastream.common.ErrorLogger;
import com.linkedin.datastream.metrics.BrooklinMeterInfo;
@@ -36,7 +37,6 @@
import com.linkedin.datastream.server.api.transport.SendCallback;
import com.linkedin.datastream.server.api.transport.TransportProvider;


/**
* This is a Kafka Transport provider that writes events to Kafka.
* It handles record translation and data movement to the Kafka producer.
@@ -58,6 +58,7 @@ public class KafkaTransportProvider implements TransportProvider {
private final Meter _eventWriteRate;
private final Meter _eventByteWriteRate;
private final Meter _eventTransportErrorRate;
private final boolean _preserveEventSourceTimestamp;

/**
* Constructor for KafkaTransportProvider.
@@ -82,6 +83,9 @@ public KafkaTransportProvider(DatastreamTask datastreamTask, List<KafkaProducerW
ErrorLogger.logAndThrowDatastreamRuntimeException(LOG, errorMessage, null);
}

_preserveEventSourceTimestamp = Boolean.parseBoolean(datastreamTask.getDatastreams().get(0).getMetadata()
.getOrDefault(DatastreamMetadataConstants.PRESERVE_EVENT_SOURCE_TIMESTAMP, Boolean.FALSE.toString()));

// initialize metrics
_dynamicMetricsManager = DynamicMetricsManager.getInstance();
_metricsNamesPrefix = metricsNamesPrefix == null ? CLASS_NAME : metricsNamesPrefix + CLASS_NAME;
@@ -116,15 +120,17 @@ private ProducerRecord<byte[], byte[]> convertToProducerRecord(String topicName,
payloadValue = (byte[]) event;
}

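// When preservation is disabled, a null timestamp lets Kafka assign one: the producer's send time on
// CreateTime topics, or the broker's append time on LogAppendTime topics.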
Long recordTimeStamp = _preserveEventSourceTimestamp ? record.getEventsSourceTimestamp() : null;

if (partition.isPresent() && partition.get() >= 0) {
// If the partition is specified, we send the record to that specific partition
return new ProducerRecord<>(topicName, partition.get(), keyValue, payloadValue, headers);
return new ProducerRecord<>(topicName, partition.get(), recordTimeStamp, keyValue, payloadValue, headers);
} else {
// If the partition is not specified. We use the partitionKey as the key. Kafka will use the hash of that
// to determine the partition. If partitionKey does not exist, use the key value.
keyValue = record.getPartitionKey().isPresent()
? record.getPartitionKey().get().getBytes(StandardCharsets.UTF_8) : keyValue;
return new ProducerRecord<>(topicName, null, keyValue, payloadValue, headers);
return new ProducerRecord<>(topicName, null, recordTimeStamp, keyValue, payloadValue, headers);
}
}
