# [docs][DOC-656] CDC: Add content for parallel streaming support using logical replication #26254

The type of LSN to use for the specified replication slot:
* SEQUENCE - A monotonic increasing number that determines the record in global order within the context of a slot.
* HYBRID_TIME - A hybrid time value that can be used to compare transactions across slots.

##### streaming.mode

Specifies whether the connector uses a single task or multiple tasks to stream changes.

* `default` uses a single task to stream all the changes.
* `parallel` uses multiple tasks and streams changes using the specified replication slots, one per task.
You specify the slots to use with the `slot.names`, `publication.names`, and `slot.ranges` properties.


{{< note title="Important" >}}

When deploying the connector in `parallel` streaming mode, ensure that `table.include.list` contains only the one table to be streamed in parallel.

{{< /note >}}

{{< note title="Usage with snapshot" >}}

If `snapshot.mode` is set to `initial` or `initial_only`, the configuration must also contain a valid value for the `primary.key.hash.columns` property.

{{< /note >}}
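For example, a minimal sketch of the parallel-mode portion of a connector configuration; the slot names, publication names, ranges, and table name are illustrative placeholders:

```json
{
  "streaming.mode": "parallel",
  "slot.names": "rs,rs2,rs3",
  "publication.names": "pb,pb2,pb3",
  "slot.ranges": "0,21845;21845,43690;43690,65536",
  "table.include.list": "public.test"
}
```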

##### slot.names

A comma-separated list of slot names, one to be used by each task when `streaming.mode` is set to `parallel`.

No default.

{{< warning title="Warning" >}}

The `slot.names` configuration applies only when `streaming.mode` is set to `parallel`; it has no effect otherwise.

{{< /warning >}}

##### publication.names

A comma-separated list of publication names, one to be used by each task when `streaming.mode` is set to `parallel`.

No default.

{{< warning title="Warning" >}}

The `publication.names` configuration applies only when `streaming.mode` is set to `parallel`; it has no effect otherwise.

{{< /warning >}}

##### slot.ranges

A semicolon-separated list of hash code ranges, one to be used by each task when `streaming.mode` is set to `parallel`.

No default.

{{< warning title="Warning" >}}

The `slot.ranges` configuration applies only when `streaming.mode` is set to `parallel`; it has no effect otherwise.

{{< /warning >}}

For example, suppose a table has 3 tablets with hash ranges `[0,21845)`, `[21845,43690)`, and `[43690,65536)`. The value for this configuration would then be `slot.ranges=0,21845;21845,43690;43690,65536`.

##### primary.key.hash.columns

The columns of the table that constitute the hash part of the primary key.

{{< note title="Use only with parallel streaming mode" >}}

Use this configuration only when `streaming.mode` is set to `parallel`.

{{< /note >}}
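For example, given a hypothetical table whose hash partition key is a single `id` column, the property value would be just that column name:

```sql
-- Hypothetical table: "id" forms the hash part of the primary key,
-- so the connector would be configured with primary.key.hash.columns=id.
CREATE TABLE test (
  id INT,
  name TEXT,
  PRIMARY KEY (id HASH)
);
```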

## Pass-through configuration properties

The connector also supports pass-through configuration properties that are used when creating the Kafka producer and consumer.
The following table lists the streaming metrics that are available.
| `MaxQueueSizeInBytes` | long | The maximum buffer of the queue in bytes. This metric is available if `max.queue.size.in.bytes` is set to a positive long value. |
| `CurrentQueueSizeInBytes` | long | The current volume, in bytes, of records in the queue. |

## Advanced

### Using parallel streaming

YugabyteDB also supports parallel streaming of a single table using logical replication. This means you can run replication for the table using parallel tasks, each of which polls a specific set of tablets.

{{< note title="Important" >}}

To enable parallel streaming, you need to enable the Technical Preview flags [ysql_enable_pg_export_snapshot](#) and [ysql_yb_enable_consistent_replication_from_hash_range](#).

{{< /note >}}

Follow these steps to configure parallel streaming using the YugabyteDB Connector:

**Step 1: Decide on the number of tasks**

This decision is crucial because you need to create the same number of replication slots and publications. Note that the number of tasks cannot exceed the number of tablets in the table to be streamed.

For example, if a table `test` has 3 tablets, you can use up to 3 tasks, and would create 3 replication slots and 3 publications.
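One way to check the tablet count is the `yb_table_properties` YSQL function (the table name `test` is illustrative):

```sql
-- Number of tablets for table "test"; the number of parallel tasks
-- must not exceed this value.
SELECT num_tablets FROM yb_table_properties('test'::regclass);
```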

**Step 2: Create publication and replication slots**

If you are creating the slots and publications yourself, ensure that each publication is created before its corresponding replication slot. The following example creates three of each.

If you prefer not to create them yourself, decide on names so that the connector can create the publications and slots for you.

```sql
-- Create one publication per task, all for the same table.
CREATE PUBLICATION pb FOR TABLE test;
CREATE PUBLICATION pb2 FOR TABLE test;
CREATE PUBLICATION pb3 FOR TABLE test;

-- Create one logical replication slot per task using the yboutput plugin.
-- (Equivalently, CREATE_REPLICATION_SLOT ... LOGICAL yboutput can be issued
-- over a replication connection.)
SELECT * FROM pg_create_logical_replication_slot('rs', 'yboutput');
SELECT * FROM pg_create_logical_replication_slot('rs2', 'yboutput');
SELECT * FROM pg_create_logical_replication_slot('rs3', 'yboutput');
```
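To verify that the slots were created, you can query the standard `pg_replication_slots` view:

```sql
-- Confirm the three slots exist and use the yboutput plugin.
SELECT slot_name, plugin, slot_type
FROM pg_replication_slots
WHERE slot_name IN ('rs', 'rs2', 'rs3');
```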

**Step 3: Get hash ranges**

Execute the following query in YSQL to get the ranges, replacing `num_ranges` with the number of tasks and `'table_name'` with the name of your table.

```sql
WITH params AS (
  SELECT
    num_ranges::int AS num_ranges,    -- replace with the number of tasks, for example: 3
    'table_name'::text AS table_name  -- replace with the name of the table
),
yb_local_tablets_cte AS (
  SELECT *,
    COALESCE(('x' || encode(partition_key_start, 'hex'))::BIT(16)::INT, 0) AS partition_key_start_int,
    COALESCE(('x' || encode(partition_key_end, 'hex'))::BIT(16)::INT, 65536) AS partition_key_end_int
  FROM yb_local_tablets
  WHERE table_name = (SELECT table_name FROM params)
),
grouped AS (
  SELECT
    yt.*,
    NTILE((SELECT num_ranges FROM params)) OVER (ORDER BY partition_key_start_int) AS bucket_num
  FROM yb_local_tablets_cte yt
),
buckets AS (
  SELECT
    bucket_num,
    MIN(partition_key_start_int) AS bucket_start,
    MAX(partition_key_end_int) AS bucket_end
  FROM grouped
  GROUP BY bucket_num
),
distinct_ranges AS (
  SELECT DISTINCT
    b.bucket_start,
    b.bucket_start || ',' || b.bucket_end AS partition_range
  FROM grouped g
  JOIN buckets b ON g.bucket_num = b.bucket_num
)
SELECT STRING_AGG(partition_range, ';' ORDER BY bucket_start) AS concatenated_ranges
FROM distinct_ranges;
```

The output is in a format that can be used directly as the value of the `slot.ranges` property in the connector configuration:

```output
concatenated_ranges
---------------------------------
0,21845;21845,43690;43690,65536
```

Save the output; you will need it when building the connector configuration.

**Step 4: Build connector configuration**

Using the values obtained above, add the following configuration properties to the connector configuration and deploy it:

```json
{
...
"streaming.mode":"parallel",
"slot.names":"rs,rs2,rs3",
"publication.names":"pb,pb2,pb3",
"slot.ranges":"0,21845;21845,43690;43690,65536"
...
}
```

If you also need to take a snapshot, add two more configuration properties:

```json
{
...
"snapshot.mode":"initial",
"primary.key.hash.columns":"id"
...
}
```
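Putting both fragments together, the parallel-streaming portion of the configuration from this example is sketched below; the `table.include.list` value is illustrative, and all other connector properties remain as in a regular deployment:

```json
{
  ...
  "table.include.list": "public.test",
  "streaming.mode": "parallel",
  "slot.names": "rs,rs2,rs3",
  "publication.names": "pb,pb2,pb3",
  "slot.ranges": "0,21845;21845,43690;43690,65536",
  "snapshot.mode": "initial",
  "primary.key.hash.columns": "id"
  ...
}
```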

To learn more about these configuration properties, see [YugabyteDB connector properties](../using-logical-replication/yugabytedb-connector-properties).

{{< warning title="Warning" >}}

The order of slot names, publication names, and slot ranges is important: ranges are assigned to slots sequentially, and the same range must be assigned to the same slot across restarts.

The connector configuration should not change on restart.

{{< /warning >}}

{{< note title="Important" >}}

Specifying `primary.key.hash.columns` is important because the connector relies on the names of the columns that form the hash part of the primary key to determine the appropriate range each task should poll.

{{< /note >}}

## Behavior when things go wrong

Debezium is a distributed system that captures all changes in multiple upstream databases; it never misses or loses an event. When the system is operating normally or being managed carefully then Debezium provides _exactly once_ delivery of every change event record. If a fault does happen then the system does not lose any events. However, while it is recovering from the fault, it's possible that the connector might emit some duplicate change events. In these abnormal situations, Debezium, like Kafka, provides _at least once_ delivery of change events.