Skip to content

Commit 9f76ba2

Browse files
ekaterinadimitrova2 authored and adelapena committed
CNDB-15897: CNDB-7197: Added SAI_INDEX_METRICS_ENABLED to be able to enable/disable IndexMetrics (#2025)
... We want to be able to enable/disable IndexMetrics. ... We add a flag to be able to do that: `SAI_INDEX_METRICS_ENABLED("cassandra.sai.metrics.index.enabled", "true")`. Also added some simple unit tests for the new parameter. --------- Co-authored-by: Andrés de la Peña <[email protected]>
1 parent c9bd53f commit 9f76ba2

File tree

7 files changed

+149
-25
lines changed

7 files changed

+149
-25
lines changed

src/java/org/apache/cassandra/config/CassandraRelevantProperties.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,12 @@ public enum CassandraRelevantProperties
333333
CUSTOM_TMD_PROVIDER_PROPERTY("cassandra.custom_token_metadata_provider_class"),
334334

335335
CUSTOM_TRACING_CLASS("cassandra.custom_tracing_class"),
336+
/**
337+
* Whether to enable SAI index metrics such as memtable flush metrics, compaction metrics, and disk usage metrics.
338+
* These metrics include timers, histograms, counters, and gauges for index operations.
339+
*/
340+
SAI_INDEX_METRICS_ENABLED("cassandra.sai.metrics.index.enabled", "true"),
341+
336342
/**
337343
* If true, while creating or altering schema, NetworkTopologyStrategy won't check if the DC exists.
338344
* This is to remain compatible with older workflows that first change the replication before adding the nodes.

src/java/org/apache/cassandra/index/sai/IndexContext.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.Collections;
2424
import java.util.Iterator;
2525
import java.util.Objects;
26+
import java.util.Optional;
2627
import java.util.Set;
2728
import java.util.concurrent.ConcurrentHashMap;
2829
import java.util.concurrent.ConcurrentMap;
@@ -72,6 +73,7 @@
7273
import org.apache.cassandra.index.sai.iterators.KeyRangeUnionIterator;
7374
import org.apache.cassandra.index.sai.memory.MemtableIndex;
7475
import org.apache.cassandra.index.sai.memory.MemtableKeyRangeIterator;
76+
import org.apache.cassandra.index.sai.metrics.AbstractMetrics;
7577
import org.apache.cassandra.index.sai.metrics.ColumnQueryMetrics;
7678
import org.apache.cassandra.index.sai.metrics.IndexMetrics;
7779
import org.apache.cassandra.index.sai.plan.Expression;
@@ -97,6 +99,8 @@
9799
import static org.apache.cassandra.config.CassandraRelevantProperties.SAI_VALIDATE_MAX_TERM_SIZE_AT_COORDINATOR;
98100
import static org.apache.cassandra.utils.Clock.Global.nanoTime;
99101
import static org.apache.cassandra.config.CassandraRelevantProperties.SAI_INDEX_READS_DISABLED;
102+
import static org.apache.cassandra.config.CassandraRelevantProperties.SAI_INDEX_METRICS_ENABLED;
103+
//import static org.apache.cassandra.config.CassandraRelevantProperties.VALIDATE_MAX_TERM_SIZE_AT_COORDINATOR;
100104

101105
/**
102106
* Manage metadata for each column index.
@@ -143,7 +147,8 @@ public class IndexContext
143147
private final ConcurrentMap<Memtable, MemtableIndex> liveMemtables = new ConcurrentHashMap<>();
144148

145149
private final IndexViewManager viewManager;
146-
private final IndexMetrics indexMetrics;
150+
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
151+
private final Optional<IndexMetrics> indexMetrics;
147152
private final ColumnQueryMetrics columnQueryMetrics;
148153
private final IndexWriterConfig indexWriterConfig;
149154
private final boolean isAnalyzed;
@@ -193,7 +198,7 @@ public IndexContext(@Nonnull String keyspace,
193198
this.vectorSimilarityFunction = indexWriterConfig.getSimilarityFunction();
194199
this.hasEuclideanSimilarityFunc = vectorSimilarityFunction == VectorSimilarityFunction.EUCLIDEAN;
195200

196-
this.indexMetrics = new IndexMetrics(this);
201+
this.indexMetrics = SAI_INDEX_METRICS_ENABLED.getBoolean() ? Optional.of(new IndexMetrics(this)) : Optional.empty();
197202
this.columnQueryMetrics = isVector() ? new ColumnQueryMetrics.VectorIndexMetrics(keyspace, table, getIndexName()) :
198203
isLiteral() ? new ColumnQueryMetrics.TrieIndexMetrics(keyspace, table, getIndexName())
199204
: new ColumnQueryMetrics.BKDIndexMetrics(keyspace, table, getIndexName());
@@ -211,7 +216,7 @@ public IndexContext(@Nonnull String keyspace,
211216
// null config indicates a "fake" index context. As such, it won't actually be used for indexing/accessing
212217
// data, leaving these metrics unused. This also eliminates the overhead of creating these metrics on the
213218
// query path.
214-
this.indexMetrics = null;
219+
this.indexMetrics = Optional.empty();
215220
this.columnQueryMetrics = null;
216221
}
217222

@@ -238,7 +243,7 @@ public ClusteringComparator comparator()
238243
return clusteringComparator;
239244
}
240245

241-
public IndexMetrics getIndexMetrics()
246+
public Optional<IndexMetrics> getIndexMetrics()
242247
{
243248
return indexMetrics;
244249
}
@@ -300,7 +305,7 @@ public void index(DecoratedKey key, Row row, Memtable mt, OpOrder.Group opGroup)
300305
ByteBuffer value = getValueOf(key, row, FBUtilities.nowInSeconds());
301306
target.index(key, row.clustering(), value, mt, opGroup);
302307
}
303-
indexMetrics.memtableIndexWriteLatency.update(nanoTime() - start, TimeUnit.NANOSECONDS);
308+
indexMetrics.ifPresent(metrics -> metrics.memtableIndexWriteLatency.update(nanoTime() - start, TimeUnit.NANOSECONDS));
304309
}
305310

306311
/**
@@ -693,8 +698,9 @@ public void invalidate(boolean obsolete)
693698
dropped = true;
694699
liveMemtables.clear();
695700
viewManager.invalidate(obsolete);
696-
indexMetrics.release();
697-
columnQueryMetrics.release();
701+
indexMetrics.ifPresent(AbstractMetrics::release);
702+
if (columnQueryMetrics != null)
703+
columnQueryMetrics.release();
698704

699705
analyzerFactory.close();
700706
if (queryAnalyzerFactory != analyzerFactory)

src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ public void complete(Stopwatch stopwatch) throws IOException
145145
catch (Throwable t)
146146
{
147147
logger.error(perIndexComponents.logMessage("Error while flushing index {}"), t.getMessage(), t);
148-
indexContext().getIndexMetrics().memtableIndexFlushErrors.inc();
148+
indexContext().getIndexMetrics().ifPresent(m -> m.memtableIndexFlushErrors.inc());
149149

150150
throw t;
151151
}
@@ -263,7 +263,7 @@ private void completeIndexFlush(long cellCount, long startTime, Stopwatch stopwa
263263
{
264264
perIndexComponents.markComplete();
265265

266-
indexContext().getIndexMetrics().memtableIndexFlushCount.inc();
266+
indexContext().getIndexMetrics().ifPresent(m -> m.memtableIndexFlushCount.inc());
267267

268268
long elapsedTime = stopwatch.elapsed(TimeUnit.MILLISECONDS);
269269

@@ -273,6 +273,7 @@ private void completeIndexFlush(long cellCount, long startTime, Stopwatch stopwa
273273
elapsedTime - startTime,
274274
elapsedTime);
275275

276-
indexContext().getIndexMetrics().memtableFlushCellsPerSecond.update((long) (cellCount * 1000.0 / Math.max(1, elapsedTime - startTime)));
276+
indexContext().getIndexMetrics()
277+
.ifPresent(m -> m.memtableFlushCellsPerSecond.update((long) (cellCount * 1000.0 / Math.max(1, elapsedTime - startTime))));
277278
}
278279
}

src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -193,12 +193,11 @@ public void complete(Stopwatch stopwatch) throws IOException
193193
}
194194
finally
195195
{
196-
if (indexContext.getIndexMetrics() != null)
197-
{
198-
indexContext.getIndexMetrics().segmentsPerCompaction.update(segments.size());
196+
indexContext.getIndexMetrics().ifPresent(m -> {
197+
m.segmentsPerCompaction.update(segments.size());
199198
segments.clear();
200-
indexContext.getIndexMetrics().compactionCount.inc();
201-
}
199+
m.compactionCount.inc();
200+
});
202201
}
203202
}
204203

@@ -327,12 +326,12 @@ private void flushSegment() throws IOException
327326
segments.add(segmentMetadata);
328327

329328
double rowCount = segmentMetadata.numRows;
330-
if (indexContext.getIndexMetrics() != null)
331-
indexContext.getIndexMetrics().compactionSegmentCellsPerSecond.update((long)(rowCount / flushMillis * 1000.0));
332-
333329
double segmentBytes = segmentMetadata.componentMetadatas.indexSize();
334-
if (indexContext.getIndexMetrics() != null)
335-
indexContext.getIndexMetrics().compactionSegmentBytesPerSecond.update((long)(segmentBytes / flushMillis * 1000.0));
330+
331+
indexContext.getIndexMetrics().ifPresent(m -> {
332+
m.compactionSegmentCellsPerSecond.update((long)(rowCount / flushMillis * 1000.0));
333+
m.compactionSegmentBytesPerSecond.update((long)(segmentBytes / flushMillis * 1000.0));
334+
});
336335

337336
logger.debug("Flushed segment with {} cells for a total of {} in {} ms for index {} with starting row id {} for sstable {}",
338337
(long) rowCount, FBUtilities.prettyPrintMemory((long) segmentBytes), flushMillis, indexContext.getIndexName(),
@@ -354,7 +353,7 @@ private void flushSegment() throws IOException
354353
logger.error("Failed to build index for SSTable {}", perIndexComponents.descriptor(), t);
355354
perIndexComponents.forceDeleteAllComponents();
356355

357-
indexContext.getIndexMetrics().segmentFlushErrors.inc();
356+
indexContext.getIndexMetrics().ifPresent(m -> m.segmentFlushErrors.inc());
358357

359358
throw t;
360359
}

src/java/org/apache/cassandra/index/sai/plan/QueryController.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -383,8 +383,8 @@ private void updateIndexMetricsQueriesCount(Plan plan)
383383
queriedIndexesContexts.add(indexContext);
384384
return Plan.ControlFlow.Continue;
385385
});
386-
queriedIndexesContexts.forEach(indexContext ->
387-
indexContext.getIndexMetrics().queriesCount.inc());
386+
queriedIndexesContexts.forEach(indexContext -> indexContext.getIndexMetrics()
387+
.ifPresent(m -> m.queriesCount.inc()));
388388
}
389389

390390
Plan buildPlan()

test/unit/org/apache/cassandra/index/sai/SAITester.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -476,6 +476,11 @@ protected Object getMetricValue(ObjectName metricObjectName)
476476
return metricValue;
477477
}
478478

479+
protected void assertMetricExists(ObjectName name)
480+
{
481+
Assertions.assertThatNoException().isThrownBy(() -> getMetricValue(name));
482+
}
483+
479484
protected void assertMetricDoesNotExist(ObjectName name)
480485
{
481486
Assertions.assertThatThrownBy(() -> getMetricValue(name)).hasRootCauseInstanceOf(InstanceNotFoundException.class);

test/unit/org/apache/cassandra/index/sai/metrics/IndexMetricsTest.java

Lines changed: 109 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.util.concurrent.TimeUnit;
2121

22+
import org.apache.cassandra.config.CassandraRelevantProperties;
2223
import org.junit.Test;
2324

2425
import com.datastax.driver.core.ResultSet;
@@ -27,9 +28,14 @@
2728
import org.apache.cassandra.utils.Throwables;
2829
import org.assertj.core.api.Assertions;
2930

31+
import javax.management.ObjectName;
32+
3033
import static org.assertj.core.api.Assertions.assertThatThrownBy;
31-
import static org.junit.Assert.assertEquals;
32-
import static org.junit.Assert.assertTrue;
34+
import static org.junit.Assert.*;
35+
36+
import org.apache.cassandra.inject.Injection;
37+
import org.apache.cassandra.inject.Injections;
38+
import org.apache.cassandra.index.sai.disk.v1.MemtableIndexWriter;
3339

3440
public class IndexMetricsTest extends AbstractMetricsTest
3541
{
@@ -152,6 +158,73 @@ public void testMetricsThroughWriteLifecycle()
152158
waitForHistogramMeanBetween(objectName("CompactionSegmentCellsPerSecond", KEYSPACE, table, index, "IndexMetrics"), 1.0, 1000000.0);
153159
}
154160

161+
@Test
162+
public void testIndexMetricsEnabledAndDisabled()
163+
{
164+
testIndexMetrics(true);
165+
testIndexMetrics(false);
166+
}
167+
168+
private void testIndexMetrics(boolean metricsEnabled)
169+
{
170+
// Set the property before creating any indexes
171+
CassandraRelevantProperties.SAI_INDEX_METRICS_ENABLED.setBoolean(metricsEnabled);
172+
173+
try
174+
{
175+
String table = createTable("CREATE TABLE %s (ID1 TEXT PRIMARY KEY, v1 INT, v2 TEXT) WITH compaction = " +
176+
"{'class' : 'SizeTieredCompactionStrategy', 'enabled' : false }");
177+
String index = createIndex("CREATE CUSTOM INDEX IF NOT EXISTS ON %s (v1) USING 'StorageAttachedIndex'");
178+
179+
// Test all Gauge metrics
180+
assertMetricExistsIfEnabled(metricsEnabled, "SSTableCellCount", table, index);
181+
assertMetricExistsIfEnabled(metricsEnabled, "LiveMemtableIndexWriteCount", table, index);
182+
assertMetricExistsIfEnabled(metricsEnabled, "DiskUsedBytes", table, index);
183+
assertMetricExistsIfEnabled(metricsEnabled, "MemtableOnHeapIndexBytes", table, index);
184+
assertMetricExistsIfEnabled(metricsEnabled, "MemtableOffHeapIndexBytes", table, index);
185+
assertMetricExistsIfEnabled(metricsEnabled, "IndexFileCacheBytes", table, index);
186+
187+
// Test all Counter metrics
188+
assertMetricExistsIfEnabled(metricsEnabled, "MemtableIndexFlushCount", table, index);
189+
assertMetricExistsIfEnabled(metricsEnabled, "CompactionCount", table, index);
190+
assertMetricExistsIfEnabled(metricsEnabled, "MemtableIndexFlushErrors", table, index);
191+
assertMetricExistsIfEnabled(metricsEnabled, "CompactionSegmentFlushErrors", table, index);
192+
assertMetricExistsIfEnabled(metricsEnabled, "QueriesCount", table, index);
193+
194+
// Test all Histogram metrics
195+
assertMetricExistsIfEnabled(metricsEnabled, "MemtableIndexFlushCellsPerSecond", table, index);
196+
assertMetricExistsIfEnabled(metricsEnabled, "SegmentsPerCompaction", table, index);
197+
assertMetricExistsIfEnabled(metricsEnabled, "CompactionSegmentCellsPerSecond", table, index);
198+
assertMetricExistsIfEnabled(metricsEnabled, "CompactionSegmentBytesPerSecond", table, index);
199+
200+
// Test Timer metrics
201+
assertMetricExistsIfEnabled(metricsEnabled, "MemtableIndexWriteLatency", table, index);
202+
203+
// Test indexing operations to ensure an absent indexMetrics (empty Optional when metrics are disabled) is handled gracefully
204+
execute("INSERT INTO %s (id1, v1, v2) VALUES ('0', 0, '0')");
205+
execute("INSERT INTO %s (id1, v1, v2) VALUES ('1', 1, '1')");
206+
execute("INSERT INTO %s (id1, v1, v2) VALUES ('2', 2, '2')");
207+
208+
// Verify MemtableIndexWriteLatency metric behavior after indexing operations
209+
assertMetricExistsIfEnabled(metricsEnabled, "MemtableIndexWriteLatency", table, index);
210+
}
211+
finally
212+
{
213+
// Reset property to default
214+
CassandraRelevantProperties.SAI_INDEX_METRICS_ENABLED.setBoolean(true);
215+
}
216+
}
217+
218+
private void assertMetricExistsIfEnabled(boolean shouldExist, String metricName, String table, String index)
219+
{
220+
ObjectName name = objectName(metricName, KEYSPACE, table, index, "IndexMetrics");
221+
222+
if (shouldExist)
223+
assertMetricExists(name);
224+
else
225+
assertMetricDoesNotExist(name);
226+
}
227+
155228
private void assertIndexQueryCount(String index, long expectedCount)
156229
{
157230
assertEquals(expectedCount,
@@ -198,4 +271,38 @@ public void testQueriesCount()
198271
assertIndexQueryCount(indexV2, 2L);
199272
assertIndexQueryCount(indexV3, 1L);
200273
}
274+
275+
@Test
276+
public void testMemtableIndexFlushErrorIncrementsMetric() throws Throwable
277+
{
278+
String table = createTable("CREATE TABLE %s (ID1 TEXT PRIMARY KEY, v1 INT, v2 TEXT) WITH compaction = " +
279+
"{'class' : 'SizeTieredCompactionStrategy', 'enabled' : false }");
280+
String index = createIndex("CREATE CUSTOM INDEX IF NOT EXISTS ON %s (v1) USING 'StorageAttachedIndex'");
281+
282+
// Write some data to ensure there is something to flush
283+
execute("INSERT INTO %s (id1, v1, v2) VALUES ('0', 0, '0')");
284+
285+
assertEquals(0L, getMetricValue(objectName("MemtableIndexFlushErrors", KEYSPACE, table, index, "IndexMetrics")));
286+
287+
// Inject a failure at the entry of MemtableIndexWriter#flush(...) to force a flush error
288+
Injection failure = newFailureOnEntry("sai_memtable_flush_error", MemtableIndexWriter.class, "flush", RuntimeException.class);
289+
Injections.inject(failure);
290+
291+
try
292+
{
293+
// Trigger a flush, which should hit the injected failure
294+
flush(KEYSPACE, table);
295+
}
296+
catch (Throwable ignored)
297+
{
298+
// Expected due to injected failure
299+
}
300+
finally
301+
{
302+
failure.disable();
303+
}
304+
305+
// Verify the memtable index flush error metric is incremented
306+
assertEquals(1L, getMetricValue(objectName("MemtableIndexFlushErrors", KEYSPACE, table, index, "IndexMetrics")));
307+
}
201308
}

0 commit comments

Comments
 (0)