Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,12 @@ public enum CassandraRelevantProperties
*/
SAI_INDEX_METRICS_ENABLED("cassandra.sai.metrics.index.enabled", "true"),

/**
* Whether to enable SAI table state metrics such as disk usage, queryable index count, and index build progress.
* These metrics include gauges for table-level SAI state information.
*/
SAI_TABLE_STATE_METRICS_ENABLED("cassandra.sai.metrics.table_state.enabled", "true"),

/**
* If true, while creating or altering schema, NetworkTopologyStrategy won't check if the DC exists.
* This is to remain compatible with older workflows that first change the replication before adding the nodes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@
import org.apache.cassandra.utils.Throwables;
import org.apache.lucene.index.CorruptIndexException;

import static org.apache.cassandra.config.CassandraRelevantProperties.SAI_TABLE_STATE_METRICS_ENABLED;

/**
* Orchestrates building of storage-attached indices, and manages lifecycle of resources shared between them.
*/
Expand All @@ -85,7 +87,8 @@ public class StorageAttachedIndexGroup implements Index.Group, INotificationCons
public static final Index.Group.Key GROUP_KEY = new Index.Group.Key(StorageAttachedIndexGroup.class);

private final TableQueryMetrics queryMetrics;
private final TableStateMetrics stateMetrics;
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
private final Optional<TableStateMetrics> stateMetrics;
private final IndexGroupMetrics groupMetrics;

private final Set<StorageAttachedIndex> indices = ConcurrentHashMap.newKeySet();
Expand All @@ -100,7 +103,9 @@ public class StorageAttachedIndexGroup implements Index.Group, INotificationCons
{
this.baseCfs = baseCfs;
this.queryMetrics = new TableQueryMetrics(baseCfs.metadata());
this.stateMetrics = new TableStateMetrics(baseCfs.metadata(), this);
this.stateMetrics = SAI_TABLE_STATE_METRICS_ENABLED.getBoolean()
? Optional.of(new TableStateMetrics(baseCfs.metadata(), this))
: Optional.empty();
this.groupMetrics = new IndexGroupMetrics(baseCfs.metadata(), this);
this.contextManager = new SSTableContextManager(baseCfs.getTracker());
this.version = Version.current(baseCfs.keyspace.getName());
Expand Down Expand Up @@ -164,7 +169,7 @@ public void invalidate()
// in case of removing last index from group, sstable contexts should already been removed by StorageAttachedIndexGroup#removeIndex
queryMetrics.release();
groupMetrics.release();
stateMetrics.release();
stateMetrics.ifPresent(TableStateMetrics::release);
baseCfs.getTracker().unsubscribe(this);
}

Expand All @@ -176,7 +181,7 @@ public void unload()
contextManager.clear();
queryMetrics.release();
groupMetrics.release();
stateMetrics.release();
stateMetrics.ifPresent(TableStateMetrics::release);
}

@Override
Expand Down Expand Up @@ -473,6 +478,12 @@ public TableQueryMetrics queryMetrics()
return queryMetrics;
}

// Needed by CNDB
public Optional<TableStateMetrics> stateMetrics()
{
return stateMetrics;
}

