Commit 7c3c804
Merge remote-tracking branch 'datastax/main' into revive-fused-adc
2 parents: 3afcb24 + 8277624

23 files changed: +1626 -88 lines

src/java/org/apache/cassandra/cql3/QueryEvents.java

Lines changed: 33 additions & 0 deletions
@@ -34,6 +34,13 @@
 
 import org.apache.cassandra.cql3.statements.AuthenticationStatement;
 import org.apache.cassandra.cql3.statements.BatchStatement;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.RequestFailureException;
+import org.apache.cassandra.exceptions.RequestTimeoutException;
+import org.apache.cassandra.exceptions.UnavailableException;
+import org.apache.cassandra.exceptions.WriteTimeoutException;
+import org.apache.cassandra.metrics.ClientRequestsMetrics;
+import org.apache.cassandra.metrics.ClientRequestsMetricsProvider;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.transport.Message;
 import org.apache.cassandra.transport.messages.ResultMessage;
@@ -84,12 +91,31 @@ public void notifyQuerySuccess(CQLStatement statement,
         }
     }
 
+    private void updateMetrics(CQLStatement statement, Exception cause)
+    {
+        if (statement instanceof CQLStatement.SingleKeyspaceCqlStatement)
+        {
+            ClientRequestsMetrics metrics = ClientRequestsMetricsProvider.instance.metrics(((CQLStatement.SingleKeyspaceCqlStatement) statement).keyspace());
+            if (cause instanceof InvalidRequestException)
+                metrics.allRequestsMetrics.invalid.mark();
+            else if (cause instanceof UnavailableException)
+                metrics.allRequestsMetrics.unavailables.mark();
+            else if (cause instanceof RequestTimeoutException)
+                metrics.allRequestsMetrics.timeouts.mark();
+            else if (cause instanceof RequestFailureException)
+                metrics.allRequestsMetrics.failures.mark();
+            else
+                metrics.allRequestsMetrics.otherErrors.mark();
+        }
+    }
+
     public void notifyQueryFailure(CQLStatement statement,
                                    String query,
                                    QueryOptions options,
                                    QueryState state,
                                    Exception cause)
     {
+        updateMetrics(statement, cause);
         try
         {
             final String maybeObfuscatedQuery = listeners.size() > 0 ? maybeObfuscatePassword(statement, query) : query;
@@ -129,6 +155,9 @@ public void notifyExecuteFailure(QueryHandler.Prepared prepared,
                                      Exception cause)
     {
         CQLStatement statement = prepared != null ? prepared.statement : null;
+
+        updateMetrics(statement, cause);
+
         String query = prepared != null ? prepared.statement.getRawCQLStatement() : null;
         try
         {
@@ -183,6 +212,10 @@ public void notifyBatchFailure(List<QueryHandler.Prepared> prepared,
                 queries.add(p.statement.getRawCQLStatement());
             });
         }
+
+        if (!statements.isEmpty())
+            updateMetrics(statements.get(0), cause);
+
         try
        {
             for (Listener listener : listeners)
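
The new updateMetrics() hook classifies each failure into one of the per-keyspace client-request meters before listeners are notified; the instanceof chain tests the more specific categories first, and WriteTimeoutException is imported but not matched explicitly, presumably because it is covered by the RequestTimeoutException branch. A minimal, self-contained sketch of the same classification order (FailureClassifier and ErrorBucket are illustrative names, not part of the patch):

import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.exceptions.RequestFailureException;
import org.apache.cassandra.exceptions.RequestTimeoutException;
import org.apache.cassandra.exceptions.UnavailableException;

final class FailureClassifier
{
    // Illustrative buckets matching the meters marked by updateMetrics().
    enum ErrorBucket { INVALID, UNAVAILABLE, TIMEOUT, FAILURE, OTHER }

    static ErrorBucket classify(Exception cause)
    {
        if (cause instanceof InvalidRequestException)
            return ErrorBucket.INVALID;        // malformed or semantically invalid request
        if (cause instanceof UnavailableException)
            return ErrorBucket.UNAVAILABLE;    // not enough live replicas for the consistency level
        if (cause instanceof RequestTimeoutException)
            return ErrorBucket.TIMEOUT;        // read/write timeouts, including write timeouts
        if (cause instanceof RequestFailureException)
            return ErrorBucket.FAILURE;        // replicas reported an error
        return ErrorBucket.OTHER;              // everything else lands in otherErrors
    }
}

For batch failures only the first statement is passed to updateMetrics(statements.get(0), cause), which appears to trade precision for cheapness: a multi-keyspace batch is attributed to the first statement's keyspace.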

src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java

Lines changed: 48 additions & 2 deletions
@@ -180,7 +180,35 @@ public static CommitLogReplayer construct(CommitLog commitLog, UUID localHostId)
             cfPersisted.put(cfs.metadata.id, filter);
         }
         CommitLogPosition globalPosition = firstNotCovered(cfPersisted.values());
-        logger.debug("Global replay position is {} from columnfamilies {}", globalPosition, FBUtilities.toString(cfPersisted));
+
+        // Limit the amount of column family data logged to prevent massive log lines
+        if (logger.isDebugEnabled())
+        {
+            int maxColumnFamiliesToLog = 10;
+            int cfCount = cfPersisted.size();
+            if (cfCount <= maxColumnFamiliesToLog)
+            {
+                logger.debug("Global replay position is {} from {} columnfamilies: {}",
+                             globalPosition, cfCount, FBUtilities.toString(cfPersisted));
+            }
+            else
+            {
+                // For large numbers of column families, just log the count and a sample
+                Map<TableId, IntervalSet<CommitLogPosition>> sample = new HashMap<>();
+                int count = 0;
+                for (Map.Entry<TableId, IntervalSet<CommitLogPosition>> entry : cfPersisted.entrySet())
+                {
+                    if (count++ >= maxColumnFamiliesToLog)
+                        break;
+                    sample.put(entry.getKey(), entry.getValue());
+                }
+                logger.debug("Global replay position is {} from {} columnfamilies (showing first {}): {}",
+                             globalPosition, cfCount, maxColumnFamiliesToLog, FBUtilities.toString(sample));
+                logger.debug("Use TRACE level to see all {} columnfamilies", cfCount);
+                logger.trace("Full columnfamilies list: {}", FBUtilities.toString(cfPersisted));
+            }
+        }
+
         return new CommitLogReplayer(commitLog, globalPosition, cfPersisted, replayFilter);
     }
 
@@ -429,7 +457,25 @@ public static IntervalSet<CommitLogPosition> persistedIntervals(Iterable<SSTable
 
         if (!skippedSSTables.isEmpty()) {
             logger.warn("Origin of {} sstables is unknown or doesn't match the local node; commitLogIntervals for them were ignored", skippedSSTables.size());
-            logger.debug("Ignored commitLogIntervals from the following sstables: {}", skippedSSTables);
+
+            // Limit the number of SSTable names logged to prevent massive log lines
+            int maxSSTablesToLog = 100;
+            if (skippedSSTables.size() <= maxSSTablesToLog) {
+                logger.debug("Ignored commitLogIntervals from the following sstables: {}", skippedSSTables);
+            } else {
+                List<String> sample = new ArrayList<>();
+                int count = 0;
+                for (String sstable : skippedSSTables)
+                {
+                    if (count++ >= maxSSTablesToLog)
+                        break;
+                    sample.add(sstable);
+                }
+                logger.debug("Ignored commitLogIntervals from {} sstables (showing first {}): {}",
+                             skippedSSTables.size(), maxSSTablesToLog, sample);
+                logger.debug("Use TRACE level to see all {} skipped sstables", skippedSSTables.size());
+                logger.trace("Full list of ignored sstables: {}", skippedSSTables);
            }
         }
 
         if (truncatedAt != null)
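
Both hunks follow the same pattern: bound what a single DEBUG statement can emit, and leave the complete listing to TRACE so nothing is lost when deeper debugging is needed. The sampling step reduces to a generic helper along these lines (firstN is a hypothetical illustration, not code from the patch):

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

final class LogSampling
{
    // Returns at most 'limit' elements, in iteration order, suitable for a bounded log line.
    static <T> List<T> firstN(Collection<T> items, int limit)
    {
        List<T> sample = new ArrayList<>(Math.min(items.size(), limit));
        for (T item : items)
        {
            if (sample.size() >= limit)
                break;
            sample.add(item);
        }
        return sample;
    }
}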

src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java

Lines changed: 2 additions & 1 deletion
@@ -28,6 +28,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.Mutation;
@@ -177,7 +178,7 @@ private boolean hasViews(ColumnFamilyStore cfs)
 
     private boolean hasCDC(ColumnFamilyStore cfs)
     {
-        return cfs.metadata().params.cdc;
+        return DatabaseDescriptor.isCDCEnabled() && cfs.metadata().params.cdc;
     }
 
     /*
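
hasCDC() is one of the checks that decides whether streamed data must be applied through the normal write path so that CDC can observe it; with the extra guard, a table whose schema sets cdc=true no longer triggers that path on a node where CDC is disabled globally. A trivial standalone sketch of the combined predicate (names are illustrative):

final class CdcGuard
{
    // Mirrors the patched hasCDC(): the node-level switch and the table-level param must both be set.
    //   node cdc enabled | table cdc param | result
    //   false            | true            | false   (previously true)
    //   true             | true            | true
    //   any              | false           | false
    static boolean requiresCdcHandling(boolean nodeCdcEnabled, boolean tableCdcParam)
    {
        return nodeCdcEnabled && tableCdcParam;
    }
}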

src/java/org/apache/cassandra/index/sai/disk/PostingListKeyRangeIterator.java

Lines changed: 5 additions & 14 deletions
@@ -115,7 +115,7 @@ protected PrimaryKey computeNext()
             if (rowId == PostingList.END_OF_STREAM)
                 return endOfData();
 
-            return new PrimaryKeyWithSource(primaryKeyMap, rowId, searcherContext.minimumKey, searcherContext.maximumKey);
+            return primaryKeyMap.primaryKeyFromRowId(rowId, searcherContext.minimumKey, searcherContext.maximumKey);
         }
         catch (Throwable t)
         {
@@ -160,20 +160,11 @@ private long getNextRowId() throws IOException
         long segmentRowId;
         if (needsSkipping)
         {
-            long targetSstableRowId;
-            if (skipToToken instanceof PrimaryKeyWithSource
-                && ((PrimaryKeyWithSource) skipToToken).getSourceSstableId().equals(primaryKeyMap.getSSTableId()))
+            long targetSstableRowId = primaryKeyMap.ceiling(skipToToken);
+            // skipToToken is larger than max token in token file
+            if (targetSstableRowId < 0)
             {
-                targetSstableRowId = ((PrimaryKeyWithSource) skipToToken).getSourceRowId();
-            }
-            else
-            {
-                targetSstableRowId = primaryKeyMap.ceiling(skipToToken);
-                // skipToToken is larger than max token in token file
-                if (targetSstableRowId < 0)
-                {
-                    return PostingList.END_OF_STREAM;
-                }
+                return PostingList.END_OF_STREAM;
             }
             int targetSegmentRowId = Math.toIntExact(targetSstableRowId - searcherContext.getSegmentRowIdOffset());
             segmentRowId = postingList.advance(targetSegmentRowId);
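
The same-sstable fast path (reusing the row id already carried by a PrimaryKeyWithSource) is gone; every skip now asks the PrimaryKeyMap, and a negative result from ceiling() means the skip target lies beyond the last key in the token file. Condensed, the new skip logic amounts to the following (a hypothetical helper with the parameter types assumed; the calls are the ones made in the patch):

// Condensed form of the simplified skip path in getNextRowId().
static long skipTargetRowId(PrimaryKeyMap primaryKeyMap, PrimaryKey skipToToken)
{
    long targetSstableRowId = primaryKeyMap.ceiling(skipToToken);
    // Negative: skipToToken is larger than the max token in the token file.
    return targetSstableRowId < 0 ? PostingList.END_OF_STREAM : targetSstableRowId;
}

The keys returned by computeNext() now come from primaryKeyFromRowId() with the searcher's minimum and maximum keys attached, so the bounds-aware overload described in PrimaryKeyMap.java below is presumably what keeps comparisons cheap where the removed shortcut used to apply.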

src/java/org/apache/cassandra/index/sai/disk/PrimaryKeyMap.java

Lines changed: 19 additions & 0 deletions
@@ -21,6 +21,7 @@
 import java.io.Closeable;
 import java.io.IOException;
 
+import javax.annotation.Nonnull;
 import javax.annotation.concurrent.NotThreadSafe;
 
 import org.apache.cassandra.index.sai.utils.PrimaryKey;
@@ -84,6 +85,24 @@ default void close() throws IOException
      */
     PrimaryKey primaryKeyFromRowId(long sstableRowId);
 
+    /**
+     * Returns a {@link PrimaryKey} for a row Id
+     *
+     * Note: the lower and upper bounds are used to avoid reading the primary key from disk in the event
+     * that compared primary keys are in non-overlapping ranges. The ranges can be within the table, and must
+     * contain the row id. This requirement is not validated, as validation would remove the performance benefit
+     * of this optimization.
+     *
+     * @param sstableRowId the row Id to lookup
+     * @param lowerBound the inclusive lower bound of the primary key being created
+     * @param upperBound the inclusive upper bound of the primary key being created
+     * @return the {@link PrimaryKey} associated with the row Id
+     */
+    default PrimaryKey primaryKeyFromRowId(long sstableRowId, @Nonnull PrimaryKey lowerBound, @Nonnull PrimaryKey upperBound)
+    {
+        return primaryKeyFromRowId(sstableRowId);
+    }
+
     /**
      * Returns a row Id for a {@link PrimaryKey}. If there is no such term, returns the `-(next row id) - 1` where
      * `next row id` is the row id of the next greatest {@link PrimaryKey} in the map.
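
The overload lets a caller attach the containing key range to the key it gets back; per the javadoc, the bounds are purely an optimisation hint for later comparisons, must contain the row id, and are deliberately not validated. A caller-side sketch modelled on the SegmentMetadata change below (pkm, minSSTableRowId and maxSSTableRowId are assumed to be in scope):

// Build the bounds once, then request keys that carry them. Comparisons against keys from a
// non-overlapping range can then be resolved from the bounds alone, without reading the key from disk.
PrimaryKey min = pkm.primaryKeyFromRowId(minSSTableRowId).loadDeferred();
PrimaryKey max = pkm.primaryKeyFromRowId(maxSSTableRowId).loadDeferred();
PrimaryKey minKey = pkm.primaryKeyFromRowId(minSSTableRowId, min, max).loadDeferred();
PrimaryKey maxKey = pkm.primaryKeyFromRowId(maxSSTableRowId, min, max).loadDeferred();

Implementations that cannot exploit the bounds inherit the default, which simply ignores them and falls back to primaryKeyFromRowId(sstableRowId).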

src/java/org/apache/cassandra/index/sai/disk/v1/SegmentMetadata.java

Lines changed: 2 additions & 4 deletions
@@ -38,7 +38,6 @@
 import org.apache.cassandra.index.sai.SSTableContext;
 import org.apache.cassandra.index.sai.disk.ModernResettableByteBuffersIndexOutput;
 import org.apache.cassandra.index.sai.disk.PostingList;
-import org.apache.cassandra.index.sai.disk.PrimaryKeyWithSource;
 import org.apache.cassandra.index.sai.disk.format.IndexComponentType;
 import org.apache.cassandra.index.sai.disk.format.Version;
 import org.apache.cassandra.index.sai.disk.io.IndexInput;
@@ -181,10 +180,9 @@ private SegmentMetadata(IndexInput input, IndexContext context, Version version,
             // We need to load eagerly to allow us to close the partition key map.
             min = pkm.primaryKeyFromRowId(minSSTableRowId).loadDeferred();
             max = pkm.primaryKeyFromRowId(maxSSTableRowId).loadDeferred();
+            this.minKey = pkm.primaryKeyFromRowId(minSSTableRowId, min, max).loadDeferred();
+            this.maxKey = pkm.primaryKeyFromRowId(maxSSTableRowId, min, max).loadDeferred();
         }
-
-        this.minKey = new PrimaryKeyWithSource(min, sstableContext.sstable.getId(), minSSTableRowId, min, max);
-        this.maxKey = new PrimaryKeyWithSource(max, sstableContext.sstable.getId(), maxSSTableRowId, min, max);
     }
     else
     {
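
With PrimaryKeyWithSource moved into the v2 package and made package-private (see the renamed file below), SegmentMetadata can no longer construct it directly; the bounds-aware PrimaryKeyMap overload takes over that job. The before/after pair, both taken from this hunk:

// before
this.minKey = new PrimaryKeyWithSource(min, sstableContext.sstable.getId(), minSSTableRowId, min, max);
// after
this.minKey = pkm.primaryKeyFromRowId(minSSTableRowId, min, max).loadDeferred();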

src/java/org/apache/cassandra/index/sai/disk/PrimaryKeyWithSource.java renamed to src/java/org/apache/cassandra/index/sai/disk/v2/PrimaryKeyWithSource.java

Lines changed: 13 additions & 21 deletions
@@ -16,27 +16,28 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.index.sai.disk;
+package org.apache.cassandra.index.sai.disk.v2;
 
 import io.github.jbellis.jvector.util.RamUsageEstimator;
 import org.apache.cassandra.db.Clustering;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.index.sai.disk.PrimaryKeyMap;
 import org.apache.cassandra.index.sai.utils.PrimaryKey;
 import org.apache.cassandra.io.sstable.SSTableId;
 import org.apache.cassandra.utils.bytecomparable.ByteComparable;
 import org.apache.cassandra.utils.bytecomparable.ByteSource;
 
-public class PrimaryKeyWithSource implements PrimaryKey
+class PrimaryKeyWithSource implements PrimaryKey
 {
-    private final PrimaryKeyMap primaryKeyMap;
     private final SSTableId<?> sourceSstableId;
     private final long sourceRowId;
     private PrimaryKey delegatePrimaryKey;
+    private PrimaryKeyMap primaryKeyMap;
     private final PrimaryKey sourceSstableMinKey;
     private final PrimaryKey sourceSstableMaxKey;
 
-    public PrimaryKeyWithSource(PrimaryKeyMap primaryKeyMap, long sstableRowId, PrimaryKey sourceSstableMinKey, PrimaryKey sourceSstableMaxKey)
+    PrimaryKeyWithSource(PrimaryKeyMap primaryKeyMap, long sstableRowId, PrimaryKey sourceSstableMinKey, PrimaryKey sourceSstableMaxKey)
     {
         this.primaryKeyMap = primaryKeyMap;
         this.sourceSstableId = primaryKeyMap.getSSTableId();
@@ -45,20 +46,13 @@ public PrimaryKeyWithSource(PrimaryKeyMap primaryKeyMap, long sstableRowId, Prim
         this.sourceSstableMaxKey = sourceSstableMaxKey;
     }
 
-    public PrimaryKeyWithSource(PrimaryKey primaryKey, SSTableId<?> sourceSstableId, long sourceRowId, PrimaryKey sourceSstableMinKey, PrimaryKey sourceSstableMaxKey)
-    {
-        this.delegatePrimaryKey = primaryKey;
-        this.primaryKeyMap = null;
-        this.sourceSstableId = sourceSstableId;
-        this.sourceRowId = sourceRowId;
-        this.sourceSstableMinKey = sourceSstableMinKey;
-        this.sourceSstableMaxKey = sourceSstableMaxKey;
-    }
-
     private PrimaryKey primaryKey()
     {
         if (delegatePrimaryKey == null)
+        {
             delegatePrimaryKey = primaryKeyMap.primaryKeyFromRowId(sourceRowId);
+            primaryKeyMap = null; // Removes the no longer needed reference to the primary key map.
+        }
 
         return delegatePrimaryKey;
     }
@@ -74,13 +68,10 @@ public SSTableId<?> getSourceSstableId()
     }
 
     @Override
-    public PrimaryKeyWithSource forStaticRow()
+    public PrimaryKey forStaticRow()
     {
-        return new PrimaryKeyWithSource(primaryKey().forStaticRow(),
-                                        sourceSstableId,
-                                        sourceRowId,
-                                        sourceSstableMinKey,
-                                        sourceSstableMaxKey);
+        // We cannot use row awareness if we need a static row.
+        return primaryKey().forStaticRow();
     }
 
     @Override
@@ -104,7 +95,8 @@ public Clustering clustering()
     @Override
     public PrimaryKey loadDeferred()
     {
-        return primaryKey().loadDeferred();
+        primaryKey().loadDeferred();
+        return this;
     }
 
     @Override
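
Beyond the package move and the dropped public modifier, the class now materialises its delegate strictly lazily and releases the PrimaryKeyMap reference once the delegate exists, forStaticRow() falls back to a plain key, and loadDeferred() returns this so callers keep the source-aware wrapper (and its bounds) after forcing the load. The lazy-materialise-then-drop-the-factory pattern, reduced to a generic sketch (Lazy is illustrative, not part of the patch):

import java.util.function.Supplier;

final class Lazy<T>
{
    private Supplier<T> source;
    private T value;

    Lazy(Supplier<T> source)
    {
        this.source = source;
    }

    T get()
    {
        if (value == null)
        {
            value = source.get();
            source = null; // the supplier (the PrimaryKeyMap in the real class) is no longer needed
        }
        return value;
    }
}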

src/java/org/apache/cassandra/index/sai/disk/v2/RowAwarePrimaryKeyFactory.java

Lines changed: 6 additions & 0 deletions
@@ -28,6 +28,7 @@
 import org.apache.cassandra.db.ClusteringComparator;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.index.sai.disk.PrimaryKeyMap;
 import org.apache.cassandra.index.sai.utils.PrimaryKey;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.bytecomparable.ByteComparable;
@@ -67,6 +68,11 @@ public PrimaryKey create(DecoratedKey partitionKey, Clustering clustering)
         return new RowAwarePrimaryKey(partitionKey.getToken(), partitionKey, clustering, null);
     }
 
+    PrimaryKey createWithSource(PrimaryKeyMap primaryKeyMap, long sstableRowId, PrimaryKey sourceSstableMinKey, PrimaryKey sourceSstableMaxKey)
+    {
+        return new PrimaryKeyWithSource(primaryKeyMap, sstableRowId, sourceSstableMinKey, sourceSstableMaxKey);
+    }
+
     private class RowAwarePrimaryKey implements PrimaryKey
     {
         private Token token;
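
Since both createWithSource() and PrimaryKeyWithSource are package-private, source-aware keys in v2 are only handed out through this factory. A hypothetical call site inside the same package would look roughly like this (the surrounding variables are assumed):

// The factory, not the constructor, now produces the key that carries the sstable's bounds.
PrimaryKey key = primaryKeyFactory.createWithSource(primaryKeyMap, sstableRowId, minKey, maxKey);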
