KAFKAC-47: maintain replication state and recover it after disruption
avsej committed Sep 24, 2016
1 parent b957958 commit a3c68a2
Showing 4 changed files with 70 additions and 21 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -46,7 +46,7 @@ Start the Schema Registry, also in its own terminal.
Now, run the connector in a standalone Kafka Connect worker in another terminal (this assumes
Avro settings and that Kafka and the Schema Registry are running locally on the default ports):

-    $ sudo connect-standalone /etc/schema-registry/connect-avro-standalone.properties \
+    $ sudo connect-standalone /etc/kafka/connect-standalone.properties \
         /etc/kafka-connect-couchbase/quickstart-couchbase.properties

To observe replicated events from the cluster, run CLI kafka consumer:
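For reference, with the Avro quickstart settings the consumer invocation is typically along these lines (the topic name test-default is an assumption based on the quickstart defaults, not something this commit pins down):

    $ kafka-avro-console-consumer --zookeeper localhost:2181 \
        --topic test-default --from-beginning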
4 changes: 2 additions & 2 deletions pom.xml
@@ -52,9 +52,9 @@
<properties>
<java-compat.version>1.6</java-compat.version>
<confluent.version>3.0.0</confluent.version>
-    <kafka.version>0.10.0.0-cp1</kafka.version>
+    <kafka.version>0.10.0.1</kafka.version>
<junit.version>4.12</junit.version>
-    <dcp-client.version>0.1.0</dcp-client.version>
+    <dcp-client.version>0.3.0</dcp-client.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<confluent.maven.repo>http://packages.confluent.io/maven/</confluent.maven.repo>
</properties>
37 changes: 28 additions & 9 deletions src/main/java/com/couchbase/connect/kafka/CouchbaseMonitorThread.java
@@ -19,14 +19,18 @@
import com.couchbase.client.dcp.Client;
import com.couchbase.client.dcp.ControlEventHandler;
import com.couchbase.client.dcp.DataEventHandler;
+import com.couchbase.client.dcp.StreamFrom;
+import com.couchbase.client.dcp.StreamTo;
import com.couchbase.client.dcp.config.DcpControl;
+import com.couchbase.client.dcp.message.DcpFailoverLogResponse;
import com.couchbase.client.dcp.message.DcpSnapshotMarkerMessage;
import com.couchbase.client.dcp.message.MessageUtil;
+import com.couchbase.client.dcp.state.PartitionState;
+import com.couchbase.client.dcp.state.SessionState;
+import com.couchbase.client.dcp.state.StateFormat;
import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import org.apache.kafka.common.config.types.Password;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import rx.Subscription;
+import rx.functions.Action1;

import java.util.List;
import java.util.concurrent.BlockingQueue;
@@ -36,13 +40,14 @@ public class CouchbaseMonitorThread extends Thread {

private final long connectionTimeout;
private final Client client;
-    private final Integer[] partitions;
-    private Subscription subscription;
+    private final Short[] partitions;
+    private final SessionState initialSessionState;

public CouchbaseMonitorThread(List<String> clusterAddress, String bucket, String password, long connectionTimeout,
-                                  final BlockingQueue<ByteBuf> queue, Integer[] partitions) {
+                                  final BlockingQueue<ByteBuf> queue, Short[] partitions, SessionState sessionState) {
this.connectionTimeout = connectionTimeout;
this.partitions = partitions;
+        this.initialSessionState = sessionState;
client = Client.configure()
.hostnames(clusterAddress)
.bucket(bucket)
@@ -78,12 +83,26 @@ public void acknowledgeBuffer(ByteBuf event) {
@Override
public void run() {
client.connect().await(); // FIXME: uncomment and raise timeout exception: .await(connectionTimeout, TimeUnit.MILLISECONDS);
-        client.initializeFromBeginningToNoEnd().await();
-        subscription = client.startStreams(partitions).subscribe();
+        client.initializeState(StreamFrom.BEGINNING, StreamTo.INFINITY).await();
+        client.failoverLogs(partitions).forEach(new Action1<ByteBuf>() {
+            @Override
+            public void call(ByteBuf event) {
+                short partition = DcpFailoverLogResponse.vbucket(event);
+                int numEntries = DcpFailoverLogResponse.numLogEntries(event);
+                PartitionState ps = initialSessionState.get(partition);
+                for (int i = 0; i < numEntries; i++) {
+                    ps.addToFailoverLog(
+                            DcpFailoverLogResponse.seqnoEntry(event, i),
+                            DcpFailoverLogResponse.vbuuidEntry(event, i)
+                    );
+                }
+                client.sessionState().set(partition, ps);
+            }
+        });
+        client.startStreaming(partitions).await();
}

public void shutdown() {
-        subscription.unsubscribe();
client.disconnect().await();
}
}
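The reworked run() replaces startStreams()/subscribe() with a three-step sequence: initializeState(BEGINNING, INFINITY) seeds default stream boundaries, the failover logs — per-vbucket lists of (vbuuid, seqno) pairs that let client and server detect divergent histories after a node failover — are folded into the session state, and startStreaming() begins delivery. A minimal lifecycle sketch under two assumptions: the class sits in package com.couchbase.connect.kafka (its package declaration is outside the hunk), and host, bucket, and credentials are illustrative:

    import com.couchbase.client.dcp.state.SessionState;
    import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
    import com.couchbase.connect.kafka.CouchbaseMonitorThread;

    import java.util.Arrays;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    public class MonitorSketch {
        public static void main(String[] args) throws InterruptedException {
            // No saved offsets yet: stream everything from the beginning.
            SessionState sessionState = new SessionState();
            sessionState.setToBeginningWithNoEnd(1024); // 1024 vbuckets on a standard cluster

            BlockingQueue<ByteBuf> queue = new LinkedBlockingQueue<ByteBuf>();
            Short[] partitions = { (short) 0, (short) 1 }; // normally every vbucket the task owns

            CouchbaseMonitorThread monitor = new CouchbaseMonitorThread(
                    Arrays.asList("127.0.0.1"), "default", "", 10000L,
                    queue, partitions, sessionState);
            monitor.start();

            ByteBuf event = queue.take();     // blocks until the first DCP event arrives
            monitor.acknowledgeBuffer(event); // credit the flow-control buffer back
            monitor.shutdown();
        }
    }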
48 changes: 39 additions & 9 deletions src/main/java/com/couchbase/connect/kafka/CouchbaseSourceTask.java
@@ -19,6 +19,9 @@
import com.couchbase.client.dcp.message.DcpDeletionMessage;
import com.couchbase.client.dcp.message.DcpExpirationMessage;
import com.couchbase.client.dcp.message.DcpMutationMessage;
+import com.couchbase.client.dcp.state.PartitionState;
+import com.couchbase.client.dcp.state.SessionState;
+import com.couchbase.client.dcp.state.StateFormat;
import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.deps.io.netty.util.CharsetUtil;
import com.couchbase.connect.kafka.dcp.EventType;
@@ -53,6 +56,7 @@ public class CouchbaseSourceTask extends SourceTask {
private BlockingQueue<ByteBuf> queue;
private String topic;
private String bucket;
+    private volatile boolean running;

@Override
public String version() {
@@ -75,13 +79,37 @@ public void start(Map<String, String> properties) {

long connectionTimeout = config.getLong(CouchbaseSourceConnectorConfig.CONNECTION_TIMEOUT_MS_CONFIG);
List<String> partitionsList = config.getList(CouchbaseSourceTaskConfig.PARTITIONS_CONFIG);
-        Integer[] partitions = new Integer[partitionsList.size()];
+
+        Short[] partitions = new Short[partitionsList.size()];
+        List<Map<String, String>> kafkaPartitions = new ArrayList<Map<String, String>>(1);
         for (int i = 0; i < partitionsList.size(); i++) {
-            partitions[i] = Integer.parseInt(partitionsList.get(i));
+            partitions[i] = Short.parseShort(partitionsList.get(i));
+            Map<String, String> kafkaPartition = new HashMap<String, String>(2);
+            kafkaPartition.put("bucket", bucket);
+            kafkaPartition.put("partition", partitions[i].toString());
+            kafkaPartitions.add(kafkaPartition);
         }
+        Map<Map<String, String>, Map<String, Object>> offsets = context.offsetStorageReader().offsets(kafkaPartitions);
+        SessionState sessionState = new SessionState();
+        sessionState.setToBeginningWithNoEnd(1024); // FIXME: literal
+        for (Map<String, String> kafkaPartition : kafkaPartitions) {
+            Map<String, Object> offset = offsets.get(kafkaPartition);
+            Short partition = Short.parseShort(kafkaPartition.get("partition"));
+            PartitionState partitionState = sessionState.get(partition);
+            long startSeqno = 0;
+            if (offset != null && offset.containsKey("bySeqno")) {
+                startSeqno = (Long) offset.get("bySeqno");
+            }
+            partitionState.setStartSeqno(startSeqno);
+            partitionState.setEndSeqno(0xffffffff);
+            partitionState.setSnapshotStartSeqno(startSeqno);
+            partitionState.setSnapshotEndSeqno(startSeqno);
+            sessionState.set(partition, partitionState);
+        }

+        running = true;
queue = new LinkedBlockingQueue<ByteBuf>();
-        couchbaseMonitorThread = new CouchbaseMonitorThread(clusterAddress, bucket, password, connectionTimeout, queue, partitions);
+        couchbaseMonitorThread = new CouchbaseMonitorThread(clusterAddress, bucket, password, connectionTimeout, queue, partitions, sessionState);
couchbaseMonitorThread.start();
}
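The recovery above leans on two conventions worth spelling out: the source-partition key must match exactly what convert() writes (bucket plus vbucket id, both as strings), and pinning snapshotStartSeqno and snapshotEndSeqno to the saved seqno declares the previous snapshot complete at that point, so the DCP stream can resume without forcing a rollback. The 1024 in the FIXME is the number of vbuckets on a standard Couchbase cluster; the hardcoding is what the FIXME flags. A sketch of one stored entry, with illustrative bucket name and values:

    import java.util.HashMap;
    import java.util.Map;

    public class OffsetShapeSketch {
        public static void main(String[] args) {
            // Source-partition key: must match what convert() emits below,
            // or the offsets(...) lookup in start() comes back empty.
            Map<String, String> sourcePartition = new HashMap<String, String>(2);
            sourcePartition.put("bucket", "default"); // assumed bucket name
            sourcePartition.put("partition", "512");  // vbucket id, stored as a string

            // Offset value: the last DCP sequence number delivered for that vbucket.
            Map<String, Object> sourceOffset = new HashMap<String, Object>(2);
            sourceOffset.put("bySeqno", 42L);

            // On restart, start() resumes vbucket 512 at seqno 42, with snapshot
            // start/end pinned to 42 as well.
            System.out.println(sourcePartition + " -> " + sourceOffset);
        }
    }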

@@ -99,7 +127,7 @@ private static List<String> getList(CouchbaseSourceConnectorConfig config, Strin
public List<SourceRecord> poll() throws InterruptedException {
List<SourceRecord> results = new ArrayList<SourceRecord>();

-        while (true) {
+        while (running) {
ByteBuf event = queue.poll(100, TimeUnit.MILLISECONDS);
if (event != null) {
SourceRecord record = convert(event);
@@ -113,6 +141,7 @@ public List<SourceRecord> poll() throws InterruptedException {
return results;
}
}
+        return results;
}

public SourceRecord convert(ByteBuf event) {
@@ -138,16 +167,16 @@ public SourceRecord convert(ByteBuf event) {
record.put("revSeqno", DcpDeletionMessage.revisionSeqno(event));
} else if (DcpExpirationMessage.is(event)) {
record.put("partition", DcpExpirationMessage.partition(event));
-            // FIXME: uncomment in next version
-            // record.put("key", bufToString(DcpExpirationMessage.key(event)));
-            // record.put("cas", DcpExpirationMessage.cas(event));
+            record.put("key", bufToString(DcpExpirationMessage.key(event)));
+            record.put("cas", DcpExpirationMessage.cas(event));
record.put("bySeqno", DcpExpirationMessage.bySeqno(event));
record.put("revSeqno", DcpExpirationMessage.revisionSeqno(event));
}
final Map<String, Object> offset = new HashMap<String, Object>(2);
offset.put("partition", record.getInt16("partition"));
offset.put("bySeqno", record.getInt64("bySeqno"));
-        final Map<String, String> partition = Collections.singletonMap("bucket", bucket);
+        final Map<String, String> partition = new HashMap<String, String>(2);
+        partition.put("bucket", bucket);
+        partition.put("partition", record.getInt16("partition").toString());

return new SourceRecord(partition, offset, topic, schema, record);
}
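The switch from Collections.singletonMap to a bucket-plus-partition map is the other half of the fix: with the bucket as the only key, offsets from every vbucket landed in the same slot of Connect's offset storage and the last writer won. A small sketch of the difference, with an assumed bucket name:

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    public class PartitionKeySketch {
        public static void main(String[] args) {
            Map<Map<String, String>, Map<String, Object>> offsets =
                    new HashMap<Map<String, String>, Map<String, Object>>();

            // Old key: bucket only. Both puts land on the same entry, so
            // vbucket 1's seqno is silently lost.
            Map<String, String> oldKey = Collections.singletonMap("bucket", "default");
            offsets.put(oldKey, Collections.<String, Object>singletonMap("bySeqno", 10L)); // vbucket 1
            offsets.put(oldKey, Collections.<String, Object>singletonMap("bySeqno", 99L)); // vbucket 2
            System.out.println(offsets.size()); // 1

            // New key: bucket + vbucket id. Each vbucket keeps its own offset.
            Map<String, String> newKey = new HashMap<String, String>(2);
            newKey.put("bucket", "default");
            newKey.put("partition", "1");
            offsets.put(newKey, Collections.<String, Object>singletonMap("bySeqno", 10L));
            System.out.println(offsets.size()); // 2
        }
    }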
@@ -156,6 +185,7 @@ public SourceRecord convert(ByteBuf event) {

@Override
public void stop() {
+        running = false;
couchbaseMonitorThread.shutdown();
try {
couchbaseMonitorThread.join(MAX_TIMEOUT);
