Skip to content

Commit

Permalink
[FLINK-34466] Lineage interfaces for kafka connector (#130)
Browse files Browse the repository at this point in the history
Signed-off-by: Pawel Leszczynski <[email protected]>
  • Loading branch information
pawel-big-lebowski authored Nov 14, 2024
1 parent 0fed445 commit 727327d
Show file tree
Hide file tree
Showing 23 changed files with 1,178 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package org.apache.flink.connector.kafka.lineage;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.connector.kafka.source.KafkaPropertiesUtil;

import java.util.Objects;
import java.util.Properties;

/** Default implementation of {@link KafkaDatasetFacet}. */
@PublicEvolving
public class DefaultKafkaDatasetFacet implements KafkaDatasetFacet {

public static final String KAFKA_FACET_NAME = "kafka";

private Properties properties;

private final KafkaDatasetIdentifier topicIdentifier;

public DefaultKafkaDatasetFacet(KafkaDatasetIdentifier topicIdentifier, Properties properties) {
this(topicIdentifier);

this.properties = new Properties();
KafkaPropertiesUtil.copyProperties(properties, this.properties);
}

public DefaultKafkaDatasetFacet(KafkaDatasetIdentifier topicIdentifier) {
this.topicIdentifier = topicIdentifier;
}

public void setProperties(Properties properties) {
this.properties = new Properties();
KafkaPropertiesUtil.copyProperties(properties, this.properties);
}

public Properties getProperties() {
return properties;
}

public KafkaDatasetIdentifier getTopicIdentifier() {
return topicIdentifier;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DefaultKafkaDatasetFacet that = (DefaultKafkaDatasetFacet) o;
return Objects.equals(properties, that.properties)
&& Objects.equals(topicIdentifier, that.topicIdentifier);
}

@Override
public int hashCode() {
return Objects.hash(properties, topicIdentifier);
}

@Override
public String name() {
return KAFKA_FACET_NAME;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package org.apache.flink.connector.kafka.lineage;

import org.apache.flink.annotation.PublicEvolving;

import javax.annotation.Nullable;

import java.util.List;
import java.util.Objects;
import java.util.regex.Pattern;

/** Default implementation of {@link KafkaDatasetIdentifier}. */
@PublicEvolving
public class DefaultKafkaDatasetIdentifier implements KafkaDatasetIdentifier {

@Nullable private final List<String> topics;
@Nullable private final Pattern topicPattern;

private DefaultKafkaDatasetIdentifier(
@Nullable List<String> fixedTopics, @Nullable Pattern topicPattern) {
this.topics = fixedTopics;
this.topicPattern = topicPattern;
}

public static DefaultKafkaDatasetIdentifier ofPattern(Pattern pattern) {
return new DefaultKafkaDatasetIdentifier(null, pattern);
}

public static DefaultKafkaDatasetIdentifier ofTopics(List<String> fixedTopics) {
return new DefaultKafkaDatasetIdentifier(fixedTopics, null);
}

@Nullable
public List<String> getTopics() {
return topics;
}

@Nullable
public Pattern getTopicPattern() {
return topicPattern;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DefaultKafkaDatasetIdentifier that = (DefaultKafkaDatasetIdentifier) o;
return Objects.equals(topics, that.topics)
&& Objects.equals(topicPattern, that.topicPattern);
}

@Override
public int hashCode() {
return Objects.hash(topics, topicPattern);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package org.apache.flink.connector.kafka.lineage;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.util.Objects;

/** Default implementation of {@link KafkaDatasetFacet}. */
@PublicEvolving
public class DefaultTypeDatasetFacet implements TypeDatasetFacet {

public static final String TYPE_FACET_NAME = "type";

private final TypeInformation typeInformation;

public DefaultTypeDatasetFacet(TypeInformation typeInformation) {
this.typeInformation = typeInformation;
}

public TypeInformation getTypeInformation() {
return typeInformation;
}

public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DefaultTypeDatasetFacet that = (DefaultTypeDatasetFacet) o;
return Objects.equals(typeInformation, that.typeInformation);
}

@Override
public int hashCode() {
return Objects.hash(typeInformation);
}

@Override
public String name() {
return TYPE_FACET_NAME;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.apache.flink.connector.kafka.lineage;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;

import java.util.Properties;

/** Facet definition to contain all Kafka specific information on Kafka sources and sinks. */
@PublicEvolving
public interface KafkaDatasetFacet extends LineageDatasetFacet {
Properties getProperties();

KafkaDatasetIdentifier getTopicIdentifier();

void setProperties(Properties properties);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.apache.flink.connector.kafka.lineage;

import org.apache.flink.annotation.PublicEvolving;

import java.util.Optional;

/** Contains method to extract {@link KafkaDatasetFacet}. */
@PublicEvolving
public interface KafkaDatasetFacetProvider {

/**
* Returns a Kafka dataset facet or empty in case an implementing class is not able to identify
* a dataset.
*/
Optional<KafkaDatasetFacet> getKafkaDatasetFacet();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package org.apache.flink.connector.kafka.lineage;

import org.apache.flink.annotation.PublicEvolving;

import javax.annotation.Nullable;

import java.util.List;
import java.util.Objects;
import java.util.regex.Pattern;

/** Kafka dataset identifier which can contain either a list of topics or a topic pattern. */
@PublicEvolving
public interface KafkaDatasetIdentifier {
@Nullable
List<String> getTopics();

@Nullable
Pattern getTopicPattern();

/**
* Assigns lineage dataset's name which is topic pattern if it is present or comma separated
* list of topics.
*/
default String toLineageName() {
if (getTopicPattern() != null) {
return getTopicPattern().toString();
}
return String.join(",", Objects.requireNonNull(getTopics()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.apache.flink.connector.kafka.lineage;

import org.apache.flink.annotation.PublicEvolving;

import java.util.Optional;

/** Contains method which allows extracting topic identifier. */
@PublicEvolving
public interface KafkaDatasetIdentifierProvider {

/**
* Gets Kafka dataset identifier or empty in case a class implementing is not able to extract
* dataset identifier.
*/
Optional<DefaultKafkaDatasetIdentifier> getDatasetIdentifier();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.flink.connector.kafka.lineage;

import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.streaming.api.lineage.LineageDataset;
import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
import org.apache.flink.streaming.api.lineage.LineageVertex;
import org.apache.flink.streaming.api.lineage.SourceLineageVertex;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

/** Utility class with useful methods for managing lineage objects. */
public class LineageUtil {

private static final String KAFKA_DATASET_PREFIX = "kafka://";
private static final String COMMA = ",";
private static final String SEMICOLON = ";";

public static LineageDataset datasetOf(String namespace, KafkaDatasetFacet kafkaDatasetFacet) {
return datasetOf(namespace, kafkaDatasetFacet, Collections.emptyList());
}

public static LineageDataset datasetOf(
String namespace, KafkaDatasetFacet kafkaDatasetFacet, TypeDatasetFacet typeFacet) {
return datasetOf(namespace, kafkaDatasetFacet, Collections.singletonList(typeFacet));
}

private static LineageDataset datasetOf(
String namespace,
KafkaDatasetFacet kafkaDatasetFacet,
List<LineageDatasetFacet> facets) {
return new LineageDataset() {
@Override
public String name() {
return kafkaDatasetFacet.getTopicIdentifier().toLineageName();
}

@Override
public String namespace() {
return namespace;
}

@Override
public Map<String, LineageDatasetFacet> facets() {
Map<String, LineageDatasetFacet> facetMap = new HashMap<>();
facetMap.put(DefaultKafkaDatasetFacet.KAFKA_FACET_NAME, kafkaDatasetFacet);
facetMap.putAll(
facets.stream()
.collect(
Collectors.toMap(LineageDatasetFacet::name, item -> item)));
return facetMap;
}
};
}

public static String namespaceOf(Properties properties) {
String bootstrapServers = properties.getProperty("bootstrap.servers");

if (bootstrapServers == null) {
return KAFKA_DATASET_PREFIX;
}

if (bootstrapServers.contains(COMMA)) {
bootstrapServers = bootstrapServers.split(COMMA)[0];
} else if (bootstrapServers.contains(SEMICOLON)) {
bootstrapServers = bootstrapServers.split(SEMICOLON)[0];
}

return String.format(KAFKA_DATASET_PREFIX + bootstrapServers);
}

public static SourceLineageVertex sourceLineageVertexOf(Collection<LineageDataset> datasets) {
return new SourceLineageVertex() {
@Override
public Boundedness boundedness() {
return Boundedness.CONTINUOUS_UNBOUNDED;
}

@Override
public List<LineageDataset> datasets() {
return datasets.stream().collect(Collectors.toList());
}
};
}

public static LineageVertex lineageVertexOf(Collection<LineageDataset> datasets) {
return new LineageVertex() {
@Override
public List<LineageDataset> datasets() {
return datasets.stream().collect(Collectors.toList());
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package org.apache.flink.connector.kafka.lineage;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;

/** Facet definition to contain type information of source and sink. */
@PublicEvolving
public interface TypeDatasetFacet extends LineageDatasetFacet {
TypeInformation getTypeInformation();
}
Loading

0 comments on commit 727327d

Please sign in to comment.