From cee66261ba96fb8c49b4da0f74abc7c33c05b146 Mon Sep 17 00:00:00 2001
From: Tartarus0zm <zhangmang1@163.com>
Date: Fri, 20 Dec 2024 20:08:19 +0800
Subject: [PATCH 1/3] [FLINK-36946][Metrics] Optimize sink operator name
 truncate

---
 .../flink/configuration/ConfigConstants.java  |  5 +++++
 .../groups/JobManagerJobMetricGroup.java      |  3 ++-
 .../metrics/groups/TaskMetricGroup.java       |  5 ++++-
 .../runtime/metrics/util/MetricUtils.java     | 14 +++++++++++++
 .../SinkTransformationTranslator.java         | 20 +++++++++----------
 .../streaming/runtime/SinkMetricsITCase.java  | 15 ++++++++------
 6 files changed, 44 insertions(+), 18 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 653be48964feb..f26c93e33556e 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -109,6 +109,11 @@ public final class ConfigConstants {
 
     public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;
 
+    /** The suffix of sink operator name. */
+    public static final String COMMITTER_NAME = "Committer";
+
+    public static final String WRITER_NAME = "Writer";
+
     /** Not instantiable. */
     private ConfigConstants() {}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerJobMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerJobMetricGroup.java
index 9b98f5ad14926..c171946581858 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerJobMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerJobMetricGroup.java
@@ -23,6 +23,7 @@
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.util.MetricUtils;
 import org.apache.flink.util.AbstractID;
 
 import javax.annotation.Nullable;
@@ -88,7 +89,7 @@ private String getTruncatedOperatorName(String operatorName) {
                     "The operator name {} exceeded the {} characters length limit and was truncated.",
                     operatorName,
                     METRICS_OPERATOR_NAME_MAX_LENGTH);
-            return operatorName.substring(0, METRICS_OPERATOR_NAME_MAX_LENGTH);
+            return MetricUtils.truncateOperatorName(operatorName, METRICS_OPERATOR_NAME_MAX_LENGTH);
         } else {
             return operatorName;
         }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
index afcbbaa44bff2..db0e58addf1c9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
@@ -26,6 +26,7 @@
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.scope.ScopeFormat;
+import org.apache.flink.runtime.metrics.util.MetricUtils;
 import org.apache.flink.util.AbstractID;
 
 import javax.annotation.Nullable;
@@ -156,7 +157,9 @@ public InternalOperatorMetricGroup getOrAddOperator(
                     "The operator name {} exceeded the {} characters length limit and was truncated.",
                     operatorName,
                     METRICS_OPERATOR_NAME_MAX_LENGTH);
-            truncatedOperatorName = operatorName.substring(0, METRICS_OPERATOR_NAME_MAX_LENGTH);
+            truncatedOperatorName =
+                    MetricUtils.truncateOperatorName(
+                            operatorName, METRICS_OPERATOR_NAME_MAX_LENGTH);
         } else {
             truncatedOperatorName = operatorName;
         }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
index 356cee13e91b9..0d839c57f2633 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
@@ -20,6 +20,7 @@
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.Gauge;
@@ -83,6 +84,8 @@ public class MetricUtils {
     @VisibleForTesting static final String METRIC_GROUP_MEMORY = "Memory";
 
     @VisibleForTesting static final String METRIC_GROUP_MANAGED_MEMORY = "Managed";
+    private static final String WRITER_SUFFIX = ": " + ConfigConstants.WRITER_NAME;
+    private static final String COMMITTER_SUFFIX = ": " + ConfigConstants.COMMITTER_NAME;
 
     private MetricUtils() {}
 
@@ -366,6 +369,17 @@ private static void instantiateCPUMetrics(MetricGroup metrics) {
         }
     }
 