public ColumnFamilyStore table()
{
return baseCfs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ public class TableStateMetrics extends AbstractMetrics
{
public static final String TABLE_STATE_METRIC_TYPE = "TableStateMetrics";

private final Gauge diskUsageBytes;
// Visible for CNDB
public final Gauge diskUsageBytes;
private final Gauge diskUsagePercentageOfBaseTable;
private final Gauge totalIndexCount;
private final Gauge totalIndexBuildsInProgress;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.cassandra.index.sai.SAITester;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class DiskSpaceTest extends SAITester
{
Expand Down Expand Up @@ -52,4 +53,63 @@ public void testTableTotalDiskSpaceUsed() throws Throwable
assertEquals(sstableSize, totalDiskSpaceUsed());
verifyIndexComponentsNotIncludedInSSTable();
}

@Test
public void testDiskUsageWithCompactionAfterUpdates() throws Throwable
{
createTable(CREATE_TABLE_TEMPLATE);

// Insert initial data and flush to create first SSTable
int rows = 100;
for (int j = 0; j < rows; j++)
execute("INSERT INTO %s (id1, v1) VALUES (?, ?)", Integer.toString(j), j);

flush();

assertEquals("Should have 1 SSTable after first flush", 1, getCurrentColumnFamilyStore().getLiveSSTables().size());
long sstableSizeAfterFirstFlush = totalDiskSpaceUsed();

// Update the same keys with different values to create overlapping data
for (int j = 0; j < rows; j++)
execute("INSERT INTO %s (id1, v1) VALUES (?, ?)", Integer.toString(j), j + 1000);

flush();

assertEquals("Should have 2 SSTables after second flush", 2, getCurrentColumnFamilyStore().getLiveSSTables().size());
long sstableSizeBeforeCompaction = totalDiskSpaceUsed();

// Verify that we have more disk usage with 2 SSTables containing overlapping data
assertTrue("Disk usage should increase with overlapping SSTables",
sstableSizeBeforeCompaction > sstableSizeAfterFirstFlush);

// Compact to merge the updates before creating index
compact();
waitForCompactionsFinished();

assertEquals("Should have 1 SSTable after compaction", 1, getCurrentColumnFamilyStore().getLiveSSTables().size());
long sstableSizeAfterCompaction = totalDiskSpaceUsed();

// Compaction should reduce disk usage by merging duplicate keys
assertTrue("Compaction should reduce disk usage by merging duplicates",
sstableSizeAfterCompaction < sstableSizeBeforeCompaction);

// Create index on the compacted SSTable
String indexName = createIndex(String.format(CREATE_INDEX_TEMPLATE, "v1"));

long indexSize = indexDiskSpaceUse();
long sstableSizeWithIndex = totalDiskSpaceUsed();

assertTrue("Index size should be positive", indexSize > 0);

// Verify that total disk usage equals base table plus index after compaction
assertEquals("Total disk should equal base table plus index after compaction",
sstableSizeAfterCompaction + indexSize, sstableSizeWithIndex);
verifyIndexComponentsIncludedInSSTable();

// Drop index and verify disk space accounting
dropIndex("DROP INDEX %s." + indexName);
assertEquals("After dropping index, disk usage should equal base table only",
sstableSizeAfterCompaction, totalDiskSpaceUsed());
verifyIndexComponentsNotIncludedInSSTable();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import com.datastax.driver.core.ResultSet;

import static org.apache.cassandra.config.CassandraRelevantProperties.SAI_TABLE_STATE_METRICS_ENABLED;
import static org.apache.cassandra.index.sai.metrics.TableStateMetrics.TABLE_STATE_METRIC_TYPE;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertEquals;
Expand All @@ -49,9 +50,9 @@ public void testMetricRelease()
createTable(String.format(CREATE_TABLE_TEMPLATE, keyspace, table));
createIndex(String.format(CREATE_INDEX_TEMPLATE, index, keyspace, table, "v1"));

execute("INSERT INTO " + keyspace + "." + table + " (id1, v1, v2) VALUES ('0', 0, '0')");
execute("INSERT INTO " + keyspace + '.' + table + " (id1, v1, v2) VALUES ('0', 0, '0')");

ResultSet rows = executeNet("SELECT id1 FROM " + keyspace + "." + table + " WHERE v1 = 0");
ResultSet rows = executeNet("SELECT id1 FROM " + keyspace + '.' + table + " WHERE v1 = 0");
assertEquals(1, rows.all().size());
assertEquals(1L, getTableStateMetrics(keyspace, table, "TotalIndexCount"));

Expand All @@ -75,14 +76,14 @@ public void testMetricCreation()
createIndex(String.format(CREATE_INDEX_TEMPLATE, index+"_v1", keyspace, table, "v1"));
createIndex(String.format(CREATE_INDEX_TEMPLATE, index+"_v2", keyspace, table, "v2"));

execute("INSERT INTO " + keyspace + "." + table + " (id1, v1, v2) VALUES ('0', 0, '0')");
execute("INSERT INTO " + keyspace + "." + table + " (id1, v1, v2) VALUES ('1', 1, '1')");
execute("INSERT INTO " + keyspace + "." + table + " (id1, v1, v2) VALUES ('2', 2, '2')");
execute("INSERT INTO " + keyspace + "." + table + " (id1, v1, v2) VALUES ('3', 3, '3')");
execute("INSERT INTO " + keyspace + '.' + table + " (id1, v1, v2) VALUES ('0', 0, '0')");
execute("INSERT INTO " + keyspace + '.' + table + " (id1, v1, v2) VALUES ('1', 1, '1')");
execute("INSERT INTO " + keyspace + '.' + table + " (id1, v1, v2) VALUES ('2', 2, '2')");
execute("INSERT INTO " + keyspace + '.' + table + " (id1, v1, v2) VALUES ('3', 3, '3')");

flush(keyspace, table);

ResultSet rows = executeNet("SELECT id1, v1, v2 FROM " + keyspace + "." + table + " WHERE v1 >= 0");
ResultSet rows = executeNet("SELECT id1, v1, v2 FROM " + keyspace + '.' + table + " WHERE v1 >= 0");

int actualRows = rows.all().size();
assertEquals(4, actualRows);
Expand All @@ -98,4 +99,55 @@ private int getTableStateMetrics(String keyspace, String table, String metricsNa
{
return (int) getMetricValue(objectNameNoIndex(metricsName, keyspace, table, TABLE_STATE_METRIC_TYPE));
}

@Test
public void testTableStateMetricsEnabledAndDisabled()
{
testTableStateMetrics(true);
testTableStateMetrics(false);
}

private void testTableStateMetrics(boolean metricsEnabled)
{
SAI_TABLE_STATE_METRICS_ENABLED.setBoolean(metricsEnabled);

try
{
String table = "test_table_state_metrics_" + (metricsEnabled ? "enabled" : "disabled");
String index = "test_index_" + (metricsEnabled ? "enabled" : "disabled");

String keyspace = createKeyspace(CREATE_KEYSPACE_TEMPLATE);
createTable(String.format(CREATE_TABLE_TEMPLATE, keyspace, table));
createIndex(String.format(CREATE_INDEX_TEMPLATE, index, keyspace, table, "v1"));

// Test all TableStateMetrics Gauge metrics
assertTableStateMetricExistsIfEnabled(metricsEnabled, "TotalIndexCount", keyspace, table);
assertTableStateMetricExistsIfEnabled(metricsEnabled, "TotalQueryableIndexCount", keyspace, table);
assertTableStateMetricExistsIfEnabled(metricsEnabled, "TotalIndexBuildsInProgress", keyspace, table);
assertTableStateMetricExistsIfEnabled(metricsEnabled, "DiskUsedBytes", keyspace, table);
assertTableStateMetricExistsIfEnabled(metricsEnabled, "DiskPercentageOfBaseTable", keyspace, table);

// Test indexing operations to ensure null stateMetrics is handled gracefully
execute("INSERT INTO " + keyspace + '.' + table + " (id1, v1, v2) VALUES ('0', 0, '0')");
execute("INSERT INTO " + keyspace + '.' + table + " (id1, v1, v2) VALUES ('1', 1, '1')");
execute("INSERT INTO " + keyspace + '.' + table + " (id1, v1, v2) VALUES ('2', 2, '2')");

// Verify metrics still behave correctly after operations
assertTableStateMetricExistsIfEnabled(metricsEnabled, "TotalIndexCount", keyspace, table);
assertTableStateMetricExistsIfEnabled(metricsEnabled, "DiskUsedBytes", keyspace, table);
}
finally
{
// Reset property to default
SAI_TABLE_STATE_METRICS_ENABLED.setBoolean(true);
}
}

void assertTableStateMetricExistsIfEnabled(boolean shouldExist, String metricName, String keyspace, String table)
{
if (shouldExist)
assertMetricExists(objectNameNoIndex(metricName, keyspace, table, TABLE_STATE_METRIC_TYPE));
else
assertMetricDoesNotExist(objectNameNoIndex(metricName, keyspace, table, TABLE_STATE_METRIC_TYPE));
}
}