Skip to content

Commit

Permalink
[Feature][Core] Support cdc task ddl restore for zeta (#7463)
Browse files Browse the repository at this point in the history
  • Loading branch information
hailin0 authored Nov 14, 2024
1 parent f428f9d commit 8e32228
Show file tree
Hide file tree
Showing 108 changed files with 2,631 additions and 423 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.event.EventListener;
import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;

import java.io.IOException;
import java.io.Serializable;
Expand All @@ -46,12 +46,8 @@ public interface SinkWriter<T, CommitInfoT, StateT> {
*/
void write(T element) throws IOException;

/**
* apply schema change to third party data receiver.
*
* @param event
* @throws IOException
*/
/**
* Applies a schema change event. This default implementation has an empty body and does nothing.
*
* @param event the schema change event
* @throws IOException declared for implementations; the empty default body never throws it
* @deprecated use {@link SupportSchemaEvolutionSinkWriter} instead. TODO: remove this method
*/
@Deprecated
default void applySchemaChange(SchemaChangeEvent event) throws IOException {}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.api.sink;

import org.apache.seatunnel.api.table.schema.SchemaChangeType;

import java.util.List;

/** Implemented by sinks that can report which schema change types they support. */
public interface SupportSchemaEvolutionSink {

/**
* Returns the schema change types that this sink connector supports.
*
* @return the supported schema change types
*/
List<SchemaChangeType> supports();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.api.sink;

import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;

import java.io.IOException;

/** Implemented by sink writers that can apply a {@link SchemaChangeEvent}. */
public interface SupportSchemaEvolutionSinkWriter {

/**
* Applies a schema change to the third-party data receiver.
*
* @param event the schema change event to apply
* @throws IOException if the implementation fails to apply the change
*/
void applySchemaChange(SchemaChangeEvent event) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,19 @@
import org.apache.seatunnel.api.sink.SinkCommitter;
import org.apache.seatunnel.api.sink.SinkCommonOptions;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSink;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.factory.MultiTableFactoryContext;
import org.apache.seatunnel.api.table.schema.SchemaChangeType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;

import lombok.Getter;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -44,10 +47,11 @@

public class MultiTableSink
implements SeaTunnelSink<
SeaTunnelRow,
MultiTableState,
MultiTableCommitInfo,
MultiTableAggregatedCommitInfo> {
SeaTunnelRow,
MultiTableState,
MultiTableCommitInfo,
MultiTableAggregatedCommitInfo>,
SupportSchemaEvolutionSink {

@Getter private final Map<String, SeaTunnelSink> sinks;
private final int replicaNum;
Expand Down Expand Up @@ -188,4 +192,13 @@ public void setJobContext(JobContext jobContext) {
public Optional<CatalogTable> getWriteCatalogTable() {
// Explicitly invokes the getWriteCatalogTable() implementation inherited from the
// SeaTunnelSink interface; no multi-table specific logic is added here.
return SeaTunnelSink.super.getWriteCatalogTable();
}

/**
 * Returns the schema change types supported by this sink, taken from the first wrapped sink.
 *
 * <p>Only the first entry of {@code sinks} is consulted. An empty list is returned when there
 * are no wrapped sinks, or when the first one does not implement
 * {@link SupportSchemaEvolutionSink}.
 *
 * @return the first wrapped sink's supported schema change types, or an empty list
 */
@Override
public List<SchemaChangeType> supports() {
    // Guard the empty case: next() on an exhausted iterator throws NoSuchElementException.
    if (sinks.isEmpty()) {
        return Collections.emptyList();
    }
    SeaTunnelSink firstSink = sinks.values().iterator().next();
    if (firstSink instanceof SupportSchemaEvolutionSink) {
        return ((SupportSchemaEvolutionSink) firstSink).supports();
    }
    return Collections.emptyList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@
import org.apache.seatunnel.api.sink.MultiTableResourceManager;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSinkWriter;
import org.apache.seatunnel.api.sink.event.WriterCloseEvent;
import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.tracing.MDCTracer;

Expand All @@ -46,7 +47,8 @@

@Slf4j
public class MultiTableSinkWriter
implements SinkWriter<SeaTunnelRow, MultiTableCommitInfo, MultiTableState> {
implements SinkWriter<SeaTunnelRow, MultiTableCommitInfo, MultiTableState>,
SupportSchemaEvolutionSinkWriter {

private final Map<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>> sinkWriters;
private final Map<SinkIdentifier, SinkWriter.Context> sinkWritersContext;
Expand Down Expand Up @@ -153,7 +155,14 @@ public void applySchemaChange(SchemaChangeEvent event) throws IOException {
sinkWriterEntry.getKey().getTableIdentifier(),
sinkWriterEntry.getKey().getIndex());
synchronized (runnable.get(i)) {
sinkWriterEntry.getValue().applySchemaChange(event);
if (sinkWriterEntry.getValue()
instanceof SupportSchemaEvolutionSinkWriter) {
((SupportSchemaEvolutionSinkWriter) sinkWriterEntry.getValue())
.applySchemaChange(event);
} else {
// TODO remove deprecated method
sinkWriterEntry.getValue().applySchemaChange(event);
}
}
log.info(
"Finish apply schema change for table {} sub-writer {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.seatunnel.api.source;

import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;

/**
* A {@link Collector} is used to collect data from {@link SourceReader}.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.api.source;

import org.apache.seatunnel.api.table.schema.SchemaChangeType;

import java.util.List;

/** Implemented by sources that can report which schema change types they support. */
public interface SupportSchemaEvolution {

/**
* Returns the schema change types that the source connector supports.
*
* @return the supported schema change types
*/
List<SchemaChangeType> supports();
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ public static SeaTunnelDataType<SeaTunnelRow> convertToDataType(
}
}

public static MultipleRowType convertToMultipleRowType(List<CatalogTable> catalogTables) {
@Deprecated
private static MultipleRowType convertToMultipleRowType(List<CatalogTable> catalogTables) {
Map<String, SeaTunnelRowType> rowTypeMap = new HashMap<>();
for (CatalogTable catalogTable : catalogTables) {
String tableId = catalogTable.getTableId().toTablePath().toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,37 @@
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;

import lombok.AllArgsConstructor;
import lombok.AccessLevel;
import lombok.Data;
import lombok.Getter;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

/** Represent a physical table schema. */
@Data
@AllArgsConstructor
public final class TableSchema implements Serializable {
private static final long serialVersionUID = 1L;
private final List<Column> columns;

@Getter(AccessLevel.PRIVATE)
private final List<String> columnNames;

private final PrimaryKey primaryKey;

private final List<ConstraintKey> constraintKeys;

/**
 * Creates a schema from its columns, primary key and constraint keys.
 *
 * <p>The column names are collected from {@code columns} once, here, in the same order as the
 * columns themselves.
 *
 * @param columns the columns of the table
 * @param primaryKey the primary key
 * @param constraintKeys the constraint keys
 */
public TableSchema(
        List<Column> columns, PrimaryKey primaryKey, List<ConstraintKey> constraintKeys) {
    List<String> names = new ArrayList<>();
    for (Column column : columns) {
        names.add(column.getName());
    }
    this.columns = columns;
    this.columnNames = names;
    this.primaryKey = primaryKey;
    this.constraintKeys = constraintKeys;
}

/**
 * Creates a new, empty {@link Builder}.
 *
 * @return a fresh builder instance
 */
public static Builder builder() {
    Builder freshBuilder = new Builder();
    return freshBuilder;
}
Expand All @@ -58,7 +70,23 @@ public SeaTunnelRowType toPhysicalRowDataType() {
}

/**
 * Returns the column names as a newly allocated array.
 *
 * @return the names held in {@code columnNames}, in list order
 */
public String[] getFieldNames() {
    // Single return based on the names collected in the constructor; a second return
    // statement after this one would be unreachable and is rejected by the compiler.
    return columnNames.toArray(new String[0]);
}

/**
 * Finds the position of a column by name.
 *
 * @param columnName the column name to look up
 * @return the index of the first matching entry in {@code columnNames}, or -1 if there is none
 */
public int indexOf(String columnName) {
    int position = columnNames.indexOf(columnName);
    return position;
}

/**
 * Looks up a column by name.
 *
 * @param columnName the column name to look up
 * @return the column stored at the position reported by {@link #indexOf(String)}
 */
public Column getColumn(String columnName) {
    int position = indexOf(columnName);
    // NOTE(review): position is -1 for an unknown name, so the lookup below fails with an
    // index-out-of-bounds error - confirm callers check contains() first.
    return columns.get(position);
}

/**
 * Tells whether a column with the given name exists.
 *
 * @param columnName the column name to look up
 * @return {@code true} if {@code columnNames} contains the name, {@code false} otherwise
 */
public boolean contains(String columnName) {
    boolean present = columnNames.contains(columnName);
    return present;
}

/**
 * Returns the columns wrapped in an unmodifiable view.
 *
 * @return an unmodifiable view of {@code columns}
 */
public List<Column> getColumns() {
    List<Column> readOnlyColumns = Collections.unmodifiableList(columns);
    return readOnlyColumns;
}

public static final class Builder {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.api.table.factory;

import lombok.AllArgsConstructor;
import lombok.Data;

import java.io.Serializable;
import java.util.List;

/**
* Serializable holder for checkpoint data: the enumerator state and the enumerator's splits,
* both kept as raw byte arrays.
*/
@Data
@AllArgsConstructor
public class ChangeStreamTableSourceCheckpoint implements Serializable {
// The state of the enumerator, from checkpoint data
private byte[] enumeratorState;

// The splits of the enumerator, from checkpoint data
// NOTE(review): what the outer and inner list levels represent is not visible here - confirm
// against the code that builds this object.
// NOTE(review): this field is public while enumeratorState is private - confirm whether direct
// field access is intended.
public List<List<byte[]>> splits;
}
Loading

0 comments on commit 8e32228

Please sign in to comment.