Commit b75d76e

[FLINK-36946][Metrics] Optimize sink operator name truncate (#25832)
Tartarus0zm authored Jan 10, 2025
1 parent bceca90 commit b75d76e
Showing 8 changed files with 127 additions and 45 deletions.
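In short, the change moves operator-name truncation into a shared MetricUtils.truncateOperatorName helper and teaches it to preserve the ": Writer" / ": Committer" suffix that Sink V2 operator names carry, so truncated sink writer and committer operators remain distinguishable in metrics. A minimal usage sketch of the new helper (the class, main method, and generated names below are illustrative only; the helper and its 80-character cap are what this commit adds):

import org.apache.flink.runtime.metrics.util.MetricUtils;

/** Illustrative only: shows what the patched helper returns for a few inputs. */
public class TruncateOperatorNameExample {
    public static void main(String[] args) {
        String longBase = "x".repeat(90); // any base name longer than the 80-character cap

        // No recognized suffix: plain cut at 80 characters (the pre-patch behavior).
        System.out.println(MetricUtils.truncateOperatorName(longBase).length()); // 80

        // "<sink>: Writer": the suffix survives, the base is cut to 80 - 8 = 72 characters.
        System.out.println(MetricUtils.truncateOperatorName(longBase + ": Writer")); // "xx...x: Writer"

        // "<sink>: Committer": the suffix survives, the base is cut to 80 - 11 = 69 characters.
        System.out.println(MetricUtils.truncateOperatorName(longBase + ": Committer")); // "xx...x: Committer"

        // Short names and null pass through unchanged.
        System.out.println(MetricUtils.truncateOperatorName("MySink: Writer")); // MySink: Writer
        System.out.println(MetricUtils.truncateOperatorName(null)); // null
    }
}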
@@ -111,6 +111,13 @@ public final class ConfigConstants {

public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;

/** The suffix of sink operator name. */
public static final String COMMITTER_NAME = "Committer";

public static final String WRITER_NAME = "Writer";

public static final int METRICS_OPERATOR_NAME_MAX_LENGTH = 80;

/** Not instantiable. */
private ConfigConstants() {}
}
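These constants are now the single source of truth for both the sink translator and the metrics code. A small sketch of how they combine (the sink name and wrapper class are made up; the "<name>: Writer" pattern itself is the one checked by SinkMetricsITCase further down):

import org.apache.flink.configuration.ConfigConstants;

/** Sketch only: how the shared constants compose sink operator names before truncation. */
public class SinkOperatorNameSketch {
    public static void main(String[] args) {
        String sinkName = "MyKafkaSink"; // hypothetical name set via DataStreamSink#name(...)
        System.out.println(sinkName + ": " + ConfigConstants.WRITER_NAME);    // MyKafkaSink: Writer
        System.out.println(sinkName + ": " + ConfigConstants.COMMITTER_NAME); // MyKafkaSink: Committer
        // Metric groups cap such names at ConfigConstants.METRICS_OPERATOR_NAME_MAX_LENGTH (80) characters.
    }
}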
@@ -23,14 +23,14 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.util.MetricUtils;
import org.apache.flink.util.AbstractID;

import javax.annotation.Nullable;

import java.util.HashMap;
import java.util.Map;

import static org.apache.flink.runtime.metrics.groups.TaskMetricGroup.METRICS_OPERATOR_NAME_MAX_LENGTH;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
@@ -62,7 +62,7 @@ public final JobManagerMetricGroup parent() {

public JobManagerOperatorMetricGroup getOrAddOperator(
AbstractID vertexId, String taskName, OperatorID operatorID, String operatorName) {
final String truncatedOperatorName = getTruncatedOperatorName(operatorName);
final String truncatedOperatorName = MetricUtils.truncateOperatorName(operatorName);

// unique OperatorIDs only exist in streaming, so we have to rely on the name for batch
// operators
@@ -82,25 +82,13 @@ public JobManagerOperatorMetricGroup getOrAddOperator(
}
}

private String getTruncatedOperatorName(String operatorName) {
if (operatorName != null && operatorName.length() > METRICS_OPERATOR_NAME_MAX_LENGTH) {
LOG.warn(
"The operator name {} exceeded the {} characters length limit and was truncated.",
operatorName,
METRICS_OPERATOR_NAME_MAX_LENGTH);
return operatorName.substring(0, METRICS_OPERATOR_NAME_MAX_LENGTH);
} else {
return operatorName;
}
}

@VisibleForTesting
int numRegisteredOperatorMetricGroups() {
return operators.size();
}

void removeOperatorMetricGroup(OperatorID operatorID, String operatorName) {
final String truncatedOperatorName = getTruncatedOperatorName(operatorName);
final String truncatedOperatorName = MetricUtils.truncateOperatorName(operatorName);

// unique OperatorIDs only exist in streaming, so we have to rely on the name for batch
// operators
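The helper is now applied on both the registration path (getOrAddOperator) and the removal path (removeOperatorMetricGroup). Since batch operators are tracked by name rather than by a unique OperatorID (see the comment in the hunk above), both paths have to derive exactly the same, possibly truncated, name. A schematic of that invariant (the operators map access and the keyFor helper below are simplifications, not this class's actual bookkeeping):

// Simplification only: registration and removal must agree on the truncated name they key by.
String truncated = MetricUtils.truncateOperatorName(operatorName);
operators.put(keyFor(operatorID, truncated), group);     // in getOrAddOperator(...)
// ... later ...
operators.remove(keyFor(operatorID, truncated));         // in removeOperatorMetricGroup(...)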
@@ -26,6 +26,7 @@
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
import org.apache.flink.runtime.metrics.scope.ScopeFormat;
import org.apache.flink.runtime.metrics.util.MetricUtils;
import org.apache.flink.util.AbstractID;

import javax.annotation.Nullable;
@@ -45,8 +46,6 @@ public class TaskMetricGroup extends ComponentMetricGroup<TaskManagerJobMetricGr

private final Map<String, InternalOperatorMetricGroup> operators = new HashMap<>();

static final int METRICS_OPERATOR_NAME_MAX_LENGTH = 80;

private final TaskIOMetricGroup ioMetrics;

/**
@@ -150,16 +149,7 @@ public InternalOperatorMetricGroup getOrAddOperator(String operatorName) {

public InternalOperatorMetricGroup getOrAddOperator(
OperatorID operatorID, String operatorName) {
final String truncatedOperatorName;
if (operatorName != null && operatorName.length() > METRICS_OPERATOR_NAME_MAX_LENGTH) {
LOG.warn(
"The operator name {} exceeded the {} characters length limit and was truncated.",
operatorName,
METRICS_OPERATOR_NAME_MAX_LENGTH);
truncatedOperatorName = operatorName.substring(0, METRICS_OPERATOR_NAME_MAX_LENGTH);
} else {
truncatedOperatorName = operatorName;
}
final String truncatedOperatorName = MetricUtils.truncateOperatorName(operatorName);

// unique OperatorIDs only exist in streaming, so we have to rely on the name for batch
// operators
@@ -20,6 +20,7 @@

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.metrics.Gauge;
@@ -65,6 +66,7 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.apache.flink.configuration.ConfigConstants.METRICS_OPERATOR_NAME_MAX_LENGTH;
import static org.apache.flink.runtime.metrics.util.SystemResourcesMetricsInitializer.instantiateSystemMetrics;
import static org.apache.flink.util.Preconditions.checkNotNull;

@@ -83,6 +85,8 @@ public class MetricUtils {
@VisibleForTesting static final String METRIC_GROUP_MEMORY = "Memory";

@VisibleForTesting static final String METRIC_GROUP_MANAGED_MEMORY = "Managed";
private static final String WRITER_SUFFIX = ": " + ConfigConstants.WRITER_NAME;
private static final String COMMITTER_SUFFIX = ": " + ConfigConstants.COMMITTER_NAME;

private MetricUtils() {}

@@ -366,6 +370,34 @@ private static void instantiateCPUMetrics(MetricGroup metrics) {
}
}

public static String truncateOperatorName(String operatorName) {
if (operatorName != null && operatorName.length() > METRICS_OPERATOR_NAME_MAX_LENGTH) {
LOG.warn(
"The operator name {} exceeded the {} characters length limit and was truncated.",
operatorName,
METRICS_OPERATOR_NAME_MAX_LENGTH);
if (operatorName.endsWith(WRITER_SUFFIX)) {
return operatorName.substring(
0,
Math.max(
0,
METRICS_OPERATOR_NAME_MAX_LENGTH - WRITER_SUFFIX.length()))
+ WRITER_SUFFIX;
}
if (operatorName.endsWith(COMMITTER_SUFFIX)) {
return operatorName.substring(
0,
Math.max(
0,
METRICS_OPERATOR_NAME_MAX_LENGTH
- COMMITTER_SUFFIX.length()))
+ COMMITTER_SUFFIX;
}
return operatorName.substring(0, METRICS_OPERATOR_NAME_MAX_LENGTH);
}
return operatorName;
}

private static final class AttributeGauge<T> implements Gauge<T> {
private final MBeanServer server;
private final ObjectName objectName;
@@ -26,6 +26,7 @@
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SupportsCommitter;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
@@ -72,9 +73,6 @@
public class SinkTransformationTranslator<Input, Output>
implements TransformationTranslator<Output, SinkTransformation<Input, Output>> {

private static final String COMMITTER_NAME = "Committer";
private static final String WRITER_NAME = "Writer";

@Override
public Collection<Integer> translateForBatch(
SinkTransformation<Input, Output> transformation, Context context) {
@@ -170,7 +168,7 @@ private void expand() {
prewritten,
input ->
input.transform(
WRITER_NAME,
ConfigConstants.WRITER_NAME,
CommittableMessageTypeInfo.noOutput(),
new SinkWriterOperatorFactory<>(sink)),
false,
@@ -284,7 +282,7 @@ private <CommT, WriteResultT> void addCommittingTopology(
precommitted,
pc ->
pc.transform(
COMMITTER_NAME,
ConfigConstants.COMMITTER_NAME,
committableTypeInformation,
new CommitterOperatorFactory<>(
committingSink,
@@ -315,7 +313,7 @@ private <WriteResultT> DataStream<CommittableMessage<WriteResultT>> addWriter(
inputStream,
input ->
input.transform(
WRITER_NAME,
ConfigConstants.WRITER_NAME,
typeInformation,
new SinkWriterOperatorFactory<>(sink)),
false,
@@ -383,10 +381,12 @@ private <I, R> R adjustTransformations(

// Set the operator uid hashes to support stateful upgrades without prior uids
setOperatorUidHashIfPossible(
subTransformation, WRITER_NAME, operatorsUidHashes.getWriterUidHash());
subTransformation,
ConfigConstants.WRITER_NAME,
operatorsUidHashes.getWriterUidHash());
setOperatorUidHashIfPossible(
subTransformation,
COMMITTER_NAME,
ConfigConstants.COMMITTER_NAME,
operatorsUidHashes.getCommitterUidHash());
setOperatorUidHashIfPossible(
subTransformation,
@@ -479,14 +479,14 @@ private void concatUid(
if (transformationName != null && transformation.getUid() != null) {
// Use the same uid pattern than for Sink V1. We deliberately decided to use the uid
// pattern of Flink 1.13 because 1.14 did not have a dedicated committer operator.
if (transformationName.equals(COMMITTER_NAME)) {
if (transformationName.equals(ConfigConstants.COMMITTER_NAME)) {
final String committerFormat = "Sink Committer: %s";
subTransformation.setUid(
String.format(committerFormat, transformation.getUid()));
return;
}
// Set the writer operator uid to the sinks uid to support state migrations
if (transformationName.equals(WRITER_NAME)) {
if (transformationName.equals(ConfigConstants.WRITER_NAME)) {
subTransformation.setUid(transformation.getUid());
return;
}
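For orientation, the user-facing side that produces these operator names (a sketch under the assumption that mySink stands in for some concrete Sink<Long> implementation, so it is not runnable as written): naming a Sink V2 pipeline yields a "<name>: Writer" operator and, for sinks implementing SupportsCommitter, a "<name>: Committer" operator, which is exactly the pattern SinkMetricsITCase below looks up after truncation.

import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class NamedSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Sink<Long> mySink = null; // placeholder: substitute a concrete Sink<Long>
        env.fromSequence(0, 100)
                .sinkTo(mySink)
                // The writer operator is registered as "<this name>: Writer" (and the committer,
                // if the sink has one, as "<this name>: Committer"); names longer than 80
                // characters are truncated with the suffix preserved.
                .name("AVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryLongSinkName");
        env.execute();
    }
}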
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.metrics.groups;

import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.metrics.Metric;
@@ -200,8 +201,8 @@ void testOperatorNameTruncation() throws Exception {
taskMetricGroup.getOrAddOperator(originalName);

String storedName = operatorMetricGroup.getScopeComponents()[0];
assertThat(storedName.length()).isEqualTo(TaskMetricGroup.METRICS_OPERATOR_NAME_MAX_LENGTH);
assertThat(originalName.substring(0, TaskMetricGroup.METRICS_OPERATOR_NAME_MAX_LENGTH))
assertThat(storedName.length()).isEqualTo(ConfigConstants.METRICS_OPERATOR_NAME_MAX_LENGTH);
assertThat(originalName.substring(0, ConfigConstants.METRICS_OPERATOR_NAME_MAX_LENGTH))
.isEqualTo(storedName);
registry.closeAsync().get();
}
@@ -283,6 +283,67 @@ public MetricGroup addGroup(String name) {
}
}

@Test
void testTruncateOperatorName() {
// test operator name is null
assertThat(MetricUtils.truncateOperatorName(null)).isNull();
// test operator name length less than 80
final String operatorNameLess = "testOperatorName";
assertThat(MetricUtils.truncateOperatorName(operatorNameLess)).isEqualTo(operatorNameLess);
// test operator name length less than 80 and end with : Writer
final String operatorNameLessEndWithWriter = "testOperatorName: Writer";
assertThat(MetricUtils.truncateOperatorName(operatorNameLessEndWithWriter))
.isEqualTo(operatorNameLessEndWithWriter);
// test operator name length less than 80 and end with : Committer
final String operatorNameLessEndWithCommitter = "testOperatorName: Committer";
assertThat(MetricUtils.truncateOperatorName(operatorNameLessEndWithCommitter))
.isEqualTo(operatorNameLessEndWithCommitter);
// test operator name length less than 80 and contains with : Writer
final String operatorNameLessAndContainsWriter = "test: WriterOperatorName";
assertThat(MetricUtils.truncateOperatorName(operatorNameLessAndContainsWriter))
.isEqualTo(operatorNameLessAndContainsWriter);
// test operator name length less than 80 and contains with : Committer
final String operatorNameLessAndContainsCommitter = "test: CommitterOperatorName";
assertThat(MetricUtils.truncateOperatorName(operatorNameLessAndContainsCommitter))
.isEqualTo(operatorNameLessAndContainsCommitter);

// test operator name length more than 80
final String operatorNameMore =
"testLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongOperatorName";
final String expectedOperatorNameMore =
"testLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLong";
assertThat(MetricUtils.truncateOperatorName(operatorNameMore))
.isEqualTo(expectedOperatorNameMore);

// test operator name length more than 80 and end with : Writer
final String operatorNameMoreEndWithWriter =
"testLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongOperatorName: Writer";
final String expectedOperatorNameMoreEndWithWriter =
"testLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLong: Writer";
assertThat(MetricUtils.truncateOperatorName(operatorNameMoreEndWithWriter))
.isEqualTo(expectedOperatorNameMoreEndWithWriter);

// test operator name length more than 80 and end with : Committer
final String operatorNameMoreEndWithCommitter =
"testLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongOperatorName: Committer";
final String expectedOperatorNameMoreEndWithCommitter =
"testLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongL: Committer";
assertThat(MetricUtils.truncateOperatorName(operatorNameMoreEndWithCommitter))
.isEqualTo(expectedOperatorNameMoreEndWithCommitter);

// test operator name length more than 80 and contains with : Writer
final String operatorNameMoreAndContainsWriter =
"testLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLong: WriterOperatorName";
assertThat(MetricUtils.truncateOperatorName(operatorNameMoreAndContainsWriter))
.isEqualTo(expectedOperatorNameMore);

// test operator name length more than 80 and contains with : Committer
final String operatorNameMoreAndContainsCommitter =
"testLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLong: CommitterOperatorName";
assertThat(MetricUtils.truncateOperatorName(operatorNameMoreAndContainsCommitter))
.isEqualTo(expectedOperatorNameMore);
}

// --------------- utility methods and classes ---------------

/**
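The expected strings in the test above follow directly from the suffix lengths: ": Writer" is 8 characters and ": Committer" is 11, so with the 80-character cap the surviving head of an over-long name is 80 - 8 = 72 characters for writer names and 80 - 11 = 69 characters for committer names, while names without a recognized suffix are simply cut at 80.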
@@ -21,13 +21,15 @@
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.metrics.util.MetricUtils;
import org.apache.flink.runtime.testutils.InMemoryReporter;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -55,9 +57,8 @@
/** Tests whether all provided metrics of a {@link Sink} are of the expected values (FLIP-33). */
public class SinkMetricsITCase extends TestLogger {

private static final String TEST_SINK_NAME = "MetricTestSink";
// please refer to SinkTransformationTranslator#WRITER_NAME
private static final String DEFAULT_WRITER_NAME = "Writer";
private static final String TEST_LONG_SINK_NAME =
"LongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongMetricTestSink";
private static final int DEFAULT_PARALLELISM = 4;

@Rule public final SharedObjects sharedObjects = SharedObjects.create();
@@ -103,7 +104,7 @@ public void testMetrics() throws Exception {
return i;
})
.sinkTo(TestSinkV2.<Long>newBuilder().setWriter(new MetricWriter()).build())
.name(TEST_SINK_NAME);
.name(TEST_LONG_SINK_NAME);
JobClient jobClient = env.executeAsync();
final JobID jobId = jobClient.getJobID();

@@ -123,7 +124,9 @@ private void assertSinkMetrics(
JobID jobId, long processedRecordsPerSubtask, int parallelism, int numSplits) {
List<OperatorMetricGroup> groups =
reporter.findOperatorMetricGroups(
jobId, TEST_SINK_NAME + ": " + DEFAULT_WRITER_NAME);
jobId,
MetricUtils.truncateOperatorName(
TEST_LONG_SINK_NAME + ": " + ConfigConstants.WRITER_NAME));
assertThat(groups, hasSize(parallelism));

int subtaskWithMetrics = 0;
@@ -160,7 +163,7 @@ private void assertSinkMetrics(

// Test operator I/O metrics are reused by task metrics
List<TaskMetricGroup> taskMetricGroups =
reporter.findTaskMetricGroups(jobId, TEST_SINK_NAME);
reporter.findTaskMetricGroups(jobId, TEST_LONG_SINK_NAME);
assertThat(taskMetricGroups, hasSize(parallelism));

int subtaskWithTaskMetrics = 0;