+    public static String truncateOperatorName(String operatorName, int maxLength) {
+        if (operatorName.endsWith(WRITER_SUFFIX)) {
+            return operatorName.substring(0, maxLength - WRITER_SUFFIX.length()) + WRITER_SUFFIX;
+        }
+        if (operatorName.endsWith(COMMITTER_SUFFIX)) {
+            return operatorName.substring(0, maxLength - COMMITTER_SUFFIX.length())
+                    + COMMITTER_SUFFIX;
+        }
+        return operatorName.substring(0, maxLength);
+    }
+
     private static final class AttributeGauge<T> implements Gauge<T> {
         private final MBeanServer server;
         private final ObjectName objectName;
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
index dd41218a4970f..4dfdf2ca319fa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
@@ -26,6 +26,7 @@
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.SupportsCommitter;
 import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
@@ -72,9 +73,6 @@
 public class SinkTransformationTranslator<Input, Output>
         implements TransformationTranslator<Output, SinkTransformation<Input, Output>> {
 
-    private static final String COMMITTER_NAME = "Committer";
-    private static final String WRITER_NAME = "Writer";
-
     @Override
     public Collection<Integer> translateForBatch(
             SinkTransformation<Input, Output> transformation, Context context) {
@@ -170,7 +168,7 @@ private void expand() {
                         prewritten,
                         input ->
                                 input.transform(
-                                        WRITER_NAME,
+                                        ConfigConstants.WRITER_NAME,
                                         CommittableMessageTypeInfo.noOutput(),
                                         new SinkWriterOperatorFactory<>(sink)),
                         false,
@@ -284,7 +282,7 @@ private <CommT, WriteResultT> void addCommittingTopology(
                             precommitted,
                             pc ->
                                     pc.transform(
-                                            COMMITTER_NAME,
+                                            ConfigConstants.COMMITTER_NAME,
                                             committableTypeInformation,
                                             new CommitterOperatorFactory<>(
                                                     committingSink,
@@ -315,7 +313,7 @@ private <WriteResultT> DataStream<CommittableMessage<WriteResultT>> addWriter(
                             inputStream,
                             input ->
                                     input.transform(
-                                            WRITER_NAME,
+                                            ConfigConstants.WRITER_NAME,
                                             typeInformation,
                                             new SinkWriterOperatorFactory<>(sink)),
                             false,
@@ -383,10 +381,12 @@ private <I, R> R adjustTransformations(
 
                 // Set the operator uid hashes to support stateful upgrades without prior uids
                 setOperatorUidHashIfPossible(
-                        subTransformation, WRITER_NAME, operatorsUidHashes.getWriterUidHash());
+                        subTransformation,
+                        ConfigConstants.WRITER_NAME,
+                        operatorsUidHashes.getWriterUidHash());
                 setOperatorUidHashIfPossible(
                         subTransformation,
-                        COMMITTER_NAME,
+                        ConfigConstants.COMMITTER_NAME,
                         operatorsUidHashes.getCommitterUidHash());
                 setOperatorUidHashIfPossible(
                         subTransformation,
@@ -479,14 +479,14 @@ private void concatUid(
             if (transformationName != null && transformation.getUid() != null) {
                 // Use the same uid pattern than for Sink V1. We deliberately decided to use the uid
                 // pattern of Flink 1.13 because 1.14 did not have a dedicated committer operator.
-                if (transformationName.equals(COMMITTER_NAME)) {
+                if (transformationName.equals(ConfigConstants.COMMITTER_NAME)) {
                     final String committerFormat = "Sink Committer: %s";
                     subTransformation.setUid(
                             String.format(committerFormat, transformation.getUid()));
                     return;
                 }
                 // Set the writer operator uid to the sinks uid to support state migrations
-                if (transformationName.equals(WRITER_NAME)) {
+                if (transformationName.equals(ConfigConstants.WRITER_NAME)) {
                     subTransformation.setUid(transformation.getUid());
                     return;
                 }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java
index 404f65f0ee6a5..530fe008d5040 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java
@@ -21,6 +21,7 @@
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.WriterInitContext;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.metrics.Metric;
@@ -28,6 +29,7 @@
 import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
 import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.util.MetricUtils;
 import org.apache.flink.runtime.testutils.InMemoryReporter;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -55,9 +57,8 @@
 /** Tests whether all provided metrics of a {@link Sink} are of the expected values (FLIP-33). */
 public class SinkMetricsITCase extends TestLogger {
 
-    private static final String TEST_SINK_NAME = "MetricTestSink";
-    // please refer to SinkTransformationTranslator#WRITER_NAME
-    private static final String DEFAULT_WRITER_NAME = "Writer";
+    private static final String TEST_LONG_SINK_NAME =
+            "LongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongMetricTestSink";
     private static final int DEFAULT_PARALLELISM = 4;
 
     @Rule public final SharedObjects sharedObjects = SharedObjects.create();
@@ -103,7 +104,7 @@ public void testMetrics() throws Exception {
                             return i;
                         })
                 .sinkTo(TestSinkV2.<Long>newBuilder().setWriter(new MetricWriter()).build())
-                .name(TEST_SINK_NAME);
+                .name(TEST_LONG_SINK_NAME);
         JobClient jobClient = env.executeAsync();
         final JobID jobId = jobClient.getJobID();
 
@@ -123,7 +124,9 @@ private void assertSinkMetrics(
             JobID jobId, long processedRecordsPerSubtask, int parallelism, int numSplits) {
         List<OperatorMetricGroup> groups =
                 reporter.findOperatorMetricGroups(
-                        jobId, TEST_SINK_NAME + ": " + DEFAULT_WRITER_NAME);
+                        jobId,
+                        MetricUtils.truncateOperatorName(
+                                TEST_LONG_SINK_NAME + ": " + ConfigConstants.WRITER_NAME, 80));
         assertThat(groups, hasSize(parallelism));
 
         int subtaskWithMetrics = 0;
@@ -160,7 +163,7 @@ private void assertSinkMetrics(
 
         // Test operator I/O metrics are reused by task metrics
         List<TaskMetricGroup> taskMetricGroups =
-                reporter.findTaskMetricGroups(jobId, TEST_SINK_NAME);
+                reporter.findTaskMetricGroups(jobId, TEST_LONG_SINK_NAME);
         assertThat(taskMetricGroups, hasSize(parallelism));
 
         int subtaskWithTaskMetrics = 0;

From 6fdd3c3fe146e2db6a74f964701dde2ec179cb7f Mon Sep 17 00:00:00 2001
From: Tartarus0zm <zhangmang1@163.com>
Date: Sun, 22 Dec 2024 17:20:33 +0800
Subject: [PATCH 2/3] address comment

---
 .../flink/configuration/ConfigConstants.java  |  2 +
 .../groups/JobManagerJobMetricGroup.java      | 17 +-----
 .../metrics/groups/TaskMetricGroup.java       | 15 +----
 .../runtime/metrics/util/MetricUtils.java     | 27 +++++---
 .../metrics/groups/TaskMetricGroupTest.java   |  5 +-
 .../runtime/metrics/util/MetricUtilsTest.java | 61 +++++++++++++++++++
 .../streaming/runtime/SinkMetricsITCase.java  |  2 +-
 7 files changed, 89 insertions(+), 40 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index f26c93e33556e..ead76f9bf4074 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -114,6 +114,8 @@ public final class ConfigConstants {
 
     public static final String WRITER_NAME = "Writer";
 
+    public static final int METRICS_OPERATOR_NAME_MAX_LENGTH = 80;
+
     /** Not instantiable. */
     private ConfigConstants() {}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerJobMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerJobMetricGroup.java
index c171946581858..bf9b65879b15a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerJobMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerJobMetricGroup.java
@@ -31,7 +31,6 @@
 import java.util.HashMap;
 import java.util.Map;
 
-import static org.apache.flink.runtime.metrics.groups.TaskMetricGroup.METRICS_OPERATOR_NAME_MAX_LENGTH;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -63,7 +62,7 @@ public final JobManagerMetricGroup parent() {
 
     public JobManagerOperatorMetricGroup getOrAddOperator(
             AbstractID vertexId, String taskName, OperatorID operatorID, String operatorName) {
-        final String truncatedOperatorName = getTruncatedOperatorName(operatorName);
+        final String truncatedOperatorName = MetricUtils.truncateOperatorName(operatorName);
 
         // unique OperatorIDs only exist in streaming, so we have to rely on the name for batch
         // operators
@@ -83,25 +82,13 @@ public JobManagerOperatorMetricGroup getOrAddOperator(
         }
     }
 
-    private String getTruncatedOperatorName(String operatorName) {
-        if (operatorName != null && operatorName.length() > METRICS_OPERATOR_NAME_MAX_LENGTH) {
-            LOG.warn(
-                    "The operator name {} exceeded the {} characters length limit and was truncated.",
-                    operatorName,
-                    METRICS_OPERATOR_NAME_MAX_LENGTH);
-            return MetricUtils.truncateOperatorName(operatorName, METRICS_OPERATOR_NAME_MAX_LENGTH);
-        } else {
-            return operatorName;
-        }
-    }
-
     @VisibleForTesting
     int numRegisteredOperatorMetricGroups() {
         return operators.size();
     }
 
     void removeOperatorMetricGroup(OperatorID operatorID, String operatorName) {
-        final String truncatedOperatorName = getTruncatedOperatorName(operatorName);
+        final String truncatedOperatorName = MetricUtils.truncateOperatorName(operatorName);
 
         // unique OperatorIDs only exist in streaming, so we have to rely on the name for batch
         // operators
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
index db0e58addf1c9..34c2f7deaa9c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
@@ -46,8 +46,6 @@ public class TaskMetricGroup extends ComponentMetricGroup<TaskManagerJobMetricGr
 
     private final Map<String, InternalOperatorMetricGroup> operators = new HashMap<>();
 
-    static final int METRICS_OPERATOR_NAME_MAX_LENGTH = 80;
-
     private final TaskIOMetricGroup ioMetrics;
 
     /**
@@ -151,18 +149,7 @@ public InternalOperatorMetricGroup getOrAddOperator(String operatorName) {
 
     public InternalOperatorMetricGroup getOrAddOperator(
             OperatorID operatorID, String operatorName) {
-        final String truncatedOperatorName;
-        if (operatorName != null && operatorName.length() > METRICS_OPERATOR_NAME_MAX_LENGTH) {
-            LOG.warn(
-                    "The operator name {} exceeded the {} characters length limit and was truncated.",
-                    operatorName,
-                    METRICS_OPERATOR_NAME_MAX_LENGTH);
-            truncatedOperatorName =
-                    MetricUtils.truncateOperatorName(
-                            operatorName, METRICS_OPERATOR_NAME_MAX_LENGTH);
-        } else {
-            truncatedOperatorName = operatorName;
-        }
+        final String truncatedOperatorName = MetricUtils.truncateOperatorName(operatorName);
 
         // unique OperatorIDs only exist in streaming, so we have to rely on the name for batch
         // operators
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
index 0d839c57f2633..24f082a7d0b57 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
@@ -66,6 +66,7 @@
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.configuration.ConfigConstants.METRICS_OPERATOR_NAME_MAX_LENGTH;
 import static org.apache.flink.runtime.metrics.util.SystemResourcesMetricsInitializer.instantiateSystemMetrics;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -369,15 +370,25 @@ private static void instantiateCPUMetrics(MetricGroup metrics) {
         }
     }
 
-    public static String truncateOperatorName(String operatorName, int maxLength) {
-        if (operatorName.endsWith(WRITER_SUFFIX)) {
-            return operatorName.substring(0, maxLength - WRITER_SUFFIX.length()) + WRITER_SUFFIX;
-        }
-        if (operatorName.endsWith(COMMITTER_SUFFIX)) {
-            return operatorName.substring(0, maxLength - COMMITTER_SUFFIX.length())
-                    + COMMITTER_SUFFIX;
+    public static String truncateOperatorName(String operatorName) {
+        if (operatorName != null && operatorName.length() > METRICS_OPERATOR_NAME_MAX_LENGTH) {
+            LOG.warn(
+                    "The operator name {} exceeded the {} characters length limit and was truncated.",
+                    operatorName,
+                    METRICS_OPERATOR_NAME_MAX_LENGTH);
+            if (operatorName.endsWith(WRITER_SUFFIX)) {
+                return operatorName.substring(
+                                0, METRICS_OPERATOR_NAME_MAX_LENGTH - WRITER_SUFFIX.length())
+                        + WRITER_SUFFIX;
+            }
+            if (operatorName.endsWith(COMMITTER_SUFFIX)) {
+                return operatorName.substring(
+                                0, METRICS_OPERATOR_NAME_MAX_LENGTH - COMMITTER_SUFFIX.length())
+                        + COMMITTER_SUFFIX;
+            }
+            return operatorName.substring(0, METRICS_OPERATOR_NAME_MAX_LENGTH);
         }
-        return operatorName.substring(0, maxLength);
+        return operatorName;
     }
 
     private static final class AttributeGauge<T> implements Gauge<T> {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
index e2df58b75a8ec..61c7015b47b14 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.Metric;
@@ -200,8 +201,8 @@ void testOperatorNameTruncation() throws Exception {
                 taskMetricGroup.getOrAddOperator(originalName);
 
         String storedName = operatorMetricGroup.getScopeComponents()[0];
-        assertThat(storedName.length()).isEqualTo(TaskMetricGroup.METRICS_OPERATOR_NAME_MAX_LENGTH);
-        assertThat(originalName.substring(0, TaskMetricGroup.METRICS_OPERATOR_NAME_MAX_LENGTH))
+        assertThat(storedName.length()).isEqualTo(ConfigConstants.METRICS_OPERATOR_NAME_MAX_LENGTH);
+        assertThat(originalName.substring(0, ConfigConstants.METRICS_OPERATOR_NAME_MAX_LENGTH))
                 .isEqualTo(storedName);
         registry.closeAsync().get();
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/MetricUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/MetricUtilsTest.java
index e2182aa9daf61..ae2f50cbbde40 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/MetricUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/MetricUtilsTest.java
@@ -283,6 +283,67 @@ public MetricGroup addGroup(String name) {
         }
     }
 
+    @Test
+    void testTruncateOperatorName() {
+        // test operator name is null
+        assertThat(MetricUtils.truncateOperatorName(null)).isNull();
+        // test operator name length less than 80
+        final String operatorNameLess = "testOperatorName";
+        assertThat(MetricUtils.truncateOperatorName(operatorNameLess)).isEqualTo(operatorNameLess);
+        // test operator name length less than 80 and end with : Writer
+        final String operatorNameLessEndWithWriter = "testOperatorName: Writer";
+        assertThat(MetricUtils.truncateOperatorName(operatorNameLessEndWithWriter))
+                .isEqualTo(operatorNameLessEndWithWriter);
+        // test operator name length less than 80 and end with : Committer
+        final String operatorNameLessEndWithCommitter = "testOperatorName: Committer";
+        assertThat(MetricUtils.truncateOperatorName(operatorNameLessEndWithCommitter))
+                .isEqualTo(operatorNameLessEndWithCommitter);
+        // test operator name length less than 80 and contains with : Writer
+        final String operatorNameLessAndContainsWriter = "test: WriterOperatorName";
+        assertThat(MetricUtils.truncateOperatorName(operatorNameLessAndContainsWriter))
+                .isEqualTo(operatorNameLessAndContainsWriter);
+        // test operator name length less than 80 and contains with : Committer
+        final String operatorNameLessAndContainsCommitter = "test: CommitterOperatorName";
+        assertThat(MetricUtils.truncateOperatorName(operatorNameLessAndContainsCommitter))
+                .isEqualTo(operatorNameLessAndContainsCommitter);
+
+        // test operator name length more than 80
+        final String operatorNameMore =
+                "testLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongOperatorName";
+        final String expectedOperatorNameMore =
+                "testLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLong";
+        assertThat(MetricUtils.truncateOperatorName(operatorNameMore))
+                .isEqualTo(expectedOperatorNameMore);
+
+        // test operator name length more than 80 and end with : Writer
+        final String operatorNameMoreEndWithWriter =
+                "testLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongOperatorName: Writer";
+        final String expectedOperatorNameMoreEndWithWriter =
+                "testLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLong: Writer";
+        assertThat(MetricUtils.truncateOperatorName(operatorNameMoreEndWithWriter))
+                .isEqualTo(expectedOperatorNameMoreEndWithWriter);
+
+        // test operator name length more than 80 and end with : Committer
+        final String operatorNameMoreEndWithCommitter =
+                "testLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongOperatorName: Committer";
+        final String expectedOperatorNameMoreEndWithCommitter =
+                "testLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongL: Committer";
+        assertThat(MetricUtils.truncateOperatorName(operatorNameMoreEndWithCommitter))
+                .isEqualTo(expectedOperatorNameMoreEndWithCommitter);
+
+        // test operator name length more than 80 and contains with : Writer
+        final String operatorNameMoreAndContainsWriter =
+                "testLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLong: WriterOperatorName";
+        assertThat(MetricUtils.truncateOperatorName(operatorNameMoreAndContainsWriter))
+                .isEqualTo(expectedOperatorNameMore);
+
+        // test operator name length more than 80 and contains with : Committer
+        final String operatorNameMoreAndContainsCommitter =
+                "testLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLong: CommitterOperatorName";
+        assertThat(MetricUtils.truncateOperatorName(operatorNameMoreAndContainsCommitter))
+                .isEqualTo(expectedOperatorNameMore);
+    }
+
     // --------------- utility methods and classes ---------------
 
     /**
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java
index 530fe008d5040..c47de118bb70d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java
@@ -126,7 +126,7 @@ private void assertSinkMetrics(
                 reporter.findOperatorMetricGroups(
                         jobId,
                         MetricUtils.truncateOperatorName(
-                                TEST_LONG_SINK_NAME + ": " + ConfigConstants.WRITER_NAME, 80));
+                                TEST_LONG_SINK_NAME + ": " + ConfigConstants.WRITER_NAME));
         assertThat(groups, hasSize(parallelism));
 
         int subtaskWithMetrics = 0;

From 31e49e778680e40313617100cb0bcacf755f351b Mon Sep 17 00:00:00 2001
From: Tartarus0zm <zhangmang1@163.com>
Date: Wed, 8 Jan 2025 16:29:05 +0800
Subject: [PATCH 3/3] address comment

---
 .../flink/runtime/metrics/util/MetricUtils.java       | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
index 24f082a7d0b57..6b0c818fad646 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
@@ -378,12 +378,19 @@ public static String truncateOperatorName(String operatorName) {
                     METRICS_OPERATOR_NAME_MAX_LENGTH);
             if (operatorName.endsWith(WRITER_SUFFIX)) {
                 return operatorName.substring(
-                                0, METRICS_OPERATOR_NAME_MAX_LENGTH - WRITER_SUFFIX.length())
+                                0,
+                                Math.max(
+                                        0,
+                                        METRICS_OPERATOR_NAME_MAX_LENGTH - WRITER_SUFFIX.length()))
                         + WRITER_SUFFIX;
             }
             if (operatorName.endsWith(COMMITTER_SUFFIX)) {
                 return operatorName.substring(
-                                0, METRICS_OPERATOR_NAME_MAX_LENGTH - COMMITTER_SUFFIX.length())
+                                0,
+                                Math.max(
+                                        0,
+                                        METRICS_OPERATOR_NAME_MAX_LENGTH
+                                                - COMMITTER_SUFFIX.length()))
                         + COMMITTER_SUFFIX;
             }
             return operatorName.substring(0, METRICS_OPERATOR_NAME_MAX_LENGTH);