[FLINK-36946][Metrics] Optimize sink operator name truncate #25832

Merged 3 commits on Jan 10, 2025

@@ -109,6 +109,13 @@ public final class ConfigConstants {

public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;

/** The name suffix of the sink committer operator. */
public static final String COMMITTER_NAME = "Committer";

/** The name suffix of the sink writer operator. */
public static final String WRITER_NAME = "Writer";

/** The maximum operator name length used for metric groups; longer names are truncated. */
public static final int METRICS_OPERATOR_NAME_MAX_LENGTH = 80;

/** Not instantiable. */
private ConfigConstants() {}
}
@@ -23,14 +23,14 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.util.MetricUtils;
import org.apache.flink.util.AbstractID;

import javax.annotation.Nullable;

import java.util.HashMap;
import java.util.Map;

import static org.apache.flink.runtime.metrics.groups.TaskMetricGroup.METRICS_OPERATOR_NAME_MAX_LENGTH;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
@@ -62,7 +62,7 @@ public final JobManagerMetricGroup parent() {

public JobManagerOperatorMetricGroup getOrAddOperator(
AbstractID vertexId, String taskName, OperatorID operatorID, String operatorName) {
final String truncatedOperatorName = getTruncatedOperatorName(operatorName);
final String truncatedOperatorName = MetricUtils.truncateOperatorName(operatorName);

// unique OperatorIDs only exist in streaming, so we have to rely on the name for batch
// operators
@@ -82,25 +82,13 @@ public JobManagerOperatorMetricGroup getOrAddOperator(
}
}

private String getTruncatedOperatorName(String operatorName) {
if (operatorName != null && operatorName.length() > METRICS_OPERATOR_NAME_MAX_LENGTH) {
LOG.warn(
"The operator name {} exceeded the {} characters length limit and was truncated.",
operatorName,
METRICS_OPERATOR_NAME_MAX_LENGTH);
return operatorName.substring(0, METRICS_OPERATOR_NAME_MAX_LENGTH);
} else {
return operatorName;
}
}

@VisibleForTesting
int numRegisteredOperatorMetricGroups() {
return operators.size();
}

void removeOperatorMetricGroup(OperatorID operatorID, String operatorName) {
final String truncatedOperatorName = getTruncatedOperatorName(operatorName);
final String truncatedOperatorName = MetricUtils.truncateOperatorName(operatorName);

// unique OperatorIDs only exist in streaming, so we have to rely on the name for batch
// operators
@@ -26,6 +26,7 @@
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
import org.apache.flink.runtime.metrics.scope.ScopeFormat;
import org.apache.flink.runtime.metrics.util.MetricUtils;
import org.apache.flink.util.AbstractID;

import javax.annotation.Nullable;
@@ -45,8 +46,6 @@ public class TaskMetricGroup extends ComponentMetricGroup<TaskManagerJobMetricGr

private final Map<String, InternalOperatorMetricGroup> operators = new HashMap<>();

static final int METRICS_OPERATOR_NAME_MAX_LENGTH = 80;

private final TaskIOMetricGroup ioMetrics;

/**
@@ -150,16 +149,7 @@ public InternalOperatorMetricGroup getOrAddOperator(String operatorName) {

public InternalOperatorMetricGroup getOrAddOperator(
OperatorID operatorID, String operatorName) {
final String truncatedOperatorName;
if (operatorName != null && operatorName.length() > METRICS_OPERATOR_NAME_MAX_LENGTH) {
LOG.warn(
"The operator name {} exceeded the {} characters length limit and was truncated.",
operatorName,
METRICS_OPERATOR_NAME_MAX_LENGTH);
truncatedOperatorName = operatorName.substring(0, METRICS_OPERATOR_NAME_MAX_LENGTH);
} else {
truncatedOperatorName = operatorName;
}
final String truncatedOperatorName = MetricUtils.truncateOperatorName(operatorName);

// unique OperatorIDs only exist in streaming, so we have to rely on the name for batch
// operators
@@ -20,6 +20,7 @@

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.metrics.Gauge;
@@ -65,6 +66,7 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.apache.flink.configuration.ConfigConstants.METRICS_OPERATOR_NAME_MAX_LENGTH;
import static org.apache.flink.runtime.metrics.util.SystemResourcesMetricsInitializer.instantiateSystemMetrics;
import static org.apache.flink.util.Preconditions.checkNotNull;

@@ -83,6 +85,8 @@ public class MetricUtils {
@VisibleForTesting static final String METRIC_GROUP_MEMORY = "Memory";

@VisibleForTesting static final String METRIC_GROUP_MANAGED_MEMORY = "Managed";
private static final String WRITER_SUFFIX = ": " + ConfigConstants.WRITER_NAME;
private static final String COMMITTER_SUFFIX = ": " + ConfigConstants.COMMITTER_NAME;

private MetricUtils() {}

@@ -366,6 +370,34 @@ private static void instantiateCPUMetrics(MetricGroup metrics) {
}
}

public static String truncateOperatorName(String operatorName) {
if (operatorName != null && operatorName.length() > METRICS_OPERATOR_NAME_MAX_LENGTH) {
LOG.warn(
"The operator name {} exceeded the {} characters length limit and was truncated.",
operatorName,
METRICS_OPERATOR_NAME_MAX_LENGTH);
if (operatorName.endsWith(WRITER_SUFFIX)) {
return operatorName.substring(
0,
Math.max(
0,
METRICS_OPERATOR_NAME_MAX_LENGTH - WRITER_SUFFIX.length()))
+ WRITER_SUFFIX;
}
if (operatorName.endsWith(COMMITTER_SUFFIX)) {
return operatorName.substring(
0,
Math.max(
0,
METRICS_OPERATOR_NAME_MAX_LENGTH
- COMMITTER_SUFFIX.length()))
+ COMMITTER_SUFFIX;
}
return operatorName.substring(0, METRICS_OPERATOR_NAME_MAX_LENGTH);
}
return operatorName;
}

private static final class AttributeGauge<T> implements Gauge<T> {
private final MBeanServer server;
private final ObjectName objectName;
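For review context (not part of the diff): a minimal sketch of how the new truncation behaves, assuming flink-runtime with this change on the classpath; the class name and inputs are made up for illustration.

import org.apache.flink.runtime.metrics.util.MetricUtils;

public class TruncationSketch {
    public static void main(String[] args) {
        // A 90-character prefix plus ": Writer" (8 characters) is 98 characters, over the 80 limit.
        String longWriterName = "x".repeat(90) + ": Writer";
        String truncated = MetricUtils.truncateOperatorName(longWriterName);
        // The prefix is cut to 72 characters so the ": Writer" suffix survives; total length is 80.
        System.out.println(truncated.length());              // 80
        System.out.println(truncated.endsWith(": Writer"));  // true
        // Names without a recognized suffix are still cut at 80 characters, as before.
        System.out.println(MetricUtils.truncateOperatorName("y".repeat(100)).length()); // 80
    }
}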
@@ -26,6 +26,7 @@
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SupportsCommitter;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
@@ -72,9 +73,6 @@
public class SinkTransformationTranslator<Input, Output>
implements TransformationTranslator<Output, SinkTransformation<Input, Output>> {

private static final String COMMITTER_NAME = "Committer";
private static final String WRITER_NAME = "Writer";

@Override
public Collection<Integer> translateForBatch(
SinkTransformation<Input, Output> transformation, Context context) {
@@ -170,7 +168,7 @@ private void expand() {
prewritten,
input ->
input.transform(
WRITER_NAME,
ConfigConstants.WRITER_NAME,
CommittableMessageTypeInfo.noOutput(),
new SinkWriterOperatorFactory<>(sink)),
false,
@@ -284,7 +282,7 @@ private <CommT, WriteResultT> void addCommittingTopology(
precommitted,
pc ->
pc.transform(
COMMITTER_NAME,
ConfigConstants.COMMITTER_NAME,
committableTypeInformation,
new CommitterOperatorFactory<>(
committingSink,
@@ -315,7 +313,7 @@ private <WriteResultT> DataStream<CommittableMessage<WriteResultT>> addWriter(
inputStream,
input ->
input.transform(
WRITER_NAME,
ConfigConstants.WRITER_NAME,
typeInformation,
new SinkWriterOperatorFactory<>(sink)),
false,
@@ -383,10 +381,12 @@ private <I, R> R adjustTransformations(

// Set the operator uid hashes to support stateful upgrades without prior uids
setOperatorUidHashIfPossible(
subTransformation, WRITER_NAME, operatorsUidHashes.getWriterUidHash());
subTransformation,
ConfigConstants.WRITER_NAME,
operatorsUidHashes.getWriterUidHash());
setOperatorUidHashIfPossible(
subTransformation,
COMMITTER_NAME,
ConfigConstants.COMMITTER_NAME,
operatorsUidHashes.getCommitterUidHash());
setOperatorUidHashIfPossible(
subTransformation,
@@ -479,14 +479,14 @@ private void concatUid(
if (transformationName != null && transformation.getUid() != null) {
// Use the same uid pattern as for Sink V1. We deliberately decided to use the uid
// pattern of Flink 1.13 because 1.14 did not have a dedicated committer operator.
if (transformationName.equals(COMMITTER_NAME)) {
if (transformationName.equals(ConfigConstants.COMMITTER_NAME)) {
final String committerFormat = "Sink Committer: %s";
subTransformation.setUid(
String.format(committerFormat, transformation.getUid()));
return;
}
// Set the writer operator uid to the sinks uid to support state migrations
if (transformationName.equals(WRITER_NAME)) {
if (transformationName.equals(ConfigConstants.WRITER_NAME)) {
subTransformation.setUid(transformation.getUid());
return;
}
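Why the suffixes matter for metrics (not part of the diff): as the integration test change further below suggests, the final operator name is composed as "<sink name>: Writer" or "<sink name>: Committer", so a long user-supplied sink name pushes the suffix past the 80-character limit. A small sketch with a hypothetical sink name, assuming flink-core with this change on the classpath:

import org.apache.flink.configuration.ConfigConstants;

public class SinkOperatorNames {
    public static void main(String[] args) {
        // Hypothetical user-facing sink name; the translator names the sub-transformations
        // "Writer" and "Committer", and the resulting operator names carry the sink name as prefix.
        String sinkName = "events";
        System.out.println(sinkName + ": " + ConfigConstants.WRITER_NAME);    // events: Writer
        System.out.println(sinkName + ": " + ConfigConstants.COMMITTER_NAME); // events: Committer
    }
}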
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.metrics.groups;

import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.metrics.Metric;
@@ -200,8 +201,8 @@ void testOperatorNameTruncation() throws Exception {
taskMetricGroup.getOrAddOperator(originalName);

String storedName = operatorMetricGroup.getScopeComponents()[0];
assertThat(storedName.length()).isEqualTo(TaskMetricGroup.METRICS_OPERATOR_NAME_MAX_LENGTH);
assertThat(originalName.substring(0, TaskMetricGroup.METRICS_OPERATOR_NAME_MAX_LENGTH))
assertThat(storedName.length()).isEqualTo(ConfigConstants.METRICS_OPERATOR_NAME_MAX_LENGTH);
assertThat(originalName.substring(0, ConfigConstants.METRICS_OPERATOR_NAME_MAX_LENGTH))
.isEqualTo(storedName);
registry.closeAsync().get();
}
@@ -283,6 +283,67 @@ public MetricGroup addGroup(String name) {
}
}

@Test
void testTruncateOperatorName() {
// test operator name is null
assertThat(MetricUtils.truncateOperatorName(null)).isNull();
// test operator name length less than 80
final String operatorNameLess = "testOperatorName";
assertThat(MetricUtils.truncateOperatorName(operatorNameLess)).isEqualTo(operatorNameLess);
// test operator name length less than 80 and ending with ": Writer"
final String operatorNameLessEndWithWriter = "testOperatorName: Writer";
assertThat(MetricUtils.truncateOperatorName(operatorNameLessEndWithWriter))
.isEqualTo(operatorNameLessEndWithWriter);
// test operator name length less than 80 and ending with ": Committer"
final String operatorNameLessEndWithCommitter = "testOperatorName: Committer";
assertThat(MetricUtils.truncateOperatorName(operatorNameLessEndWithCommitter))
.isEqualTo(operatorNameLessEndWithCommitter);
// test operator name length less than 80 and containing ": Writer" (not as a suffix)
final String operatorNameLessAndContainsWriter = "test: WriterOperatorName";
assertThat(MetricUtils.truncateOperatorName(operatorNameLessAndContainsWriter))
.isEqualTo(operatorNameLessAndContainsWriter);
// test operator name length less than 80 and containing ": Committer" (not as a suffix)
final String operatorNameLessAndContainsCommitter = "test: CommitterOperatorName";
assertThat(MetricUtils.truncateOperatorName(operatorNameLessAndContainsCommitter))
.isEqualTo(operatorNameLessAndContainsCommitter);

// test operator name length more than 80
final String operatorNameMore =
"testLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongOperatorName";
final String expectedOperatorNameMore =
"testLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLong";
assertThat(MetricUtils.truncateOperatorName(operatorNameMore))
.isEqualTo(expectedOperatorNameMore);

// test operator name length more than 80 and ending with ": Writer"
final String operatorNameMoreEndWithWriter =
"testLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongOperatorName: Writer";
final String expectedOperatorNameMoreEndWithWriter =
"testLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLong: Writer";
assertThat(MetricUtils.truncateOperatorName(operatorNameMoreEndWithWriter))
.isEqualTo(expectedOperatorNameMoreEndWithWriter);

// test operator name length more than 80 and ending with ": Committer"
final String operatorNameMoreEndWithCommitter =
"testLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongOperatorName: Committer";
final String expectedOperatorNameMoreEndWithCommitter =
"testLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongL: Committer";
assertThat(MetricUtils.truncateOperatorName(operatorNameMoreEndWithCommitter))
.isEqualTo(expectedOperatorNameMoreEndWithCommitter);

// test operator name length more than 80 and containing ": Writer" (not as a suffix)
final String operatorNameMoreAndContainsWriter =
"testLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLong: WriterOperatorName";
assertThat(MetricUtils.truncateOperatorName(operatorNameMoreAndContainsWriter))
.isEqualTo(expectedOperatorNameMore);

// test operator name length more than 80 and containing ": Committer" (not as a suffix)
final String operatorNameMoreAndContainsCommitter =
"testLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLong: CommitterOperatorName";
assertThat(MetricUtils.truncateOperatorName(operatorNameMoreAndContainsCommitter))
.isEqualTo(expectedOperatorNameMore);
}

// --------------- utility methods and classes ---------------

/**
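A note on the expected values above (arithmetic only, not part of the diff): with the limit at 80, a name ending in ": Writer" (8 characters) keeps a 72-character prefix, while one ending in ": Committer" (11 characters) keeps a 69-character prefix. After the 4-character "test" prefix, 69 - 4 = 65 = 16 * 4 + 1, so the Committer case cuts one character into a "Long", which is why its expected value ends in "LongL: Committer". A tiny check, assuming flink-runtime with this change on the classpath; the class name is made up:

import org.apache.flink.runtime.metrics.util.MetricUtils;

public class CommitterBoundaryCheck {
    public static void main(String[] args) {
        // Well over 80 characters and ending with ": Committer".
        String name = "test" + "Long".repeat(19) + "OperatorName: Committer";
        String truncated = MetricUtils.truncateOperatorName(name);
        System.out.println(truncated.length());                     // 80
        System.out.println(truncated.endsWith("LongL: Committer")); // true: cut mid-"Long"
    }
}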
@@ -21,13 +21,15 @@
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.metrics.util.MetricUtils;
import org.apache.flink.runtime.testutils.InMemoryReporter;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -55,9 +57,8 @@
/** Tests whether all provided metrics of a {@link Sink} are of the expected values (FLIP-33). */
public class SinkMetricsITCase extends TestLogger {

private static final String TEST_SINK_NAME = "MetricTestSink";
// please refer to SinkTransformationTranslator#WRITER_NAME
private static final String DEFAULT_WRITER_NAME = "Writer";
private static final String TEST_LONG_SINK_NAME =
"LongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongMetricTestSink";
private static final int DEFAULT_PARALLELISM = 4;

@Rule public final SharedObjects sharedObjects = SharedObjects.create();
@@ -103,7 +104,7 @@ public void testMetrics() throws Exception {
return i;
})
.sinkTo(TestSinkV2.<Long>newBuilder().setWriter(new MetricWriter()).build())
.name(TEST_SINK_NAME);
.name(TEST_LONG_SINK_NAME);
JobClient jobClient = env.executeAsync();
final JobID jobId = jobClient.getJobID();

@@ -123,7 +124,9 @@ private void assertSinkMetrics(
JobID jobId, long processedRecordsPerSubtask, int parallelism, int numSplits) {
List<OperatorMetricGroup> groups =
reporter.findOperatorMetricGroups(
jobId, TEST_SINK_NAME + ": " + DEFAULT_WRITER_NAME);
jobId,
MetricUtils.truncateOperatorName(
TEST_LONG_SINK_NAME + ": " + ConfigConstants.WRITER_NAME));
assertThat(groups, hasSize(parallelism));

int subtaskWithMetrics = 0;
@@ -160,7 +163,7 @@

// Test operator I/O metrics are reused by task metrics
List<TaskMetricGroup> taskMetricGroups =
reporter.findTaskMetricGroups(jobId, TEST_SINK_NAME);
reporter.findTaskMetricGroups(jobId, TEST_LONG_SINK_NAME);
assertThat(taskMetricGroups, hasSize(parallelism));

int subtaskWithTaskMetrics = 0;
Expand Down