[SNOW-870373] Enable JMX metrics for Snowpipe Streaming (#674)
sfc-gh-rcheng authored Aug 25, 2023
1 parent 1bb7bd2 commit ef0c907
Showing 4 changed files with 101 additions and 6 deletions.
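At a glance: the commit threads a new enableCustomJMXMonitoring flag from the sink service into each TopicPartitionChannel and, when the flag is set, registers three per-channel offset gauges (persisted-in-Snowflake offset, processed offset, latest consumer offset) with a MetricsJmxReporter. As a hedged sketch only (the "jmx" connector property below comes from the connector's documentation, not from this diff), enabling the feature would presumably look like:

import java.util.HashMap;
import java.util.Map;

Map<String, String> connectorConfig = new HashMap<>();
connectorConfig.put("snowflake.ingestion.method", "SNOWPIPE_STREAMING");
connectorConfig.put("jmx", "true"); // assumed toggle feeding enableCustomJMXMonitoring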
src/main/java/com/snowflake/kafka/connector/internal/metrics/MetricsUtil.java
@@ -70,6 +70,14 @@ public class MetricsUtil {
*/
public static final String PURGED_OFFSET = "purged-offset";

/**
* See {@link com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel} for offset
* description
*/
public static final String OFFSET_PERSISTED_IN_SNOWFLAKE = "persisted-in-snowflake-offset";

public static final String LATEST_CONSUMER_OFFSET = "latest-consumer-offset";

// Buffer related constants
public static final String BUFFER_SUB_DOMAIN = "buffer";

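For illustration only, the new constants likely surface through constructMetricName as '/'-joined names; the layout below is an assumption, since constructMetricName's body is not part of this diff:

// Hypothetical name composition, assuming <channelName>/<subDomain>/<metricName>:
String name =
    MetricsUtil.constructMetricName(
        "TEST_CONNECTOR_TOPIC_0", // channel name
        MetricsUtil.OFFSET_SUB_DOMAIN, // offset sub-domain constant
        MetricsUtil.OFFSET_PERSISTED_IN_SNOWFLAKE);
// e.g. "TEST_CONNECTOR_TOPIC_0/offset/persisted-in-snowflake-offset" (assumed)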
src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java
@@ -241,7 +241,8 @@ private void createStreamingChannelForTopicPartition(
this.sinkTaskContext,
this.conn,
this.recordService,
-           this.conn.getTelemetryClient()));
+           this.conn.getTelemetryClient(),
+           this.enableCustomJMXMonitoring));
}

/**
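The sink service now forwards its enableCustomJMXMonitoring field into every channel it creates. Where the service obtains the flag is outside this diff; a plausible, assumption-labeled sketch:

// Assumption: parsed from the connector config before channels are created,
// defaulting to enabled (exact key and default are not shown in this diff).
boolean enableCustomJMXMonitoring =
    Boolean.parseBoolean(connectorConfig.getOrDefault("jmx", "true"));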
src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java
@@ -2,11 +2,15 @@

import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ERRORS_TOLERANCE_CONFIG;
import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.OFFSET_SUB_DOMAIN;
import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.constructMetricName;
import static com.snowflake.kafka.connector.internal.streaming.StreamingUtils.DURATION_BETWEEN_GET_OFFSET_TOKEN_RETRY;
import static com.snowflake.kafka.connector.internal.streaming.StreamingUtils.MAX_GET_OFFSET_TOKEN_RETRIES;
import static java.time.temporal.ChronoUnit.SECONDS;
import static org.apache.kafka.common.record.TimestampType.NO_TIMESTAMP_TYPE;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
@@ -17,6 +21,8 @@
import com.snowflake.kafka.connector.internal.KCLogger;
import com.snowflake.kafka.connector.internal.PartitionBuffer;
import com.snowflake.kafka.connector.internal.SnowflakeConnectionService;
import com.snowflake.kafka.connector.internal.metrics.MetricsJmxReporter;
import com.snowflake.kafka.connector.internal.metrics.MetricsUtil;
import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService;
import com.snowflake.kafka.connector.records.RecordService;
import com.snowflake.kafka.connector.records.SnowflakeJsonSchema;
@@ -200,7 +206,8 @@ public TopicPartitionChannel(
sinkTaskContext,
null, /* Null Connection */
new RecordService(null /* Null Telemetry Service*/),
-       null);
+       null,
+       false);
}

/**
@@ -232,7 +239,8 @@ public TopicPartitionChannel(
SinkTaskContext sinkTaskContext,
SnowflakeConnectionService conn,
RecordService recordService,
-     SnowflakeTelemetryService telemetryService) {
+     SnowflakeTelemetryService telemetryService,
+     boolean enableCustomJMXMonitoring) {
this.streamingIngestClient = Preconditions.checkNotNull(streamingIngestClient);
Preconditions.checkState(!streamingIngestClient.isClosed());
this.topicPartition = Preconditions.checkNotNull(topicPartition);
@@ -276,6 +284,12 @@ public TopicPartitionChannel(
+ " correct offset instead",
this.getChannelName());
}

if (enableCustomJMXMonitoring) {
MetricsJmxReporter metricsJmxReporter =
new MetricsJmxReporter(new MetricRegistry(), conn.getConnectorName());
this.registerChannelJMXMetrics(channelName, metricsJmxReporter);
}
}

/**
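Each opted-in channel builds its own MetricRegistry and a MetricsJmxReporter keyed by the connector name, so the gauges live and die with the channel. MetricsJmxReporter's internals are not in this diff; a minimal sketch of what it plausibly wraps, assuming the Dropwizard JmxReporter and an assumed JMX domain:

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.jmx.JmxReporter; // metrics-jmx module in 4.x; bundled in metrics-core 3.x

MetricRegistry registry = new MetricRegistry();
JmxReporter reporter =
    JmxReporter.forRegistry(registry)
        .inDomain("snowflake.kafka.connector") // assumed domain string
        .build();
reporter.start(); // mirrors metricsJmxReporter.start() at the end of registration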
@@ -1303,4 +1317,45 @@ public String toString() {
return "[" + this.name() + "]";
}
}

/**
* Registers all the Metrics inside the metricRegistry.
*
* @param channelName channelName
* @param metricsJmxReporter wrapper class for registering all metrics related to above connector
* and channel
*/
public void registerChannelJMXMetrics(
final String channelName, MetricsJmxReporter metricsJmxReporter) {
MetricRegistry currentMetricRegistry = metricsJmxReporter.getMetricRegistry();

// Lazily remove all registered metrics from the registry since this can be invoked during
// partition reassignment
LOGGER.debug(
"Registering metrics for channel:{}, existing:{}",
channelName,
metricsJmxReporter.getMetricRegistry().getMetrics().keySet().toString());
metricsJmxReporter.removeMetricsFromRegistry(channelName);

try {
// offset
currentMetricRegistry.register(
constructMetricName(
channelName, OFFSET_SUB_DOMAIN, MetricsUtil.OFFSET_PERSISTED_IN_SNOWFLAKE),
(Gauge<Long>) this.offsetPersistedInSnowflake::get);

currentMetricRegistry.register(
constructMetricName(channelName, OFFSET_SUB_DOMAIN, MetricsUtil.PROCESSED_OFFSET),
(Gauge<Long>) this.processedOffset::get);

currentMetricRegistry.register(
constructMetricName(channelName, OFFSET_SUB_DOMAIN, MetricsUtil.LATEST_CONSUMER_OFFSET),
(Gauge<Long>) () -> this.latestConsumerOffset);

} catch (IllegalArgumentException ex) {
LOGGER.warn("Metrics already present:{}", ex.getMessage());
}

metricsJmxReporter.start();
}
}
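With the reporter started, the three gauges become visible to any JMX client (jconsole, a Prometheus JMX exporter, or code). A hedged usage sketch; the ObjectName domain below is an assumption rather than something this diff shows:

import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class ChannelGaugeProbe {
  public static void main(String[] args) throws Exception {
    MBeanServer server = ManagementFactory.getPlatformMBeanServer();
    // Dropwizard's JmxReporter exposes each Gauge via a "Value" attribute.
    for (ObjectName name :
        server.queryNames(new ObjectName("snowflake.kafka.connector:*"), null)) {
      System.out.println(name + " = " + server.getAttribute(name, "Value"));
    }
  }
}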
src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelTest.java
@@ -3,16 +3,19 @@
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ERRORS_LOG_ENABLE_CONFIG;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ERRORS_TOLERANCE_CONFIG;
import static com.snowflake.kafka.connector.internal.TestUtils.TEST_CONNECTOR_NAME;
import static com.snowflake.kafka.connector.internal.TestUtils.createBigAvroRecords;
import static com.snowflake.kafka.connector.internal.TestUtils.createNativeJsonSinkRecords;
import static com.snowflake.kafka.connector.internal.streaming.StreamingUtils.MAX_GET_OFFSET_TOKEN_RETRIES;

import com.codahale.metrics.MetricRegistry;
import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
import com.snowflake.kafka.connector.dlq.InMemoryKafkaRecordErrorReporter;
import com.snowflake.kafka.connector.dlq.KafkaRecordErrorReporter;
import com.snowflake.kafka.connector.internal.BufferThreshold;
import com.snowflake.kafka.connector.internal.SnowflakeConnectionService;
import com.snowflake.kafka.connector.internal.TestUtils;
import com.snowflake.kafka.connector.internal.metrics.MetricsJmxReporter;
import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService;
import com.snowflake.kafka.connector.records.RecordService;
import java.nio.charset.StandardCharsets;
@@ -225,7 +228,8 @@ public void testCloseChannelException() throws Exception {
mockSinkTaskContext,
mockSnowflakeConnectionService,
new RecordService(mockTelemetryService),
-           mockTelemetryService);
+           mockTelemetryService,
+           false);

topicPartitionChannel.closeChannel();
}
@@ -490,7 +494,8 @@ public void testInsertRowsWithSchemaEvolution() throws Exception {
mockSinkTaskContext,
conn,
new RecordService(),
-           mockTelemetryService);
+           mockTelemetryService,
+           false);

final int noOfRecords = 3;
List<SinkRecord> records =
@@ -615,7 +620,8 @@ public void testInsertRows_ValidationResponseHasErrors_NoErrorTolerance() throws
mockSinkTaskContext,
mockSnowflakeConnectionService,
new RecordService(mockTelemetryService),
-           mockTelemetryService);
+           mockTelemetryService,
+           false);

List<SinkRecord> records = TestUtils.createJsonStringSinkRecords(0, 1, TOPIC, PARTITION);

@@ -809,4 +815,29 @@ public void testBigAvroBufferBytesThreshold() throws Exception {

Assert.assertEquals(2L, topicPartitionChannel.fetchOffsetTokenWithRetry());
}

@Test
public void testRegisterJmxMetrics() {
MetricRegistry metricRegistry = Mockito.spy(MetricRegistry.class);
MetricsJmxReporter metricsJmxReporter =
Mockito.spy(new MetricsJmxReporter(metricRegistry, TEST_CONNECTOR_NAME));

TopicPartitionChannel topicPartitionChannel =
new TopicPartitionChannel(
mockStreamingClient,
topicPartition,
TEST_CHANNEL_NAME,
TEST_TABLE_NAME,
streamingBufferThreshold,
sfConnectorConfig,
mockKafkaRecordErrorReporter,
mockSinkTaskContext);

// test
topicPartitionChannel.registerChannelJMXMetrics(TEST_CHANNEL_NAME, metricsJmxReporter);

// verify 3 metrics registered and started
Mockito.verify(metricsJmxReporter, Mockito.times(1)).start();
Mockito.verify(metricRegistry, Mockito.times(3)).register(Mockito.anyString(), Mockito.any());
}
}
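The test pins the registration count and the reporter start; a natural follow-up assertion (not part of this commit, and again assuming the '/'-joined name layout) could capture the registered names themselves:

// Hypothetical extra check using org.mockito.ArgumentCaptor
// (assumes a static import of MetricsUtil.OFFSET_SUB_DOMAIN):
ArgumentCaptor<String> names = ArgumentCaptor.forClass(String.class);
Mockito.verify(metricRegistry, Mockito.times(3)).register(names.capture(), Mockito.any());
Assert.assertTrue(
    names.getAllValues().stream().allMatch(n -> n.contains(OFFSET_SUB_DOMAIN)));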
