Skip to content

Commit

Permalink
[Feature][transforms-v2] Support append only stream from cdc source (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
hawk9821 authored Oct 14, 2024
1 parent 048c47d commit e4e1224
Show file tree
Hide file tree
Showing 13 changed files with 912 additions and 2 deletions.
113 changes: 113 additions & 0 deletions docs/en/transform-v2/rowkind-extractor.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
# RowKindExtractor

> RowKindExtractor transform plugin
## Description

transform cdc row to append only row that contains the cdc RowKind. <br />
Example: <br />
CDC row: -D 1, test1, test2 <br />
transformed Row: +I 1,test1,test2,DELETE

## Options

| name | type | required | default value |
|-------------------|--------|----------|---------------|
| custom_field_name | string | yes | row_kind |
| transform_type | enum | yes | SHORT |

### custom_field_name [string]

Custom field name of the RowKind field

### transform_type [enum]

the RowKind field value formatting , the option can be `SHORT` or `FULL`

`SHORT` : +I, -U , +U, -D
`FULL` : INSERT, UPDATE_BEFORE, UPDATE_AFTER , DELETE

## Examples


```yaml

env {
parallelism = 1
job.mode = "BATCH"
}

source {
FakeSource {
schema = {
fields {
pk_id = bigint
name = string
score = int
}
primaryKey {
name = "pk_id"
columnNames = [pk_id]
}
}
rows = [
{
kind = INSERT
fields = [1, "A", 100]
},
{
kind = INSERT
fields = [2, "B", 100]
},
{
kind = INSERT
fields = [3, "C", 100]
},
{
kind = INSERT
fields = [4, "D", 100]
},
{
kind = UPDATE_BEFORE
fields = [1, "A", 100]
},
{
kind = UPDATE_AFTER
fields = [1, "F", 100]
}
{
kind = UPDATE_BEFORE
fields = [2, "B", 100]
},
{
kind = UPDATE_AFTER
fields = [2, "G", 100]
},
{
kind = DELETE
fields = [3, "C", 100]
},
{
kind = DELETE
fields = [4, "D", 100]
}
]
}
}

transform {
RowKindExtractor {
custom_field_name = "custom_name"
transform_type = FULL
result_table_name = "trans_result"
}
}

sink {
Console {
source_table_name = "custom_name"
}
}

```

112 changes: 112 additions & 0 deletions docs/zh/transform-v2/rowkind-extractor.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
# RowKindExtractor

> RowKindExtractor transform plugin
## Description

将CDC Row 转换为 Append only Row, 转换后的行扩展了RowKind字段 <br />
Example: <br />
CDC row: -D 1, test1, test2 <br />
transformed Row: +I 1,test1,test2,DELETE

## Options

| name | type | required | default value |
|-------------------|--------|----------|---------------|
| custom_field_name | string | yes | row_kind |
| transform_type | enum | yes | SHORT |

### custom_field_name [string]

RowKind列的自定义名

### transform_type [enum]

格式化RowKind值 , 配置为 `SHORT``FULL`

`SHORT` : +I, -U , +U, -D
`FULL` : INSERT, UPDATE_BEFORE, UPDATE_AFTER , DELETE

## Examples

```yaml

env {
parallelism = 1
job.mode = "BATCH"
}

source {
FakeSource {
schema = {
fields {
pk_id = bigint
name = string
score = int
}
primaryKey {
name = "pk_id"
columnNames = [pk_id]
}
}
rows = [
{
kind = INSERT
fields = [1, "A", 100]
},
{
kind = INSERT
fields = [2, "B", 100]
},
{
kind = INSERT
fields = [3, "C", 100]
},
{
kind = INSERT
fields = [4, "D", 100]
},
{
kind = UPDATE_BEFORE
fields = [1, "A", 100]
},
{
kind = UPDATE_AFTER
fields = [1, "F", 100]
}
{
kind = UPDATE_BEFORE
fields = [2, "B", 100]
},
{
kind = UPDATE_AFTER
fields = [2, "G", 100]
},
{
kind = DELETE
fields = [3, "C", 100]
},
{
kind = DELETE
fields = [4, "D", 100]
}
]
}
}

transform {
RowKindExtractor {
custom_field_name = "custom_name"
transform_type = FULL
result_table_name = "trans_result"
}
}

sink {
Console {
source_table_name = "custom_name"
}
}

```

5 changes: 3 additions & 2 deletions plugin-mapping.properties
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,11 @@ seatunnel.sink.ActiveMQ = connector-activemq
seatunnel.source.Qdrant = connector-qdrant
seatunnel.sink.Qdrant = connector-qdrant
seatunnel.source.Sls = connector-sls
seatunnel.sink.Sls = connector-sls
seatunnel.source.Typesense = connector-typesense
seatunnel.sink.Typesense = connector-typesense
seatunnel.source.Opengauss-CDC = connector-cdc-opengauss

seatunnel.transform.Sql = seatunnel-transforms-v2
seatunnel.transform.FieldMapper = seatunnel-transforms-v2
seatunnel.transform.Filter = seatunnel-transforms-v2
Expand All @@ -149,5 +151,4 @@ seatunnel.transform.Copy = seatunnel-transforms-v2
seatunnel.transform.DynamicCompile = seatunnel-transforms-v2
seatunnel.transform.LLM = seatunnel-transforms-v2
seatunnel.transform.Embedding = seatunnel-transforms-v2
seatunnel.sink.Sls = connector-sls

seatunnel.transform.RowKindExtractor = seatunnel-transforms-v2
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.e2e.transform;

import org.apache.seatunnel.e2e.common.container.TestContainer;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;

import java.io.IOException;

public class TestRowKindExtractorTransformIT extends TestSuiteBase {

@TestTemplate
public void testRowKindExtractorTransform(TestContainer container)
throws IOException, InterruptedException {
Container.ExecResult execResult1 =
container.executeJob("/rowkind_extractor_transform_case1.conf");
Assertions.assertEquals(0, execResult1.getExitCode());
Container.ExecResult execResult2 =
container.executeJob("/rowkind_extractor_transform_case2.conf");
Assertions.assertEquals(0, execResult2.getExitCode());
}
}
Loading

0 comments on commit e4e1224

Please sign in to comment.