Skip to content

Commit

Permalink
[Hotfix][MongodbCDC]Refine data format to adapt to universal logic (#…
Browse files Browse the repository at this point in the history
…5162)

Co-authored-by: chenzy15 <[email protected]>
  • Loading branch information
MonsterChenzhuo and chenzy15 authored Jul 27, 2023
1 parent 3e9b9f5 commit 4b4b5f9
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 16 deletions.
41 changes: 35 additions & 6 deletions docs/en/connector-v2/source/MongoDB-CDC.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ The following table lists the field data type mapping from MongoDB BSON type to
| Int64 | BIGINT |
| Double | DOUBLE |
| Decimal128 | DECIMAL |
| Date | Date |
| Timestamp | Timestamp |
| Date | DATE |
| Timestamp | TIMESTAMP |
| Object | ROW |
| Array | ARRAY |
Expand Down Expand Up @@ -274,9 +274,38 @@ sink {
}
```
## Changelog
## Format of real-time streaming data
- [Feature]Add MongoDB CDC Source Connector([4923](https://github.com/apache/seatunnel/pull/4923))
### next version
```shell
{
_id : { <BSON Object> }, // Identifier of the open change stream, can be assigned to the 'resumeAfter' parameter for subsequent resumption of this change stream
"operationType" : "<operation>", // The type of change operation that occurred, such as: insert, delete, update, etc.
"fullDocument" : { <document> }, // The full document data involved in the change operation. This field does not exist in delete operations
"ns" : {
"db" : "<database>", // The database where the change operation occurred
"coll" : "<collection>" // The collection where the change operation occurred
},
"to" : { // These fields are displayed only when the operation type is 'rename'
"db" : "<database>", // The new database name after the change
"coll" : "<collection>" // The new collection name after the change
},
"source":{
"ts_ms":"<timestamp>", // The timestamp when the change operation occurred
"table":"<collection>" // The collection where the change operation occurred
"db":"<database>", // The database where the change operation occurred
"snapshot":"false" // Identify the current stage of data synchronization
},
"documentKey" : { "_id" : <value> }, // The _id field value of the document involved in the change operation
"updateDescription" : { // Description of the update operation
"updatedFields" : { <document> }, // The fields and values that the update operation modified
"removedFields" : [ "<field>", ... ] // The fields and values that the update operation removed
}
"clusterTime" : <Timestamp>, // The timestamp of the Oplog log entry corresponding to the change operation
"txnNumber" : <NumberLong>, // If the change operation is executed in a multi-document transaction, this field and value are displayed, representing the transaction number
"lsid" : { // Represents information related to the Session in which the transaction is located
"id" : <UUID>,
"uid" : <BinData>
}
}
```
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.relational.TableId;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
Expand All @@ -38,6 +39,8 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import static org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils.getTableId;

/**
* Fetcher to fetch data from table split, the split is the incremental split {@link
* IncrementalSplit}.
Expand Down Expand Up @@ -147,14 +150,11 @@ public void close() {
private boolean shouldEmit(SourceRecord sourceRecord) {
if (taskContext.isDataChangeRecord(sourceRecord)) {
Offset position = taskContext.getStreamOffset(sourceRecord);
// TODO: The sourceRecord from MongoDB CDC and MySQL CDC are inconsistent. For
// compatibility, the getTableId method is commented out for now.
// TableId tableId = getTableId(sourceRecord);
TableId tableId = getTableId(sourceRecord);
if (!taskContext.isExactlyOnce()) {
// log.trace(
// "The table {} is not support exactly-once, so ignore the
// watermark check",
// tableId);
log.trace(
"The table {} is not support exactly-once, so ignore the watermark check",
tableId);
return position.isAfter(splitStartWatermark);
}
// TODO only the table who captured snapshot splits need to filter( Used to support
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ public class MongodbSourceOptions extends SourceOptions {
+ " { \"name\": \"source\","
+ " \"type\": [{\"name\": \"source\", \"type\": \"record\", \"fields\": ["
+ " {\"name\": \"ts_ms\", \"type\": \"long\"},"
+ " {\"name\": \"table\", \"type\": [\"string\", \"null\"]},"
+ " {\"name\": \"db\", \"type\": [\"string\", \"null\"]},"
+ " {\"name\": \"snapshot\", \"type\": [\"string\", \"null\"] } ]"
+ " }, \"null\" ] },"
+ " { \"name\": \"ts_ms\", \"type\": [\"long\", \"null\"]},"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import org.bson.BsonDocument;
import org.bson.BsonTimestamp;
import org.bson.BsonValue;
import org.bson.json.JsonWriterSettings;

import com.mongodb.kafka.connect.source.json.formatter.DefaultJson;
Expand All @@ -39,13 +40,15 @@
import java.util.Map;

import static com.mongodb.kafka.connect.source.schema.AvroSchema.fromJson;
import static io.debezium.connector.AbstractSourceInfo.TABLE_NAME_KEY;
import static org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkEvent.isWatermarkEvent;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.COLL_FIELD;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.DB_FIELD;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.DOCUMENT_KEY;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.ID_FIELD;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.NS_FIELD;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.OUTPUT_SCHEMA;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.SOURCE_FIELD;

public class MongodbRecordUtils {

Expand Down Expand Up @@ -117,6 +120,12 @@ public static String getOffsetValue(@Nonnull SourceRecord sourceRecord, String k
SchemaAndValue keySchemaAndValue =
schemaAndValue.toSchemaAndValue(
fromJson(AvroSchemaDefaults.DEFAULT_AVRO_KEY_SCHEMA), keyDocument);
BsonDocument source = valueDocument.get(SOURCE_FIELD).asDocument();
BsonValue table = valueDocument.get(NS_FIELD).asDocument().get(COLL_FIELD);
BsonValue db = valueDocument.get(NS_FIELD).asDocument().get(DB_FIELD);
source.append(TABLE_NAME_KEY, table);
source.append(DB_FIELD, db);
valueDocument.replace(SOURCE_FIELD, source);
SchemaAndValue valueSchemaAndValue =
schemaAndValue.toSchemaAndValue(fromJson(OUTPUT_SCHEMA), valueDocument);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils;

import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.exception.MongodbConnectorException;

import org.bson.BsonDocument;
import org.bson.BsonTimestamp;
import org.bson.BsonValue;
Expand All @@ -27,6 +29,8 @@
import java.nio.ByteOrder;
import java.util.Objects;

import static org.apache.seatunnel.common.exception.CommonErrorCode.ILLEGAL_ARGUMENT;

public class ResumeToken {

private static final int K_TIMESTAMP = 130;
Expand All @@ -41,14 +45,15 @@ public static BsonTimestamp decodeTimestamp(BsonDocument resumeToken) {
} else if (bsonValue.isString()) { // Hex-encoded string (v0 or v1)
keyStringBytes = hexToUint8Array(bsonValue.asString().getValue());
} else {
throw new IllegalArgumentException(
"Unknown resume token format: " + resumeToken.toJson());
throw new MongodbConnectorException(
ILLEGAL_ARGUMENT, "Unknown resume token format: " + bsonValue);
}

ByteBuffer buffer = ByteBuffer.wrap(keyStringBytes).order(ByteOrder.BIG_ENDIAN);
int kType = buffer.get() & 0xff;
if (kType != K_TIMESTAMP) {
throw new IllegalArgumentException("Unknown keyType of timestamp: " + kType);
throw new MongodbConnectorException(
ILLEGAL_ARGUMENT, "Unknown keyType of timestamp: " + kType);
}

int t = buffer.getInt();
Expand Down

0 comments on commit 4b4b5f9

Please sign in to comment.