Skip to content

Commit

Permalink
[Streaming JMX #1] Small changes to Snowpipe Telemetry and JMX (#672)
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-rcheng authored Jul 26, 2023
1 parent 0f570ee commit 58b9247
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,17 @@
public abstract class SnowflakeTelemetryBasicInfo {
final String tableName;

/**
* User created or user visible stage name.
*
* <p>It is null for Snowpipe Streaming Telemetry.
*/
final String stageName;

static final KCLogger LOGGER = new KCLogger(SnowflakeTelemetryBasicInfo.class.getName());

/**
* Base Constructor. Accepts a tableName and StageName.
*
* @param tableName Checks for Nullability
* @param stageName Can be null (In case of Snowpipe Streaming since there is no user visible
* Snowflake Stage)
*/
public SnowflakeTelemetryBasicInfo(final String tableName, final String stageName) {
public SnowflakeTelemetryBasicInfo(final String tableName) {
Preconditions.checkArgument(
!Strings.isNullOrEmpty(tableName), "tableName cannot be null or empty");
this.tableName = tableName;
this.stageName = stageName;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ public class SnowflakeTelemetryPipeCreation extends SnowflakeTelemetryBasicInfo
int fileCountReprocessPurge =
0; // files on stage that are purged due to reprocessing when cleaner starts
long startTime; // start time of the pipe
private final String stageName;
private final String pipeName;

public SnowflakeTelemetryPipeCreation(
final String tableName, final String stageName, final String pipeName) {
super(tableName, stageName);
super(tableName);
this.stageName = stageName;
this.pipeName = pipeName;
this.startTime = System.currentTimeMillis();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package com.snowflake.kafka.connector.internal.telemetry;

import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.*;
import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.FILE_COUNT_SUB_DOMAIN;
import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.FILE_COUNT_TABLE_STAGE_INGESTION_FAIL;
import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.LATENCY_SUB_DOMAIN;
import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.OFFSET_SUB_DOMAIN;
import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.constructMetricName;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.AVERAGE_COMMIT_LAG_FILE_COUNT;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.AVERAGE_COMMIT_LAG_MS;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.AVERAGE_INGESTION_LAG_FILE_COUNT;
Expand All @@ -9,23 +13,34 @@
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.AVERAGE_KAFKA_LAG_RECORD_COUNT;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.BYTE_NUMBER;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.CLEANER_RESTART_COUNT;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.COMMITTED_OFFSET;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.END_TIME;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.FILE_COUNT_ON_INGESTION;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.FILE_COUNT_ON_STAGE;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.FILE_COUNT_PURGED;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.FILE_COUNT_TABLE_STAGE_BROKEN_RECORD;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.FILE_COUNT_TABLE_STAGE_INGEST_FAIL;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.FLUSHED_OFFSET;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.MEMORY_USAGE;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.PIPE_NAME;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.PROCESSED_OFFSET;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.PURGED_OFFSET;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.RECORD_NUMBER;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.STAGE_NAME;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.START_TIME;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.TABLE_NAME;

import com.codahale.metrics.*;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import com.snowflake.kafka.connector.internal.metrics.MetricsJmxReporter;
import com.snowflake.kafka.connector.internal.metrics.MetricsUtil;
import com.snowflake.kafka.connector.internal.metrics.MetricsUtil.EventType;
import java.util.*;
import java.util.Arrays;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -108,6 +123,7 @@ public class SnowflakeTelemetryPipeStatus extends SnowflakeTelemetryBasicInfo {
// May not be set if jmx is set to false
private Meter fileCountTableStageBrokenRecordMeter, fileCountTableStageIngestFailMeter;

private final String stageName;
private final String pipeName;

public SnowflakeTelemetryPipeStatus(
Expand All @@ -116,7 +132,8 @@ public SnowflakeTelemetryPipeStatus(
final String pipeName,
final boolean enableCustomJMXConfig,
final MetricsJmxReporter metricsJmxReporter) {
super(tableName, stageName);
super(tableName);
this.stageName = stageName;
this.pipeName = pipeName;

// Initial value of processed/flushed/committed/purged offset should be set to -1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,21 @@ public final class TelemetryConstants {
public static final String STAGE_NAME = "stage_name";
public static final String PIPE_NAME = "pipe_name";

public static final String PROCESSED_OFFSET = "processed_offset";
public static final String FLUSHED_OFFSET = "flushed_offset";
public static final String COMMITTED_OFFSET = "committed_offset";
public static final String PURGED_OFFSET = "purged_offset";
public static final String PROCESSED_OFFSET = "processed-offset";
public static final String FLUSHED_OFFSET = "flushed-offset";
public static final String COMMITTED_OFFSET = "committed-offset";
public static final String PURGED_OFFSET = "purged-offset";

public static final String RECORD_NUMBER = "record_number";
public static final String BYTE_NUMBER = "byte_number";

public static final String FILE_COUNT_ON_STAGE = "file_count_on_stage";
public static final String FILE_COUNT_ON_INGESTION = "file_count_on_ingestion";
public static final String FILE_COUNT_PURGED = "file_count_purged";
public static final String FILE_COUNT_ON_STAGE = "file-count-on-stage";
public static final String FILE_COUNT_ON_INGESTION = "file-count-on-ingestion";
public static final String FILE_COUNT_PURGED = "file-count-purged";
public static final String FILE_COUNT_TABLE_STAGE_INGEST_FAIL =
"file_count_table_stage_ingest_fail";
public static final String FILE_COUNT_TABLE_STAGE_BROKEN_RECORD =
"file_count_table_stage_broken_record";
"file-count-table-stage-broken-record";

public static final String CLEANER_RESTART_COUNT = "cleaner_restart_count";

Expand Down

0 comments on commit 58b9247

Please sign in to comment.