diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java index dcd791caf306..00367ffa888a 100644 --- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java +++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java @@ -33,7 +33,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.codahale.metrics.Timer.Context; +import org.apache.cassandra.metrics.Timer.Context; import net.nicoulaj.compilecommand.annotations.DontInline; import org.apache.cassandra.concurrent.Interruptible; import org.apache.cassandra.concurrent.Interruptible.TerminateException; @@ -338,7 +338,7 @@ void awaitAvailableSegment(CommitLogSegment currentAllocatingFrom) { do { - WaitQueue.Signal prepared = segmentPrepared.register(commitLog.metrics.waitingOnSegmentAllocation.time(), Context::stop); + WaitQueue.Signal prepared = segmentPrepared.register(commitLog.metrics.waitingOnSegmentAllocation.startTime(), Context::stop); if (availableSegment == null && allocatingFrom == currentAllocatingFrom) prepared.awaitUninterruptibly(); else diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java index cd3eb56105d6..764e79a528df 100644 --- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java +++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java @@ -32,7 +32,7 @@ import org.apache.cassandra.utils.concurrent.Semaphore; import org.apache.cassandra.utils.concurrent.WaitQueue; -import static com.codahale.metrics.Timer.Context; +import org.apache.cassandra.metrics.Timer.Context; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.NANOSECONDS; diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java index e2aeef1dd247..f42ad568e488 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java @@ -37,7 +37,7 @@ import com.google.common.annotations.VisibleForTesting; import org.cliffc.high_scale_lib.NonBlockingHashMap; -import com.codahale.metrics.Timer; +import org.apache.cassandra.metrics.Timer; import net.openhft.chronicle.core.util.ThrowingFunction; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.Mutation; @@ -355,7 +355,7 @@ synchronized void sync(boolean flush) if (flush || close) { - try (Timer.Context ignored = CommitLog.instance.metrics.waitingOnFlush.time()) + try (Timer.Context ignored = CommitLog.instance.metrics.waitingOnFlush.startTime()) { flush(startMarker, sectionEnd); } @@ -736,7 +736,7 @@ void markWritten() void awaitDiskSync(Timer waitingOnCommit) { - try (Timer.Context ignored = waitingOnCommit.time()) + try (Timer.Context ignored = waitingOnCommit.startTime()) { segment.waitForSync(position); } diff --git a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java index ae170a87d51e..858326bb538f 100644 --- a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java +++ b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java @@ -39,7 +39,7 @@ protected void maybeWaitForSync(CommitLogSegment.Allocation alloc) if (lastSyncedAt < expectedSyncTime) { pending.incrementAndGet(); - awaitSyncAt(expectedSyncTime, commitLog.metrics.waitingOnCommit.time()); + awaitSyncAt(expectedSyncTime, commitLog.metrics.waitingOnCommit.startTime()); pending.decrementAndGet(); } } diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 801b1ec06be7..4c5049144afe 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -59,7 +59,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.codahale.metrics.Meter; +import org.apache.cassandra.metrics.Meter; import net.openhft.chronicle.core.util.ThrowingSupplier; import org.apache.cassandra.cache.AutoSavingCache; import org.apache.cassandra.concurrent.ExecutorFactory; diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java b/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java index 635ab38ec621..1c098921bcb5 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java +++ b/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java @@ -35,7 +35,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.Runnables; -import com.codahale.metrics.Counter; +import org.apache.cassandra.metrics.Counter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/src/java/org/apache/cassandra/db/virtual/CIDRFilteringMetricsTable.java b/src/java/org/apache/cassandra/db/virtual/CIDRFilteringMetricsTable.java index 6bbf82c40225..17f2a606ac78 100644 --- a/src/java/org/apache/cassandra/db/virtual/CIDRFilteringMetricsTable.java +++ b/src/java/org/apache/cassandra/db/virtual/CIDRFilteringMetricsTable.java @@ -27,7 +27,7 @@ import com.google.common.annotations.VisibleForTesting; -import com.codahale.metrics.Counter; +import org.apache.cassandra.metrics.Counter; import com.codahale.metrics.Snapshot; import org.apache.cassandra.auth.CassandraAuthorizer; import org.apache.cassandra.config.DatabaseDescriptor; diff --git a/src/java/org/apache/cassandra/db/virtual/model/CounterMetricRow.java b/src/java/org/apache/cassandra/db/virtual/model/CounterMetricRow.java index 81d32ef99043..162fe6f9abf2 100644 --- a/src/java/org/apache/cassandra/db/virtual/model/CounterMetricRow.java +++ b/src/java/org/apache/cassandra/db/virtual/model/CounterMetricRow.java @@ -18,7 +18,7 @@ package org.apache.cassandra.db.virtual.model; -import com.codahale.metrics.Counter; +import org.apache.cassandra.metrics.Counter; import com.codahale.metrics.Metric; import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; diff --git a/src/java/org/apache/cassandra/db/virtual/model/HistogramMetricRow.java b/src/java/org/apache/cassandra/db/virtual/model/HistogramMetricRow.java index 057872b7dd12..886635fe4bbe 100644 --- a/src/java/org/apache/cassandra/db/virtual/model/HistogramMetricRow.java +++ b/src/java/org/apache/cassandra/db/virtual/model/HistogramMetricRow.java @@ -18,7 +18,7 @@ package org.apache.cassandra.db.virtual.model; -import com.codahale.metrics.Histogram; +import org.apache.cassandra.metrics.Histogram; import com.codahale.metrics.Metric; import com.codahale.metrics.Snapshot; diff --git a/src/java/org/apache/cassandra/db/virtual/model/MeterMetricRow.java b/src/java/org/apache/cassandra/db/virtual/model/MeterMetricRow.java index e4857a5a39bc..cbd3fba58599 100644 --- a/src/java/org/apache/cassandra/db/virtual/model/MeterMetricRow.java +++ b/src/java/org/apache/cassandra/db/virtual/model/MeterMetricRow.java @@ -18,7 +18,7 @@ package org.apache.cassandra.db.virtual.model; -import com.codahale.metrics.Meter; +import org.apache.cassandra.metrics.Meter; import com.codahale.metrics.Metric; import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; diff --git a/src/java/org/apache/cassandra/db/virtual/model/MetricRow.java b/src/java/org/apache/cassandra/db/virtual/model/MetricRow.java index 88ed98146b43..355e9520cd80 100644 --- a/src/java/org/apache/cassandra/db/virtual/model/MetricRow.java +++ b/src/java/org/apache/cassandra/db/virtual/model/MetricRow.java @@ -18,12 +18,12 @@ package org.apache.cassandra.db.virtual.model; -import com.codahale.metrics.Counter; +import org.apache.cassandra.metrics.Counter; import com.codahale.metrics.Gauge; -import com.codahale.metrics.Histogram; -import com.codahale.metrics.Meter; +import org.apache.cassandra.metrics.Histogram; +import org.apache.cassandra.metrics.Meter; import com.codahale.metrics.Metric; -import com.codahale.metrics.Timer; +import org.apache.cassandra.metrics.Timer; import org.apache.cassandra.metrics.CassandraMetricsRegistry; import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; diff --git a/src/java/org/apache/cassandra/db/virtual/model/TimerMetricRow.java b/src/java/org/apache/cassandra/db/virtual/model/TimerMetricRow.java index 4fd4b8175835..704c3d7afe10 100644 --- a/src/java/org/apache/cassandra/db/virtual/model/TimerMetricRow.java +++ b/src/java/org/apache/cassandra/db/virtual/model/TimerMetricRow.java @@ -19,7 +19,7 @@ package org.apache.cassandra.db.virtual.model; import com.codahale.metrics.Metric; -import com.codahale.metrics.Timer; +import org.apache.cassandra.metrics.Timer; import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; diff --git a/src/java/org/apache/cassandra/index/sai/metrics/ColumnQueryMetrics.java b/src/java/org/apache/cassandra/index/sai/metrics/ColumnQueryMetrics.java index d2438510d93a..6068ac4b37f0 100644 --- a/src/java/org/apache/cassandra/index/sai/metrics/ColumnQueryMetrics.java +++ b/src/java/org/apache/cassandra/index/sai/metrics/ColumnQueryMetrics.java @@ -19,8 +19,8 @@ import java.util.concurrent.TimeUnit; -import com.codahale.metrics.Meter; -import com.codahale.metrics.Timer; +import org.apache.cassandra.metrics.Meter; +import org.apache.cassandra.metrics.Timer; import org.apache.cassandra.index.sai.utils.IndexIdentifier; import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; diff --git a/src/java/org/apache/cassandra/index/sai/metrics/IndexMetrics.java b/src/java/org/apache/cassandra/index/sai/metrics/IndexMetrics.java index 575fb79b9b9d..46d958eab988 100644 --- a/src/java/org/apache/cassandra/index/sai/metrics/IndexMetrics.java +++ b/src/java/org/apache/cassandra/index/sai/metrics/IndexMetrics.java @@ -17,10 +17,10 @@ */ package org.apache.cassandra.index.sai.metrics; -import com.codahale.metrics.Counter; +import org.apache.cassandra.metrics.Counter; import com.codahale.metrics.Gauge; -import com.codahale.metrics.Histogram; -import com.codahale.metrics.Timer; +import org.apache.cassandra.metrics.Histogram; +import org.apache.cassandra.metrics.Timer; import org.apache.cassandra.index.sai.StorageAttachedIndex; import org.apache.cassandra.index.sai.memory.MemtableIndexManager; diff --git a/src/java/org/apache/cassandra/index/sai/metrics/TableQueryMetrics.java b/src/java/org/apache/cassandra/index/sai/metrics/TableQueryMetrics.java index bbfbe1d28701..204cc07ae3f7 100644 --- a/src/java/org/apache/cassandra/index/sai/metrics/TableQueryMetrics.java +++ b/src/java/org/apache/cassandra/index/sai/metrics/TableQueryMetrics.java @@ -19,9 +19,9 @@ import java.util.concurrent.TimeUnit; -import com.codahale.metrics.Counter; -import com.codahale.metrics.Histogram; -import com.codahale.metrics.Timer; +import org.apache.cassandra.metrics.Counter; +import org.apache.cassandra.metrics.Histogram; +import org.apache.cassandra.metrics.Timer; import org.apache.cassandra.index.sai.QueryContext; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.tracing.Tracing; diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/RowIndexEntry.java b/src/java/org/apache/cassandra/io/sstable/format/big/RowIndexEntry.java index 7828599e14ff..80d0b0ba8206 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/RowIndexEntry.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/RowIndexEntry.java @@ -21,7 +21,7 @@ import java.nio.ByteBuffer; import java.util.List; -import com.codahale.metrics.Histogram; +import org.apache.cassandra.metrics.Histogram; import org.apache.cassandra.config.DataStorageSpec; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ArrayClustering; diff --git a/src/java/org/apache/cassandra/io/sstable/indexsummary/IndexSummaryManager.java b/src/java/org/apache/cassandra/io/sstable/indexsummary/IndexSummaryManager.java index 7de33ba2f117..aa39fefec7c9 100644 --- a/src/java/org/apache/cassandra/io/sstable/indexsummary/IndexSummaryManager.java +++ b/src/java/org/apache/cassandra/io/sstable/indexsummary/IndexSummaryManager.java @@ -34,7 +34,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.codahale.metrics.Timer; +import org.apache.cassandra.metrics.Timer; import org.apache.cassandra.concurrent.ScheduledExecutorPlus; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; @@ -243,7 +243,7 @@ public void redistributeSummaries() throws IOException Pair> redistributionTransactionInfo = getRestributionTransactions(); Map transactions = redistributionTransactionInfo.right; long nonRedistributingOffHeapSize = redistributionTransactionInfo.left; - try (Timer.Context ctx = CompactionManager.instance.getMetrics().indexSummaryRedistributionTime.time()) + try (Timer.Context ctx = CompactionManager.instance.getMetrics().indexSummaryRedistributionTime.startTime()) { redistributeSummaries(new IndexSummaryRedistribution(transactions, nonRedistributingOffHeapSize, diff --git a/src/java/org/apache/cassandra/metrics/AbstractCacheMetrics.java b/src/java/org/apache/cassandra/metrics/AbstractCacheMetrics.java index 7b4b7510d5f7..e389183f166c 100644 --- a/src/java/org/apache/cassandra/metrics/AbstractCacheMetrics.java +++ b/src/java/org/apache/cassandra/metrics/AbstractCacheMetrics.java @@ -22,7 +22,6 @@ import com.google.common.annotations.VisibleForTesting; import com.codahale.metrics.Gauge; -import com.codahale.metrics.Meter; import com.codahale.metrics.RatioGauge; import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; diff --git a/src/java/org/apache/cassandra/metrics/AtomicLongCounter.java b/src/java/org/apache/cassandra/metrics/AtomicLongCounter.java new file mode 100644 index 000000000000..fb8751b061d5 --- /dev/null +++ b/src/java/org/apache/cassandra/metrics/AtomicLongCounter.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.metrics; + +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.cassandra.utils.ReflectionUtils; + +/** + * This type of Counter is not very efficient for updates but fast to read. + * It should be used only when getCount performance is critical + * and we can sucrifice the cost to update it by multiple threads + */ +public class AtomicLongCounter extends com.codahale.metrics.Counter implements Counter +{ + private final AtomicLong counter = new AtomicLong(); + + public AtomicLongCounter() + { + ReflectionUtils.setFieldToNull(this, "count"); + } + + @Override + public void inc() + { + counter.incrementAndGet(); + } + + @Override + public void inc(long n) + { + counter.addAndGet(n); + } + + @Override + public void dec() + { + counter.decrementAndGet(); + } + + @Override + public void dec(long n) + { + counter.addAndGet(-n); + } + + @Override + public long getCount() + { + return counter.get(); + } +} diff --git a/src/java/org/apache/cassandra/metrics/BatchMetrics.java b/src/java/org/apache/cassandra/metrics/BatchMetrics.java index 28bf4cfbb2b8..445fd6dea307 100644 --- a/src/java/org/apache/cassandra/metrics/BatchMetrics.java +++ b/src/java/org/apache/cassandra/metrics/BatchMetrics.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.metrics; -import com.codahale.metrics.Histogram; import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; diff --git a/src/java/org/apache/cassandra/metrics/BufferPoolMetrics.java b/src/java/org/apache/cassandra/metrics/BufferPoolMetrics.java index afcf62a09bb5..67bd81f00b88 100644 --- a/src/java/org/apache/cassandra/metrics/BufferPoolMetrics.java +++ b/src/java/org/apache/cassandra/metrics/BufferPoolMetrics.java @@ -18,7 +18,6 @@ package org.apache.cassandra.metrics; import com.codahale.metrics.Gauge; -import com.codahale.metrics.Meter; import org.apache.cassandra.utils.memory.BufferPool; import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; diff --git a/src/java/org/apache/cassandra/metrics/CASClientRequestMetrics.java b/src/java/org/apache/cassandra/metrics/CASClientRequestMetrics.java index 654bb059d16e..f951803feec2 100644 --- a/src/java/org/apache/cassandra/metrics/CASClientRequestMetrics.java +++ b/src/java/org/apache/cassandra/metrics/CASClientRequestMetrics.java @@ -18,9 +18,6 @@ */ package org.apache.cassandra.metrics; -import com.codahale.metrics.Counter; -import com.codahale.metrics.Histogram; -import com.codahale.metrics.Meter; import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; diff --git a/src/java/org/apache/cassandra/metrics/CASClientWriteRequestMetrics.java b/src/java/org/apache/cassandra/metrics/CASClientWriteRequestMetrics.java index f42f49592a2c..63c8dfb8cb50 100644 --- a/src/java/org/apache/cassandra/metrics/CASClientWriteRequestMetrics.java +++ b/src/java/org/apache/cassandra/metrics/CASClientWriteRequestMetrics.java @@ -18,8 +18,6 @@ */ package org.apache.cassandra.metrics; -import com.codahale.metrics.Counter; -import com.codahale.metrics.Histogram; import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; diff --git a/src/java/org/apache/cassandra/metrics/CIDRAuthorizerMetrics.java b/src/java/org/apache/cassandra/metrics/CIDRAuthorizerMetrics.java index 39a6e357c316..76f6e1307eba 100644 --- a/src/java/org/apache/cassandra/metrics/CIDRAuthorizerMetrics.java +++ b/src/java/org/apache/cassandra/metrics/CIDRAuthorizerMetrics.java @@ -20,8 +20,6 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import com.codahale.metrics.Counter; -import com.codahale.metrics.Timer; import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; diff --git a/src/java/org/apache/cassandra/metrics/CQLMetrics.java b/src/java/org/apache/cassandra/metrics/CQLMetrics.java index edf13dfb4e3f..aae5f653d3e7 100644 --- a/src/java/org/apache/cassandra/metrics/CQLMetrics.java +++ b/src/java/org/apache/cassandra/metrics/CQLMetrics.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.metrics; -import com.codahale.metrics.Counter; import com.codahale.metrics.Gauge; import com.codahale.metrics.RatioGauge; import org.apache.cassandra.cql3.QueryProcessor; diff --git a/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java b/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java index 8cf83f520870..e3d10747d294 100644 --- a/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java +++ b/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java @@ -39,15 +39,11 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; -import com.codahale.metrics.Counter; import com.codahale.metrics.Gauge; -import com.codahale.metrics.Histogram; -import com.codahale.metrics.Meter; import com.codahale.metrics.Metered; import com.codahale.metrics.Metric; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.MetricSet; -import com.codahale.metrics.Timer; import org.apache.cassandra.db.virtual.CollectionVirtualTableAdapter; import org.apache.cassandra.db.virtual.VirtualTable; import org.apache.cassandra.db.virtual.model.CounterMetricRow; @@ -294,14 +290,39 @@ private static int findNthIndexOf(String str, int start, int n) public Counter counter(MetricName... name) { - Counter counter = super.counter(name[0].getMetricName()); + String simpleMetricName = name[0].getMetricName(); + Metric metric = super.getMetrics().get(simpleMetricName); + if (metric instanceof Counter) + return (Counter) metric; + + Counter counter = new ThreadLocalCounter(); + super.register(simpleMetricName, counter); + Stream.of(name).forEach(n -> register(n, counter)); + return counter; + } + + public Counter atomicLongCounter(MetricName... name) + { + String simpleMetricName = name[0].getMetricName(); + Metric metric = super.getMetrics().get(simpleMetricName); + if (metric instanceof Counter) + return (Counter) metric; + + Counter counter = new AtomicLongCounter(); + super.register(simpleMetricName, counter); Stream.of(name).forEach(n -> register(n, counter)); return counter; } public Meter meter(MetricName... name) { - Meter meter = super.meter(name[0].getMetricName()); + String simpleMetricName = name[0].getMetricName(); + Metric metric = super.getMetrics().get(simpleMetricName); + if (metric instanceof Meter) + return (Meter) metric; + + Meter meter = new ThreadLocalMeter(); + super.register(simpleMetricName, meter); Stream.of(name).forEach(n -> register(n, meter)); return meter; } diff --git a/src/java/org/apache/cassandra/metrics/ChunkCacheMetrics.java b/src/java/org/apache/cassandra/metrics/ChunkCacheMetrics.java index a208998c9a21..c7b1b0be89b6 100644 --- a/src/java/org/apache/cassandra/metrics/ChunkCacheMetrics.java +++ b/src/java/org/apache/cassandra/metrics/ChunkCacheMetrics.java @@ -20,7 +20,6 @@ import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; -import com.codahale.metrics.Timer; import com.github.benmanes.caffeine.cache.RemovalCause; import com.github.benmanes.caffeine.cache.stats.CacheStats; import com.github.benmanes.caffeine.cache.stats.StatsCounter; diff --git a/src/java/org/apache/cassandra/metrics/ClearableHistogram.java b/src/java/org/apache/cassandra/metrics/ClearableHistogram.java index f83ef8276ad9..8d6749d2fd15 100644 --- a/src/java/org/apache/cassandra/metrics/ClearableHistogram.java +++ b/src/java/org/apache/cassandra/metrics/ClearableHistogram.java @@ -17,18 +17,13 @@ */ package org.apache.cassandra.metrics; -import java.lang.reflect.Field; -import java.lang.reflect.Method; -import java.util.concurrent.atomic.LongAdder; - import com.google.common.annotations.VisibleForTesting; -import com.codahale.metrics.Histogram; /** * Adds ability to reset a histogram */ -public class ClearableHistogram extends Histogram +public class ClearableHistogram extends ThreadLocalHistogram { private final DecayingEstimatedHistogramReservoir reservoirRef; @@ -53,30 +48,6 @@ public void clear() private void clearCount() { - // We have unfortunately no access to the count field so we need to use reflection to ensure that it is cleared. - // I hate that as it is fragile and pretty ugly but we only use that method for tests and it should fail pretty - // clearly if we start using an incompatible version of the metrics. - try - { - Field countField = Histogram.class.getDeclaredField("count"); - countField.setAccessible(true); - // in 3.1 the counter object is a LongAdderAdapter which is a package private interface - // from com.codahale.metrics. In 4.0, it is a java LongAdder so the code will be simpler. - Object counter = countField.get(this); - if (counter instanceof LongAdder) // For com.codahale.metrics version >= 4.0 - { - ((LongAdder) counter).reset(); - } - else // 3.1 and 3.2 - { - Method sumThenReset = counter.getClass().getDeclaredMethod("sumThenReset"); - sumThenReset.setAccessible(true); - sumThenReset.invoke(counter); - } - } - catch (Exception e) - { - throw new IllegalStateException("Cannot reset the com.codahale.metrics.Histogram count. This might be due to a change of version of the metric library", e); - } + reset(); } } diff --git a/src/java/org/apache/cassandra/metrics/ClientMessageSizeMetrics.java b/src/java/org/apache/cassandra/metrics/ClientMessageSizeMetrics.java index 416e6b44b48c..e3c19a8f88e7 100644 --- a/src/java/org/apache/cassandra/metrics/ClientMessageSizeMetrics.java +++ b/src/java/org/apache/cassandra/metrics/ClientMessageSizeMetrics.java @@ -20,9 +20,6 @@ import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; -import com.codahale.metrics.Counter; -import com.codahale.metrics.Histogram; - /** * Metrics to track the size of incoming and outgoing bytes at Cassandra server. */ diff --git a/src/java/org/apache/cassandra/metrics/ClientMetrics.java b/src/java/org/apache/cassandra/metrics/ClientMetrics.java index 1df5a23b2170..064d0463acfa 100644 --- a/src/java/org/apache/cassandra/metrics/ClientMetrics.java +++ b/src/java/org/apache/cassandra/metrics/ClientMetrics.java @@ -31,10 +31,7 @@ import com.google.common.annotations.VisibleForTesting; import com.codahale.metrics.Gauge; -import com.codahale.metrics.Histogram; -import com.codahale.metrics.Meter; import com.codahale.metrics.Reservoir; -import com.codahale.metrics.Timer; import org.apache.cassandra.auth.AuthenticatedUser; import org.apache.cassandra.auth.IAuthenticator; import org.apache.cassandra.auth.IAuthenticator.AuthenticationMode; @@ -201,7 +198,7 @@ public synchronized void init(Server servers) Reservoir ipUsageReservoir = ClientResourceLimits.ipUsageReservoir(); Metrics.register(factory.createMetricName("RequestsSizeByIpDistribution"), - new Histogram(ipUsageReservoir) + new ThreadLocalHistogram(ipUsageReservoir) { public long getCount() { diff --git a/src/java/org/apache/cassandra/metrics/ClientRangeRequestMetrics.java b/src/java/org/apache/cassandra/metrics/ClientRangeRequestMetrics.java index c974651381b5..ab654333a23b 100644 --- a/src/java/org/apache/cassandra/metrics/ClientRangeRequestMetrics.java +++ b/src/java/org/apache/cassandra/metrics/ClientRangeRequestMetrics.java @@ -18,7 +18,6 @@ */ package org.apache.cassandra.metrics; -import com.codahale.metrics.Histogram; import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; diff --git a/src/java/org/apache/cassandra/metrics/ClientRequestMetrics.java b/src/java/org/apache/cassandra/metrics/ClientRequestMetrics.java index 61fcc34bf129..e47c40f4c711 100644 --- a/src/java/org/apache/cassandra/metrics/ClientRequestMetrics.java +++ b/src/java/org/apache/cassandra/metrics/ClientRequestMetrics.java @@ -21,7 +21,6 @@ package org.apache.cassandra.metrics; -import com.codahale.metrics.Meter; import org.apache.cassandra.exceptions.ReadAbortException; import org.apache.cassandra.exceptions.ReadSizeAbortException; import org.apache.cassandra.exceptions.TombstoneAbortException; diff --git a/src/java/org/apache/cassandra/metrics/ClientRequestSizeMetrics.java b/src/java/org/apache/cassandra/metrics/ClientRequestSizeMetrics.java index f7719762a448..bfa44a559a8e 100644 --- a/src/java/org/apache/cassandra/metrics/ClientRequestSizeMetrics.java +++ b/src/java/org/apache/cassandra/metrics/ClientRequestSizeMetrics.java @@ -20,8 +20,6 @@ import java.util.Collection; -import com.codahale.metrics.Counter; - import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.restrictions.StatementRestrictions; import org.apache.cassandra.cql3.selection.Selection; diff --git a/src/java/org/apache/cassandra/metrics/ClientWriteRequestMetrics.java b/src/java/org/apache/cassandra/metrics/ClientWriteRequestMetrics.java index 50427af0735f..bcaebc650a90 100644 --- a/src/java/org/apache/cassandra/metrics/ClientWriteRequestMetrics.java +++ b/src/java/org/apache/cassandra/metrics/ClientWriteRequestMetrics.java @@ -18,7 +18,6 @@ */ package org.apache.cassandra.metrics; -import com.codahale.metrics.Histogram; import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; diff --git a/src/java/org/apache/cassandra/metrics/CommitLogMetrics.java b/src/java/org/apache/cassandra/metrics/CommitLogMetrics.java index 1c7390522a3b..d197ed8f6e03 100644 --- a/src/java/org/apache/cassandra/metrics/CommitLogMetrics.java +++ b/src/java/org/apache/cassandra/metrics/CommitLogMetrics.java @@ -18,8 +18,6 @@ package org.apache.cassandra.metrics; import com.codahale.metrics.Gauge; -import com.codahale.metrics.Meter; -import com.codahale.metrics.Timer; import org.apache.cassandra.db.commitlog.AbstractCommitLogService; import org.apache.cassandra.db.commitlog.AbstractCommitLogSegmentManager; diff --git a/src/java/org/apache/cassandra/metrics/CompactionMetrics.java b/src/java/org/apache/cassandra/metrics/CompactionMetrics.java index 3bdd14a0bb35..642bd7b23c6e 100644 --- a/src/java/org/apache/cassandra/metrics/CompactionMetrics.java +++ b/src/java/org/apache/cassandra/metrics/CompactionMetrics.java @@ -19,11 +19,8 @@ import java.util.*; -import com.codahale.metrics.Counter; import com.codahale.metrics.Gauge; -import com.codahale.metrics.Meter; -import com.codahale.metrics.Timer; import org.apache.cassandra.concurrent.ExecutorPlus; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; diff --git a/src/java/org/apache/cassandra/metrics/Counter.java b/src/java/org/apache/cassandra/metrics/Counter.java new file mode 100644 index 000000000000..d1ce58dc9b00 --- /dev/null +++ b/src/java/org/apache/cassandra/metrics/Counter.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.metrics; + +import com.codahale.metrics.Counting; +import com.codahale.metrics.Metric; + +/** + * An interface which mimics {@link com.codahale.metrics.Counter} API and allows alternative implementations + */ +public interface Counter extends Metric, Counting +{ + void inc(); + + void inc(long n); + + void dec(); + + void dec(long n); +} diff --git a/src/java/org/apache/cassandra/metrics/DenylistMetrics.java b/src/java/org/apache/cassandra/metrics/DenylistMetrics.java index e3c34ab35756..72fd0817006c 100644 --- a/src/java/org/apache/cassandra/metrics/DenylistMetrics.java +++ b/src/java/org/apache/cassandra/metrics/DenylistMetrics.java @@ -18,8 +18,6 @@ package org.apache.cassandra.metrics; -import com.codahale.metrics.Meter; - import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; public class DenylistMetrics diff --git a/src/java/org/apache/cassandra/metrics/DroppedMessageMetrics.java b/src/java/org/apache/cassandra/metrics/DroppedMessageMetrics.java index e17c24de7a67..a3cd1248fecf 100644 --- a/src/java/org/apache/cassandra/metrics/DroppedMessageMetrics.java +++ b/src/java/org/apache/cassandra/metrics/DroppedMessageMetrics.java @@ -22,9 +22,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; -import com.codahale.metrics.Meter; -import com.codahale.metrics.Timer; - import org.apache.cassandra.net.Verb; import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; diff --git a/src/java/org/apache/cassandra/metrics/HintedHandoffMetrics.java b/src/java/org/apache/cassandra/metrics/HintedHandoffMetrics.java index 187f5afe436b..c8aa29c21b9e 100644 --- a/src/java/org/apache/cassandra/metrics/HintedHandoffMetrics.java +++ b/src/java/org/apache/cassandra/metrics/HintedHandoffMetrics.java @@ -19,7 +19,6 @@ import java.util.Map.Entry; -import com.codahale.metrics.Counter; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.LoadingCache; diff --git a/src/java/org/apache/cassandra/metrics/HintsServiceMetrics.java b/src/java/org/apache/cassandra/metrics/HintsServiceMetrics.java index 2a8ce92776d9..830a56c2a664 100644 --- a/src/java/org/apache/cassandra/metrics/HintsServiceMetrics.java +++ b/src/java/org/apache/cassandra/metrics/HintsServiceMetrics.java @@ -20,8 +20,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.codahale.metrics.Histogram; -import com.codahale.metrics.Meter; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.LoadingCache; import org.apache.cassandra.concurrent.ImmediateExecutor; diff --git a/src/java/org/apache/cassandra/metrics/Histogram.java b/src/java/org/apache/cassandra/metrics/Histogram.java new file mode 100644 index 000000000000..fa0397db181e --- /dev/null +++ b/src/java/org/apache/cassandra/metrics/Histogram.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.metrics; + +import com.codahale.metrics.Counting; +import com.codahale.metrics.Metric; +import com.codahale.metrics.Sampling; + +/** + * An interface which mimics {@link com.codahale.metrics.Histogram} API and allows alternative implementations + */ +public interface Histogram extends Metric, Sampling, Counting +{ + void update(long value); + void update(int value); +} diff --git a/src/java/org/apache/cassandra/metrics/InternodeOutboundMetrics.java b/src/java/org/apache/cassandra/metrics/InternodeOutboundMetrics.java index f2a1a8bb6f03..247571b47985 100644 --- a/src/java/org/apache/cassandra/metrics/InternodeOutboundMetrics.java +++ b/src/java/org/apache/cassandra/metrics/InternodeOutboundMetrics.java @@ -18,7 +18,6 @@ package org.apache.cassandra.metrics; import com.codahale.metrics.Gauge; -import com.codahale.metrics.Meter; import org.apache.cassandra.net.OutboundConnections; import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; diff --git a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java index a1916bebd071..2168a5f5e5ad 100644 --- a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java +++ b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java @@ -21,11 +21,7 @@ import com.google.common.collect.ImmutableMap; -import com.codahale.metrics.Counter; import com.codahale.metrics.Gauge; -import com.codahale.metrics.Histogram; -import com.codahale.metrics.Meter; -import com.codahale.metrics.Timer; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; @@ -354,7 +350,7 @@ public Long getValue() */ private Counter createKeyspaceCounter(String name, final ToLongFunction extractor) { - return Metrics.register(factory.createMetricName(name), new Counter() + return Metrics.register(factory.createMetricName(name), new ThreadLocalCounter() { @Override public long getCount() diff --git a/src/java/org/apache/cassandra/metrics/LatencyMetrics.java b/src/java/org/apache/cassandra/metrics/LatencyMetrics.java index e4abd40bbf01..2ca507eb4924 100644 --- a/src/java/org/apache/cassandra/metrics/LatencyMetrics.java +++ b/src/java/org/apache/cassandra/metrics/LatencyMetrics.java @@ -23,10 +23,8 @@ import com.google.common.collect.Lists; -import com.codahale.metrics.Counter; import com.codahale.metrics.Reservoir; import com.codahale.metrics.Snapshot; -import com.codahale.metrics.Timer; import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; @@ -169,7 +167,7 @@ public void release() Metrics.remove(factory.createMetricName(namePrefix + "TotalLatency")); } - public class LatencyMetricsTimer extends Timer + public class LatencyMetricsTimer extends ThreadLocalTimer { long releasedLatencyCount = 0; @@ -249,7 +247,8 @@ public Snapshot getSnapshot() } } - class LatencyMetricsCounter extends Counter + // TODO: optimize + class LatencyMetricsCounter extends ThreadLocalCounter { @Override public long getCount() diff --git a/src/java/org/apache/cassandra/metrics/MessagingMetrics.java b/src/java/org/apache/cassandra/metrics/MessagingMetrics.java index 8f122d6dd73f..5f25c9a34553 100644 --- a/src/java/org/apache/cassandra/metrics/MessagingMetrics.java +++ b/src/java/org/apache/cassandra/metrics/MessagingMetrics.java @@ -32,7 +32,6 @@ import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.DatabaseDescriptor; -import com.codahale.metrics.Timer; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.Verb; diff --git a/src/java/org/apache/cassandra/metrics/Meter.java b/src/java/org/apache/cassandra/metrics/Meter.java new file mode 100644 index 000000000000..cf4ef4e03533 --- /dev/null +++ b/src/java/org/apache/cassandra/metrics/Meter.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.metrics; + +import com.codahale.metrics.Metered; + +/** + * An interface which mimics {@link com.codahale.metrics.Meter} API and allows alternative implementations + */ +public interface Meter extends Metered +{ + void mark(long n); + void mark(); +} diff --git a/src/java/org/apache/cassandra/metrics/MutualTlsMetrics.java b/src/java/org/apache/cassandra/metrics/MutualTlsMetrics.java index c16cbe406095..3894b9198e23 100644 --- a/src/java/org/apache/cassandra/metrics/MutualTlsMetrics.java +++ b/src/java/org/apache/cassandra/metrics/MutualTlsMetrics.java @@ -18,7 +18,6 @@ package org.apache.cassandra.metrics; -import com.codahale.metrics.Histogram; import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; diff --git a/src/java/org/apache/cassandra/metrics/PaxosMetrics.java b/src/java/org/apache/cassandra/metrics/PaxosMetrics.java index 45088a1d1789..25caf3f74604 100644 --- a/src/java/org/apache/cassandra/metrics/PaxosMetrics.java +++ b/src/java/org/apache/cassandra/metrics/PaxosMetrics.java @@ -18,8 +18,6 @@ package org.apache.cassandra.metrics; -import com.codahale.metrics.Counter; -import com.codahale.metrics.Meter; import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; diff --git a/src/java/org/apache/cassandra/metrics/ReadRepairMetrics.java b/src/java/org/apache/cassandra/metrics/ReadRepairMetrics.java index 1adb5dcd7f30..b429d9629c46 100644 --- a/src/java/org/apache/cassandra/metrics/ReadRepairMetrics.java +++ b/src/java/org/apache/cassandra/metrics/ReadRepairMetrics.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.metrics; -import com.codahale.metrics.Meter; import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; diff --git a/src/java/org/apache/cassandra/metrics/RepairMetrics.java b/src/java/org/apache/cassandra/metrics/RepairMetrics.java index 27dbbd31181c..26ebdf6a301d 100644 --- a/src/java/org/apache/cassandra/metrics/RepairMetrics.java +++ b/src/java/org/apache/cassandra/metrics/RepairMetrics.java @@ -24,8 +24,6 @@ import com.google.common.annotations.VisibleForTesting; -import com.codahale.metrics.Counter; -import com.codahale.metrics.Histogram; import org.apache.cassandra.net.Verb; import org.apache.cassandra.repair.messages.RepairMessage; diff --git a/src/java/org/apache/cassandra/metrics/SnapshottingTimer.java b/src/java/org/apache/cassandra/metrics/SnapshottingTimer.java index 3536492a511f..ae18522a67bf 100644 --- a/src/java/org/apache/cassandra/metrics/SnapshottingTimer.java +++ b/src/java/org/apache/cassandra/metrics/SnapshottingTimer.java @@ -20,9 +20,8 @@ import com.codahale.metrics.Clock; import com.codahale.metrics.Snapshot; -import com.codahale.metrics.Timer; -public class SnapshottingTimer extends Timer +public class SnapshottingTimer extends ThreadLocalTimer { private final SnapshottingReservoir reservoir; diff --git a/src/java/org/apache/cassandra/metrics/StorageMetrics.java b/src/java/org/apache/cassandra/metrics/StorageMetrics.java index 1cd65c089670..92486eb0bc44 100644 --- a/src/java/org/apache/cassandra/metrics/StorageMetrics.java +++ b/src/java/org/apache/cassandra/metrics/StorageMetrics.java @@ -20,9 +20,7 @@ import java.util.function.ToLongFunction; import java.util.stream.StreamSupport; -import com.codahale.metrics.Counter; import com.codahale.metrics.Gauge; -import com.codahale.metrics.Meter; import org.apache.cassandra.db.Keyspace; import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; @@ -44,7 +42,7 @@ public class StorageMetrics createSummingGauge("UnreplicatedUncompressedLoad", metric -> metric.unreplicatedUncompressedLiveDiskSpaceUsed.getValue()); public static final Counter uncaughtExceptions = Metrics.counter(factory.createMetricName("Exceptions")); - public static final Counter totalHintsInProgress = Metrics.counter(factory.createMetricName("TotalHintsInProgress")); + public static final Counter totalHintsInProgress = Metrics.atomicLongCounter(factory.createMetricName("TotalHintsInProgress")); public static final Counter totalHints = Metrics.counter(factory.createMetricName("TotalHints")); public static final Counter repairExceptions = Metrics.counter(factory.createMetricName("RepairExceptions")); public static final Counter totalOpsForInvalidToken = Metrics.counter(factory.createMetricName("TotalOpsForInvalidToken")); diff --git a/src/java/org/apache/cassandra/metrics/StreamingMetrics.java b/src/java/org/apache/cassandra/metrics/StreamingMetrics.java index eb5d84006340..eb89f2a87cf8 100644 --- a/src/java/org/apache/cassandra/metrics/StreamingMetrics.java +++ b/src/java/org/apache/cassandra/metrics/StreamingMetrics.java @@ -20,8 +20,6 @@ import java.util.concurrent.ConcurrentMap; -import com.codahale.metrics.Counter; -import com.codahale.metrics.Timer; import org.apache.cassandra.locator.InetAddressAndPort; import org.cliffc.high_scale_lib.NonBlockingHashMap; diff --git a/src/java/org/apache/cassandra/metrics/TCMMetrics.java b/src/java/org/apache/cassandra/metrics/TCMMetrics.java index 01061b725a1d..94db413dd053 100644 --- a/src/java/org/apache/cassandra/metrics/TCMMetrics.java +++ b/src/java/org/apache/cassandra/metrics/TCMMetrics.java @@ -21,9 +21,6 @@ import java.util.concurrent.TimeUnit; import com.codahale.metrics.Gauge; -import com.codahale.metrics.Histogram; -import com.codahale.metrics.Meter; -import com.codahale.metrics.Timer; import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.Epoch; diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java index fabb0814e49a..fa94d8d7ff85 100644 --- a/src/java/org/apache/cassandra/metrics/TableMetrics.java +++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java @@ -36,12 +36,8 @@ import com.google.common.collect.Maps; import org.apache.commons.lang3.ArrayUtils; -import com.codahale.metrics.Counter; import com.codahale.metrics.Gauge; -import com.codahale.metrics.Histogram; -import com.codahale.metrics.Meter; import com.codahale.metrics.Metric; -import com.codahale.metrics.Timer; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; diff --git a/src/java/org/apache/cassandra/metrics/ThreadLocalCounter.java b/src/java/org/apache/cassandra/metrics/ThreadLocalCounter.java new file mode 100644 index 000000000000..caebc46da4a4 --- /dev/null +++ b/src/java/org/apache/cassandra/metrics/ThreadLocalCounter.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.metrics; + +import org.apache.cassandra.utils.ReflectionUtils; + +/** + * An alternative to Dropwizard Counter which implements the same kind of API. + * it has more efficent inc/dec operations and consumes less memory. + * The counter logic is implemented using {@link ThreadLocalMetrics} functionality. + * + * NOTE: Dropwizard Counter is a concrete class and there is no an interface for Dropwizard Counter logic, + * so we have to create an alternative hierarchy. +*/ +public class ThreadLocalCounter extends com.codahale.metrics.Counter implements Counter +{ + private final int metricId; + + ThreadLocalCounter(int metricId) + { + this.metricId = metricId; + ThreadLocalMetrics.destroyWhenUnreachable(this, metricId); + ReflectionUtils.setFieldToNull(this, "count"); // reduce metrics memory footprint + } + + public ThreadLocalCounter() + { + this(ThreadLocalMetrics.allocateMetricId()); + } + + @Override + public void inc() + { + ThreadLocalMetrics.add(metricId, 1); + } + + @Override + public void inc(long n) + { + ThreadLocalMetrics.add(metricId, n); + } + + @Override + public void dec() + { + ThreadLocalMetrics.add(metricId, -1); + } + + @Override + public void dec(long n) + { + ThreadLocalMetrics.add(metricId, -n); + } + + @Override + public long getCount() + { + return ThreadLocalMetrics.getCount(metricId); + } +} diff --git a/src/java/org/apache/cassandra/metrics/ThreadLocalHistogram.java b/src/java/org/apache/cassandra/metrics/ThreadLocalHistogram.java new file mode 100644 index 000000000000..5579c2b04765 --- /dev/null +++ b/src/java/org/apache/cassandra/metrics/ThreadLocalHistogram.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.metrics; + +import com.codahale.metrics.Reservoir; +import com.codahale.metrics.Snapshot; +import org.apache.cassandra.utils.ReflectionUtils; + +/** + * An alternative to Dropwizard Histogram which implements the same kind of API. + * it has more efficent counting operations and consumes less memory. + * The counter logic is implemented using {@link ThreadLocalMetrics} functionality. + * + * NOTE: Dropwizard Histogram is a concrete class and there is no an interface for Dropwizard Histogram logic, + * so we have to create an alternative API hierarchy. + */ +public class ThreadLocalHistogram extends com.codahale.metrics.Histogram implements Histogram +{ + private final Reservoir reservoir; + private final int countMetricId; + + /** + * Creates a new {@link ThreadLocalHistogram} with the given reservoir. + * + * @param reservoir the reservoir to create a histogram from + */ + public ThreadLocalHistogram(Reservoir reservoir) { + super(reservoir); + this.reservoir = reservoir; + this.countMetricId = ThreadLocalMetrics.allocateMetricId(); + ThreadLocalMetrics.destroyWhenUnreachable(this, countMetricId); + ReflectionUtils.setFieldToNull(this, "count"); // reduce metrics memory footprint + } + + /** + * Adds a recorded value. + * + * @param value the length of the value + */ + public void update(int value) { + update((long) value); + } + + /** + * Adds a recorded value. + * + * @param value the length of the value + */ + public void update(long value) { + ThreadLocalMetrics.add(countMetricId, 1); + reservoir.update(value); + } + + /** + * Returns the number of values recorded. + * + * @return the number of values recorded + */ + @Override + public long getCount() { + return ThreadLocalMetrics.getCount(countMetricId); + } + + public void reset() + { + ThreadLocalMetrics.getCountAndReset(countMetricId); + } + + @Override + public Snapshot getSnapshot() { + return reservoir.getSnapshot(); + } +} diff --git a/src/java/org/apache/cassandra/metrics/ThreadLocalMeter.java b/src/java/org/apache/cassandra/metrics/ThreadLocalMeter.java new file mode 100644 index 000000000000..0336cbb99f24 --- /dev/null +++ b/src/java/org/apache/cassandra/metrics/ThreadLocalMeter.java @@ -0,0 +1,355 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.metrics; + +import java.lang.ref.WeakReference; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import com.google.common.annotations.VisibleForTesting; + +import com.codahale.metrics.Clock; +import org.apache.cassandra.concurrent.ScheduledExecutors; +import org.apache.cassandra.utils.MonotonicClock; +import org.apache.cassandra.utils.ReflectionUtils; + +import static java.lang.Math.exp; + +/** + * An alternative to Dropwizard Meter which implements the same kind of API. + * it has more efficent mark operations and consumes less memory. + * Only exponential decaying moving average is supported for 1/5/15-minutes rate values. + * Tick logic is moved out from a mark operation and always executed in a background thread. + * For better cache locality rate values are extracted to a common non thread-local array + * updated by a background thread in bulk. + * Counter logic inside is implemented using @see ThreadLocalMetrics functionality. + * + * NOTE: Dropwizard Meter is a class and there is no an interface for Dropwizard Meter logic, + * so we have to create an alternative hierarchy. + */ +public class ThreadLocalMeter extends com.codahale.metrics.Meter implements Meter +{ + private static final int INTERVAL_SEC = 5; + private static final long TICK_INTERVAL_NS = TimeUnit.SECONDS.toNanos(INTERVAL_SEC); + private static final double SECONDS_PER_MINUTE = 60.0; + private static final int ONE_MINUTE = 1; + private static final int FIVE_MINUTES = 5; + private static final int FIFTEEN_MINUTES = 15; + private static final double M1_ALPHA = 1 - exp(-INTERVAL_SEC / SECONDS_PER_MINUTE / ONE_MINUTE); + private static final double M5_ALPHA = 1 - exp(-INTERVAL_SEC / SECONDS_PER_MINUTE / FIVE_MINUTES); + private static final double M15_ALPHA = 1 - exp(-INTERVAL_SEC / SECONDS_PER_MINUTE / FIFTEEN_MINUTES); + + private static final int M1_RATE_OFFSET = 0; + private static final int M5_RATE_OFFSET = 1; + private static final int M15_RATE_OFFSET = 2; + private static final int RATES_COUNT = 3; + private static final double NON_INITIALIZED = Double.MIN_VALUE; + + private static final int BACKGROUND_TICK_INTERVAL_SEC = INTERVAL_SEC; + private static final List> allMeters = new CopyOnWriteArrayList<>(); + + /** + * CASSANDRA-19332 + * If ticking would reduce even Long.MAX_VALUE in the 15 minute EWMA below this target then don't bother + * ticking in a loop and instead reset all the EWMAs. + */ + private static final double maxTickZeroTarget = 0.0001; + private static final int maxTicks; + + static + { + int m3Ticks = 1; + double emulatedM15Rate = 0.0; + emulatedM15Rate = tickFifteenMinuteEWMA(emulatedM15Rate, Long.MAX_VALUE); + do + { + emulatedM15Rate = tickFifteenMinuteEWMA(emulatedM15Rate, 0); + m3Ticks++; + } + while (getRatePerSecond(emulatedM15Rate) > maxTickZeroTarget); + maxTicks = m3Ticks; + } + + static + { + ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(ThreadLocalMeter::tickAll, + BACKGROUND_TICK_INTERVAL_SEC, + BACKGROUND_TICK_INTERVAL_SEC, + TimeUnit.SECONDS); + } + + private static final Object ratesArrayGuard = new Object(); + + private static volatile double[] rates = new double[RATES_COUNT * 16]; + static final AtomicInteger rateGroupIdGenerator = new AtomicInteger(); + + private static final Object freeRateGroupIdSetGuard = new Object(); + private static final BitSet freeRateGroupIdSet = new BitSet(); + + private static int allocateRateGroupOffset() + { + int rateGroupId; + synchronized (freeRateGroupIdSetGuard) + { + rateGroupId = freeRateGroupIdSet.nextSetBit(0); + if (rateGroupId >= 0) + freeRateGroupIdSet.clear(rateGroupId); + } + if (rateGroupId < 0) + { + rateGroupId = rateGroupIdGenerator.getAndAdd(RATES_COUNT); + } + synchronized (ratesArrayGuard) + { + if (rates.length < rateGroupId + RATES_COUNT) + { + double[] newRates = new double[(int) (rateGroupId + RATES_COUNT)]; + System.arraycopy(rates, 0, newRates, 0, rates.length); + rates = newRates; + } + rates[rateGroupId + M1_RATE_OFFSET] = NON_INITIALIZED; + rates[rateGroupId + M5_RATE_OFFSET] = NON_INITIALIZED; + rates[rateGroupId + M15_RATE_OFFSET] = NON_INITIALIZED; + } + return rateGroupId; + } + + static double getRateValue(int offset) + { + return rates[offset]; + } + + private static void setRateValue(int offset, double value) + { + rates[offset] = value; + } + + private final int countMetricId; + private final int uncountedMetricId; + private final int rateGroupId; + private final long startTime; + private final MonotonicClock clock; + private long lastTick; + + public ThreadLocalMeter() + { + this(MonotonicClock.Global.approxTime); + } + + public ThreadLocalMeter(MonotonicClock clock) + { + super(null, Clock.defaultClock()); // reduce metrics memory footprint + this.clock = clock; + this.startTime = this.clock.now(); + this.lastTick = this.startTime; + this.countMetricId = ThreadLocalMetrics.allocateMetricId(); + this.uncountedMetricId = ThreadLocalMetrics.allocateMetricId(); + this.rateGroupId = allocateRateGroupOffset(); + allMeters.add(new WeakReference<>(this)); + ThreadLocalMetrics.destroyWhenUnreachable(this, new MeterCleaner(countMetricId, uncountedMetricId, rateGroupId)); + ReflectionUtils.setFieldToNull(this, "count"); // reduce metrics memory footprint + } + + private static class MeterCleaner implements ThreadLocalMetrics.MetricCleaner + { + private final int countMetricId; + private final int uncountedMetricId; + private final int rateGroupId; + + private MeterCleaner(int countMetricId, int uncountedMetricId, int rateGroupId) + { + this.countMetricId = countMetricId; + this.uncountedMetricId = uncountedMetricId; + this.rateGroupId = rateGroupId; + } + + @Override + public void clean() + { + synchronized (freeRateGroupIdSetGuard) + { + freeRateGroupIdSet.set(rateGroupId); + } + ThreadLocalMetrics.recycleMetricId(countMetricId); + ThreadLocalMetrics.recycleMetricId(uncountedMetricId); + } + } + + /** + * Mark the occurrence of an event. + */ + public void mark() + { + mark(1); + } + + /** + * Mark the occurrence of a given number of events. + * + * @param n the number of events + */ + public void mark(long n) + { + ThreadLocalMetrics context = ThreadLocalMetrics.get(); + context.addNonStatic(countMetricId, n); + context.addNonStatic(uncountedMetricId, n); + } + + @Override + public long getCount() + { + return ThreadLocalMetrics.getCount(countMetricId); + } + + @Override + public double getFifteenMinuteRate() + { + return getRatePerSecond(getRateValue(rateGroupId + M15_RATE_OFFSET)); + } + + @Override + public double getFiveMinuteRate() + { + return getRatePerSecond(getRateValue(rateGroupId + M5_RATE_OFFSET)); + } + + @Override + public double getOneMinuteRate() + { + return getRatePerSecond(getRateValue(rateGroupId + M1_RATE_OFFSET)); + } + + @Override + public double getMeanRate() + { + long count = getCount(); + if (count == 0) + return 0.0; + else + { + final double elapsed = clock.now() - startTime; + return count / elapsed * TimeUnit.SECONDS.toNanos(1); + } + } + + @VisibleForTesting + static void tickAll() + { + synchronized (ratesArrayGuard) + { + List> emptyRefsToRemove = null; + for (WeakReference threadLocalMeterRef : allMeters) + { + ThreadLocalMeter meter = threadLocalMeterRef.get(); + if (meter != null) + meter.tickIfNessesary(); + else + { + if (emptyRefsToRemove == null) + emptyRefsToRemove = new ArrayList<>(); + emptyRefsToRemove.add(threadLocalMeterRef); + } + if (emptyRefsToRemove != null) + allMeters.removeAll(emptyRefsToRemove); + } + } + } + + private void tickIfNessesary() + { + long newTick = clock.now(); + long age = newTick - lastTick; + if (age > TICK_INTERVAL_NS) + { + lastTick = newTick - age % TICK_INTERVAL_NS; + long requiredTicks = age / TICK_INTERVAL_NS; + tick(requiredTicks); + } + } + private void tick(long requiredTicks) + { + if (requiredTicks >= maxTicks) + reset(); + else if (requiredTicks > 0) + { + long count = ThreadLocalMetrics.getCountAndReset(uncountedMetricId); + for (long i = 0; i < requiredTicks; i++) + { + double m1Rate = getRateValue(rateGroupId + M1_RATE_OFFSET); + double m5Rate = getRateValue(rateGroupId + M5_RATE_OFFSET); + double m15Rate = getRateValue(rateGroupId + M15_RATE_OFFSET); + setRateValue(rateGroupId + M1_RATE_OFFSET, tickOneMinuteEWMA(m1Rate, count)); + setRateValue(rateGroupId + M5_RATE_OFFSET, tickFiveMinuteEWMA(m5Rate, count)); + setRateValue(rateGroupId + M15_RATE_OFFSET, tickFifteenMinuteEWMA(m15Rate, count)); + count = 0; + } + } + } + + private static double tickOneMinuteEWMA(double oldRate, long count) + { + return tick(M1_ALPHA, oldRate, count); + } + + private static double tickFiveMinuteEWMA(double oldRate, long count) + { + return tick(M5_ALPHA, oldRate, count); + } + + private static double tickFifteenMinuteEWMA(double oldRate, long count) + { + return tick(M15_ALPHA, oldRate, count); + } + + private static double tick(double alpha, double oldRate, long count) + { + double instantRate = (double) count / TICK_INTERVAL_NS; + if (oldRate != NON_INITIALIZED) + return oldRate + alpha * (instantRate - oldRate); + else // init + return instantRate; + } + private static double getRatePerSecond(double rate) + { + if (rate == NON_INITIALIZED) + rate = 0.0; + return rate * (double) TimeUnit.SECONDS.toNanos(1L); + } + + /** + * Set the rate to the smallest possible positive value. Used to avoid calling tick a large number of times. + */ + private void reset() + { + ThreadLocalMetrics.getCountAndReset(uncountedMetricId); + setRateValue(rateGroupId + M1_RATE_OFFSET, Double.MIN_NORMAL); + setRateValue(rateGroupId + M5_RATE_OFFSET, Double.MIN_NORMAL); + setRateValue(rateGroupId + M15_RATE_OFFSET, Double.MIN_NORMAL); + } + + @VisibleForTesting + static int getTickingMetersCount() + { + return allMeters.size(); + } +} diff --git a/src/java/org/apache/cassandra/metrics/ThreadLocalMetrics.java b/src/java/org/apache/cassandra/metrics/ThreadLocalMetrics.java new file mode 100644 index 000000000000..061edfd5f862 --- /dev/null +++ b/src/java/org/apache/cassandra/metrics/ThreadLocalMetrics.java @@ -0,0 +1,348 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.metrics; + +import java.lang.ref.PhantomReference; +import java.lang.ref.ReferenceQueue; +import java.util.BitSet; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import com.google.common.annotations.VisibleForTesting; + +import io.netty.util.concurrent.FastThreadLocal; +import org.apache.cassandra.concurrent.Shutdownable; + +import static com.google.common.collect.ImmutableList.of; +import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory; +import static org.apache.cassandra.concurrent.InfiniteLoopExecutor.SimulatorSafe.UNSAFE; +import static org.apache.cassandra.utils.ExecutorUtils.shutdownAndWait; + +/** + * A thread-local counter implementation designed to use in metrics as an alternative to LongAdder used by Dropwizard metrics. + * This implementation has reduced write (increment) CPU usage costs in exchange for a higher read cost. + * We keep and increment parts of counters locally for each thread. + * To reduce memory footprint per counter they are stored grouped to a long[] array for each thread. + * Piggyback volatile visibility is expected for readers who execute getCount method to see recent writes to thread local arrays. + * If a metric is not used anymore the position in the array is reused. Phantom references are used to track aliveness of metric users. + * When a thread died the counter values accumulated by it are transfered to a shared summaryValues collection. + * Threads death is tracked using 2 approaches: FastThreadLocal.onRemoval callback and phantom references to Thread objects. + */ +public class ThreadLocalMetrics +{ + static final AtomicInteger idGenerator = new AtomicInteger(); + + private static final Object freeMetricIdSetGuard = new Object(); + + @VisibleForTesting + static final BitSet freeMetricIdSet = new BitSet(); + + private static final List allThreadLocalMetrics = new CopyOnWriteArrayList<>(); + + /* the lock is used to coordinate the threads which: + * 1) transfer values from a dead thread to summaryValues + * 2) calculate a getCount value. + * Using this lock we want to avoid + * a value temporary lost for a transferring value in getCount + * as well as a double-counting + */ + private static final ReadWriteLock summaryLock = new ReentrantReadWriteLock(); + + private static final FastThreadLocal threadLocalMetricsCurrent = new FastThreadLocal<>() + { + @Override + protected ThreadLocalMetrics initialValue() + { + ThreadLocalMetrics result = new ThreadLocalMetrics(); + allThreadLocalMetrics.add(result); + destroyWhenUnreachable(Thread.currentThread(), result::release); + return result; + } + + // this method is invoked when a thread is going to finish, but it works only for FastThreadLocalThread + @Override + protected void onRemoval(ThreadLocalMetrics value) + { + value.release(); + } + }; + + private static final Map summaryValues = new ConcurrentHashMap<>(); + + private static final Shutdownable cleaner; + private static final Set> phantomReferences = Collections.newSetFromMap(new ConcurrentHashMap<>()); + private static final ReferenceQueue referenceQueue = new ReferenceQueue<>(); + + static + { + cleaner = executorFactory().infiniteLoop("ThreadLocalMetrics-Cleaner", ThreadLocalMetrics::cleanupOneReference, UNSAFE); + } + + private long[] counterValues = new long[16]; + + private static void cleanupOneReference() throws InterruptedException + { + Object obj = referenceQueue.remove(100); + if (obj instanceof MetricIdReference) + { + ((MetricIdReference) obj).release(); + phantomReferences.remove(obj); + } + else if (obj instanceof MetricCleanerReference) + { + ((MetricCleanerReference) obj).release(); + phantomReferences.remove(obj); + } + } + + private static class MetricIdReference extends PhantomReference + { + private final int metricId; + + public MetricIdReference(Object referent, ReferenceQueue q, int metricId) + { + super(referent, q); + this.metricId = metricId; + } + + public void release() + { + recycleMetricId(metricId); + } + } + + private static class MetricCleanerReference extends PhantomReference + { + private final MetricCleaner metricCleaner; + + public MetricCleanerReference(Object referent, ReferenceQueue q, MetricCleaner metricCleaner) + { + super(referent, q); + this.metricCleaner = metricCleaner; + } + + public void release() + { + metricCleaner.clean(); + } + } + + interface MetricCleaner + { + void clean(); + } + + static void destroyWhenUnreachable(Object referent, int metricId) + { + phantomReferences.add(new MetricIdReference(referent, referenceQueue, metricId)); + } + + static void destroyWhenUnreachable(Object referent, MetricCleaner metricCleaner) + { + phantomReferences.add(new MetricCleanerReference(referent, referenceQueue, metricCleaner)); + } + + @VisibleForTesting + public static void shutdownCleaner(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException + { + shutdownAndWait(timeout, unit, of(cleaner)); + } + + private void release() + { + Lock lock = summaryLock.writeLock(); + lock.lock(); + try + { + // we may try to release ThreadLocalMetrics 2 times: onRemoval and by PhantomReference + // so this if check is needed to avoid a potential double release + if (allThreadLocalMetrics.remove(this)) + for (int metricId = 0; metricId < counterValues.length; metricId++) + { + long value = counterValues[metricId]; + if (value != 0) + getSummary(metricId).addAndGet(value); + } + } + finally + { + lock.unlock(); + } + } + + /** + * If we already have ThreadLocalMetrics instance looked up for the current thread + * we can use this method to avoid thread local lookup costs. + * It can be used if you need to update several counters at the same time. + * @param metricId metric to add a value + * @param n valuen to add, can be negative number as well + */ + public void addNonStatic(int metricId, long n) + { + getNonStatic(metricId)[metricId] += n; + } + + public static void add(int metricId, long n) + { + get(metricId)[metricId] += n; + } + + private static long getCount(int metricId, boolean resetToZero) + { + long summaryLocal; + long result; + AtomicLong summary = getSummary(metricId); + Lock readLock = summaryLock.readLock(); + readLock.lock(); + try + { + summaryLocal = summary.get(); + result = 0; + for (ThreadLocalMetrics threadLocalMetrics : allThreadLocalMetrics) + { + if (threadLocalMetrics != null) + { + long count = 0; + long[] currentCounterValues = threadLocalMetrics.counterValues; + if (metricId < currentCounterValues.length) + count = currentCounterValues[metricId]; + result += count; + } + } + } + finally + { + readLock.unlock(); + } + result += summaryLocal; + if (resetToZero) + summary.addAndGet(-result); // compensative reset without writing to thread local values + return result; + } + + private static AtomicLong getSummary(int metricId) + { + AtomicLong result = summaryValues.get(metricId); + if (result != null) + return result; + return summaryValues.computeIfAbsent(metricId, (metricIdToAdd) -> new AtomicLong()); + } + + public static long getCount(int metricId) + { + return getCount(metricId, false); + } + + public static long getCountAndReset(int metricId) + { + return getCount(metricId, true); + } + + public static ThreadLocalMetrics get() { + return threadLocalMetricsCurrent.get(); + } + + private static long[] get(int metricId) + { + ThreadLocalMetrics threadLocalMetrics = ThreadLocalMetrics.get(); + return threadLocalMetrics.getNonStatic(metricId); + } + + private long[] getNonStatic(int metricId) + { + long[] currentCounterValues = counterValues; + if (metricId < currentCounterValues.length) + return currentCounterValues; + + long[] newCounterValues = new long[(int)(metricId * 1.1)]; + System.arraycopy(currentCounterValues, 0, newCounterValues, 0, currentCounterValues.length); + counterValues = newCounterValues; + return newCounterValues; + } + + static int allocateMetricId() + { + int metricId; + synchronized (freeMetricIdSetGuard) + { + metricId = freeMetricIdSet.nextSetBit(0); + if (metricId >= 0) + freeMetricIdSet.clear(metricId); + } + if (metricId < 0) + metricId = idGenerator.getAndIncrement(); + + return metricId; + } + + static void recycleMetricId(int metricId) + { + // we use lock here to avoid potential issues when a metric is releasing and a thread is detected as dead at the same time + // in this case we may clean a summary value and later the thread removal logic may re-add a non-zero summary value + Lock lock = summaryLock.writeLock(); + lock.lock(); + try + { + for (ThreadLocalMetrics threadLocalMetrics : allThreadLocalMetrics) + if (threadLocalMetrics != null) + { + long[] currentCounterValues = threadLocalMetrics.counterValues; + if (metricId < currentCounterValues.length) + currentCounterValues[metricId] = 0; + } + summaryValues.remove(metricId); + } + finally + { + lock.unlock(); + } + synchronized (freeMetricIdSetGuard) + { + freeMetricIdSet.set(metricId); + } + } + + @VisibleForTesting + static int getAllocatedMetricsCount() + { + int freeCount; + synchronized (freeMetricIdSetGuard) + { + freeCount = freeMetricIdSet.cardinality(); + } + return idGenerator.get() - freeCount; + } + + @VisibleForTesting + static int getThreadLocalMetricsObjectsCount() + { + return allThreadLocalMetrics.size(); + } +} diff --git a/src/java/org/apache/cassandra/metrics/ThreadLocalTimer.java b/src/java/org/apache/cassandra/metrics/ThreadLocalTimer.java new file mode 100644 index 000000000000..86b8fb6f562d --- /dev/null +++ b/src/java/org/apache/cassandra/metrics/ThreadLocalTimer.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.metrics; + +import java.time.Duration; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import com.codahale.metrics.Clock; +import com.codahale.metrics.ExponentiallyDecayingReservoir; +import com.codahale.metrics.Reservoir; +import com.codahale.metrics.Snapshot; + +/** + * An alternative to Dropwizard Timer which implements the same kind of API. + * it has more efficent latency histogram implementation and consumes less memory. + * + * NOTE: Dropwizard Timer is a concrete class and there is no an interface for Dropwizard Timer logic, + * so we have to create an alternative hierarchy. + */ +public class ThreadLocalTimer extends com.codahale.metrics.Timer implements Timer +{ + private final Meter meter; + private final ThreadLocalHistogram histogram; + private final Clock clock; // usually we need precise clocks for timing + + /** + * Creates a new {@link Timer} using an {@link ExponentiallyDecayingReservoir} and the default + * {@link Clock}. + */ + public ThreadLocalTimer() { + this(new ExponentiallyDecayingReservoir()); + } + + /** + * Creates a new {@link Timer} that uses the given {@link Reservoir}. + * + * @param reservoir the {@link Reservoir} implementation the timer should use + */ + public ThreadLocalTimer(Reservoir reservoir) { + this(reservoir, Clock.defaultClock()); + } + + /** + * Creates a new {@link Timer} that uses the given {@link Reservoir} and {@link Clock}. + * + * @param reservoir the {@link Reservoir} implementation the timer should use + * @param clock the {@link Clock} implementation the timer should use + */ + public ThreadLocalTimer(Reservoir reservoir, Clock clock) { + // clock is intentionally not propagated to ThreadLocalMeter + // we do not need a precise and more expensive time there + this(new ThreadLocalMeter(), new ThreadLocalHistogram(reservoir), clock); + } + + public ThreadLocalTimer(Meter meter, ThreadLocalHistogram histogram, Clock clock) { + super(null, null, clock); + this.meter = meter; + this.histogram = histogram; + this.clock = clock; + } + + /** + * Adds a recorded duration. + * + * @param duration the length of the duration + * @param unit the scale unit of {@code duration} + */ + @Override + public void update(long duration, TimeUnit unit) { + update(unit.toNanos(duration)); + } + + /** + * Adds a recorded duration. + * + * @param duration the {@link Duration} to add to the timer. Negative or zero value are ignored. + */ + @Override + public void update(Duration duration) { + update(duration.toNanos()); + } + + /** + * Times and records the duration of event. + * + * @param event a {@link Callable} whose {@link Callable#call()} method implements a process + * whose duration should be timed + * @param the type of the value returned by {@code event} + * @return the value returned by {@code event} + * @throws Exception if {@code event} throws an {@link Exception} + */ + @Override + public T time(Callable event) throws Exception { + final long startTime = clock.getTick(); + try { + return event.call(); + } finally { + update(clock.getTick() - startTime); + } + } + + /** + * Times and records the duration of event. Should not throw exceptions, for that use the + * {@link #time(Callable)} method. + * + * @param event a {@link Supplier} whose {@link Supplier#get()} method implements a process + * whose duration should be timed + * @param the type of the value returned by {@code event} + * @return the value returned by {@code event} + */ + @Override + public T timeSupplier(Supplier event) { + final long startTime = clock.getTick(); + try { + return event.get(); + } finally { + update(clock.getTick() - startTime); + } + } + + /** + * Times and records the duration of event. + * + * @param event a {@link Runnable} whose {@link Runnable#run()} method implements a process + * whose duration should be timed + */ + @Override + public void time(Runnable event) { + final long startTime = clock.getTick(); + try { + event.run(); + } finally { + update(clock.getTick() - startTime); + } + } + + @Override + public com.codahale.metrics.Timer.Context time() + { + throw new UnsupportedOperationException("use startTime() instread"); + } + + @Override + public Timer.Context startTime() + { + return new Timer.Context(this, clock); + } + + @Override + public long getCount() { + return histogram.getCount(); + } + + @Override + public double getFifteenMinuteRate() { + return meter.getFifteenMinuteRate(); + } + + @Override + public double getFiveMinuteRate() { + return meter.getFiveMinuteRate(); + } + + @Override + public double getMeanRate() { + return meter.getMeanRate(); + } + + @Override + public double getOneMinuteRate() { + return meter.getOneMinuteRate(); + } + + @Override + public Snapshot getSnapshot() { + return histogram.getSnapshot(); + } + + private void update(long duration) { + if (duration >= 0) { + histogram.update(duration); + meter.mark(); + } + } +} diff --git a/src/java/org/apache/cassandra/metrics/ThreadPoolMetrics.java b/src/java/org/apache/cassandra/metrics/ThreadPoolMetrics.java index 90a50ad59701..b6c0caaa1177 100644 --- a/src/java/org/apache/cassandra/metrics/ThreadPoolMetrics.java +++ b/src/java/org/apache/cassandra/metrics/ThreadPoolMetrics.java @@ -19,7 +19,6 @@ import java.util.concurrent.ThreadPoolExecutor; -import com.codahale.metrics.Counter; import com.codahale.metrics.Gauge; import org.apache.cassandra.concurrent.ResizableThreadPool; import org.apache.cassandra.metrics.CassandraMetricsRegistry.MetricName; @@ -89,8 +88,8 @@ public ThreadPoolMetrics(ResizableThreadPool executor, String path, String poolN this.path = path; this.poolName = poolName; - totalBlocked = new Counter(); - currentBlocked = new Counter(); + totalBlocked = new ThreadLocalCounter(); + currentBlocked = new ThreadLocalCounter(); activeTasks = executor::getActiveTaskCount; pendingTasks = executor::getPendingTaskCount; completedTasks = executor::getCompletedTaskCount; diff --git a/src/java/org/apache/cassandra/metrics/Timer.java b/src/java/org/apache/cassandra/metrics/Timer.java new file mode 100644 index 000000000000..9e8187d76526 --- /dev/null +++ b/src/java/org/apache/cassandra/metrics/Timer.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.metrics; + +import java.time.Duration; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import com.codahale.metrics.Clock; +import com.codahale.metrics.Metered; +import com.codahale.metrics.Sampling; + +/** + * An interface which mimics {@link com.codahale.metrics.Timer} API and allows alternative implementations + */ +public interface Timer extends Metered, Sampling +{ + /** + * A timing context. + */ + public static class Context implements AutoCloseable { + private final Timer timer; + private final Clock clock; + private final long startTime; + + Context(Timer timer, Clock clock) { + this.timer = timer; + this.clock = clock; + this.startTime = clock.getTick(); + } + + /** + * Updates the timer with the difference between current and start time. Call to this method will + * not reset the start time. Multiple calls result in multiple updates. + * + * @return the elapsed time in nanoseconds + */ + public long stop() { + final long elapsed = clock.getTick() - startTime; + timer.update(elapsed, TimeUnit.NANOSECONDS); + return elapsed; + } + + /** + * Equivalent to calling {@link #stop()}. + */ + @Override + public void close() { + stop(); + } + } + + void update(long duration, TimeUnit unit); + + void update(Duration duration); + + T time(Callable event) throws Exception; + + T timeSupplier(Supplier event); + + void time(Runnable event); + + /* we have to implement another method instead of time() due to 2 reasons: + * 1) com.codahale.metrics.Timer.Context cannot be inhereted - it has only a package-private constructor + * 2) we want to avoid direct dependency to com.codahale.metrics.Timer.Context in other Cassandra classes + */ + Context startTime(); +} diff --git a/src/java/org/apache/cassandra/metrics/TrieMemtableMetricsView.java b/src/java/org/apache/cassandra/metrics/TrieMemtableMetricsView.java index e01254043998..c97a4bc770a1 100644 --- a/src/java/org/apache/cassandra/metrics/TrieMemtableMetricsView.java +++ b/src/java/org/apache/cassandra/metrics/TrieMemtableMetricsView.java @@ -18,7 +18,6 @@ package org.apache.cassandra.metrics; -import com.codahale.metrics.Counter; import static com.codahale.metrics.MetricRegistry.name; import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; diff --git a/src/java/org/apache/cassandra/metrics/ViewWriteMetrics.java b/src/java/org/apache/cassandra/metrics/ViewWriteMetrics.java index 98363d413db4..f8ff74840c73 100644 --- a/src/java/org/apache/cassandra/metrics/ViewWriteMetrics.java +++ b/src/java/org/apache/cassandra/metrics/ViewWriteMetrics.java @@ -20,8 +20,6 @@ import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; -import com.codahale.metrics.Counter; -import com.codahale.metrics.Timer; import com.codahale.metrics.Gauge; public class ViewWriteMetrics extends ClientRequestMetrics diff --git a/src/java/org/apache/cassandra/repair/RepairCoordinator.java b/src/java/org/apache/cassandra/repair/RepairCoordinator.java index c4b7a9fd55c9..cfa473d9275b 100644 --- a/src/java/org/apache/cassandra/repair/RepairCoordinator.java +++ b/src/java/org/apache/cassandra/repair/RepairCoordinator.java @@ -51,7 +51,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.codahale.metrics.Timer; +import org.apache.cassandra.metrics.Timer; import org.apache.cassandra.concurrent.ExecutorPlus; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.QueryOptions; diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index 8c44c4286d36..a63b2f16ecdc 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -41,7 +41,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.codahale.metrics.Meter; +import org.apache.cassandra.metrics.Meter; import com.codahale.metrics.MetricRegistryListener; import com.codahale.metrics.SharedMetricRegistries; import org.apache.cassandra.audit.AuditLogManager; @@ -137,7 +137,7 @@ public NativeTransportService nativeTransportService() SharedMetricRegistries.getOrCreate("logback-metrics").addListener(new MetricRegistryListener.Base() { @Override - public void onMeterAdded(String metricName, Meter meter) + public void onMeterAdded(String metricName, com.codahale.metrics.Meter meter) { // Given metricName consists of appender name in logback.xml + "." + metric name. // We first separate appender name diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 5e16bae9445c..4071f8e2732f 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -75,7 +75,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.codahale.metrics.Meter; +import org.apache.cassandra.metrics.Meter; import org.apache.cassandra.audit.AuditLogManager; import org.apache.cassandra.audit.AuditLogOptions; import org.apache.cassandra.auth.AuthCacheService; diff --git a/src/java/org/apache/cassandra/service/paxos/Paxos.java b/src/java/org/apache/cassandra/service/paxos/Paxos.java index 3412add1a252..33540b99cd29 100644 --- a/src/java/org/apache/cassandra/service/paxos/Paxos.java +++ b/src/java/org/apache/cassandra/service/paxos/Paxos.java @@ -38,7 +38,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.codahale.metrics.Meter; +import org.apache.cassandra.metrics.Meter; import org.apache.cassandra.exceptions.CasWriteTimeoutException; import org.apache.cassandra.exceptions.ExceptionCode; import org.apache.cassandra.gms.FailureDetector; diff --git a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java index 8343b83b071e..9c4cfc266d69 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java +++ b/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java @@ -24,7 +24,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.codahale.metrics.Meter; +import org.apache.cassandra.metrics.Meter; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; diff --git a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java index 4a56e6fe18bd..0c10d5c03ee1 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java +++ b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java @@ -26,7 +26,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.codahale.metrics.Meter; +import org.apache.cassandra.metrics.Meter; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.ReadCommand; diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java index 46b30a927935..79718b110d98 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java +++ b/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java @@ -20,7 +20,7 @@ import java.util.Map; -import com.codahale.metrics.Meter; +import org.apache.cassandra.metrics.Meter; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.ReadCommand; diff --git a/src/java/org/apache/cassandra/tcm/PeerLogFetcher.java b/src/java/org/apache/cassandra/tcm/PeerLogFetcher.java index 3564ab93f70f..402ae62e38e3 100644 --- a/src/java/org/apache/cassandra/tcm/PeerLogFetcher.java +++ b/src/java/org/apache/cassandra/tcm/PeerLogFetcher.java @@ -26,7 +26,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.codahale.metrics.Timer; +import org.apache.cassandra.metrics.Timer; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.metrics.TCMMetrics; @@ -89,7 +89,7 @@ private Future fetchLogEntriesAndWaitInternal(InetAddressAndPor Promise fetchRes = new AsyncPromise<>(); logger.info("Fetching log from {}, at least {}", remote, awaitAtleast); - try (Timer.Context ctx = TCMMetrics.instance.fetchPeerLogLatency.time()) + try (Timer.Context ctx = TCMMetrics.instance.fetchPeerLogLatency.startTime()) { RemoteProcessor.sendWithCallbackAsync(fetchRes, Verb.TCM_FETCH_PEER_LOG_REQ, diff --git a/src/java/org/apache/cassandra/tcm/Processor.java b/src/java/org/apache/cassandra/tcm/Processor.java index fdb4cf23bb4f..dd86f2bdf75b 100644 --- a/src/java/org/apache/cassandra/tcm/Processor.java +++ b/src/java/org/apache/cassandra/tcm/Processor.java @@ -20,7 +20,7 @@ import java.util.concurrent.TimeUnit; -import com.codahale.metrics.Meter; +import org.apache.cassandra.metrics.Meter; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.metrics.TCMMetrics; import org.apache.cassandra.tcm.log.Entry; diff --git a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java index 0ea055b908f9..cd1190c22206 100644 --- a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java +++ b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java @@ -32,7 +32,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.codahale.metrics.Timer; +import org.apache.cassandra.metrics.Timer; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.exceptions.RequestFailureReason; import org.apache.cassandra.gms.FailureDetector; @@ -165,7 +165,7 @@ public static ClusterMetadata fetchLogAndWait(CandidateIterator candidateIterato private static Future fetchLogAndWaitInternal(CandidateIterator candidates, LocalLog log) { - try (Timer.Context ctx = TCMMetrics.instance.fetchCMSLogLatency.time()) + try (Timer.Context ctx = TCMMetrics.instance.fetchCMSLogLatency.startTime()) { Promise remoteRequest = new AsyncPromise<>(); Epoch currentEpoch = log.metadata().epoch; diff --git a/src/java/org/apache/cassandra/tcm/Retry.java b/src/java/org/apache/cassandra/tcm/Retry.java index 3277531444a6..9199614f823d 100644 --- a/src/java/org/apache/cassandra/tcm/Retry.java +++ b/src/java/org/apache/cassandra/tcm/Retry.java @@ -22,7 +22,7 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; -import com.codahale.metrics.Meter; +import org.apache.cassandra.metrics.Meter; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.utils.Clock; diff --git a/src/java/org/apache/cassandra/tcm/sequences/ProgressBarrier.java b/src/java/org/apache/cassandra/tcm/sequences/ProgressBarrier.java index af504d35d362..d2a15439ca0e 100644 --- a/src/java/org/apache/cassandra/tcm/sequences/ProgressBarrier.java +++ b/src/java/org/apache/cassandra/tcm/sequences/ProgressBarrier.java @@ -34,7 +34,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.codahale.metrics.Timer; +import org.apache.cassandra.metrics.Timer; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.dht.Range; @@ -117,7 +117,7 @@ public ProgressBarrier withMessagingService(MessageDelivery messagingService) public boolean await() { - try (Timer.Context ctx = TCMMetrics.instance.progressBarrierLatency.time()) + try (Timer.Context ctx = TCMMetrics.instance.progressBarrierLatency.startTime()) { if (waitFor.is(Epoch.EMPTY)) return true; diff --git a/src/java/org/apache/cassandra/transport/ServerConnection.java b/src/java/org/apache/cassandra/transport/ServerConnection.java index b47d0d9c668e..ed469f27750e 100644 --- a/src/java/org/apache/cassandra/transport/ServerConnection.java +++ b/src/java/org/apache/cassandra/transport/ServerConnection.java @@ -24,10 +24,11 @@ import org.slf4j.LoggerFactory; import io.netty.channel.Channel; -import com.codahale.metrics.Counter; +import org.apache.cassandra.metrics.Counter; import io.netty.handler.ssl.SslHandler; import org.apache.cassandra.auth.IAuthenticator; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.metrics.ThreadLocalCounter; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.QueryState; @@ -38,7 +39,8 @@ public class ServerConnection extends Connection private volatile IAuthenticator.SaslNegotiator saslNegotiator; private final ClientState clientState; private volatile ConnectionStage stage; - public final Counter requests = new Counter(); + // TODO: clarify is it really a metric? + public final Counter requests = new ThreadLocalCounter(); ServerConnection(Channel channel, ProtocolVersion version, Connection.Tracker tracker) { diff --git a/src/java/org/apache/cassandra/utils/ReflectionUtils.java b/src/java/org/apache/cassandra/utils/ReflectionUtils.java index 289b89beb24b..aae5bb86d8cf 100644 --- a/src/java/org/apache/cassandra/utils/ReflectionUtils.java +++ b/src/java/org/apache/cassandra/utils/ReflectionUtils.java @@ -105,4 +105,36 @@ public static void clearMapField(Class clazz, Object instance, String throw new RuntimeException(String.format("Could not clear map field %s in class %s", mapName, clazz), ex); } } + + public static void setFieldToNull(Object object, String fieldName) + { + Class clazz = object.getClass(); + try + { + boolean set = false; + do + { + try + { + Field field = clazz.getDeclaredField(fieldName); + field.setAccessible(true); + field.set(object, null); + set = true; + } + catch (NoSuchFieldException e) + { + // NOTHING TO DO + } + clazz = clazz.getSuperclass(); + + } + while (clazz != Object.class); + if (!set) + throw new RuntimeException(String.format("Could not find field %s in %s", fieldName, object.getClass())); + } + catch (IllegalAccessException e) + { + throw new RuntimeException(String.format("Could not set field %s in %s", fieldName, object.getClass()), e); + } + } } diff --git a/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java b/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java index 646c90e04d32..5b2e31e8401c 100644 --- a/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java +++ b/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java @@ -23,7 +23,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.codahale.metrics.Timer; +import org.apache.cassandra.metrics.Timer; import org.apache.cassandra.utils.concurrent.OpOrder; import org.apache.cassandra.utils.concurrent.WaitQueue; diff --git a/src/java/org/apache/cassandra/utils/memory/MemtablePool.java b/src/java/org/apache/cassandra/utils/memory/MemtablePool.java index f77ea2fab110..ad429b00b385 100644 --- a/src/java/org/apache/cassandra/utils/memory/MemtablePool.java +++ b/src/java/org/apache/cassandra/utils/memory/MemtablePool.java @@ -26,7 +26,7 @@ import com.google.common.base.Preconditions; import com.codahale.metrics.Gauge; -import com.codahale.metrics.Timer; +import org.apache.cassandra.metrics.Timer; import org.apache.cassandra.metrics.CassandraMetricsRegistry; import org.apache.cassandra.metrics.DefaultNameFactory; import org.apache.cassandra.metrics.MetricNameFactory; @@ -253,7 +253,7 @@ public WaitQueue hasRoom() public Timer.Context blockedTimerContext() { - return blockedOnAllocating.time(); + return blockedOnAllocating.startTime(); } } diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java index cf4b6cdd7a1a..891824e9d40e 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java @@ -114,6 +114,7 @@ import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.metrics.CassandraMetricsRegistry; import org.apache.cassandra.metrics.Sampler; +import org.apache.cassandra.metrics.ThreadLocalMetrics; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.net.NoPayload; @@ -955,6 +956,7 @@ public Future shutdown(boolean runOnExitThreads, boolean shutdownMessaging () -> shutdownAndWait(Collections.singletonList(ActiveRepairService.repairCommandExecutor())), () -> ActiveRepairService.instance().shutdownNowAndWait(1L, MINUTES), () -> EpochAwareDebounce.instance.close(), + () -> ThreadLocalMetrics.shutdownCleaner(1L, MINUTES), SnapshotManager.instance::close ); diff --git a/test/distributed/org/apache/cassandra/distributed/impl/InstanceMetrics.java b/test/distributed/org/apache/cassandra/distributed/impl/InstanceMetrics.java index 7bedce183fa6..f381c0062717 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceMetrics.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceMetrics.java @@ -26,8 +26,8 @@ import com.codahale.metrics.Counting; import com.codahale.metrics.Gauge; -import com.codahale.metrics.Histogram; -import com.codahale.metrics.Meter; +import org.apache.cassandra.metrics.Histogram; +import org.apache.cassandra.metrics.Meter; import com.codahale.metrics.Metric; import com.codahale.metrics.Snapshot; import com.codahale.metrics.Timer; @@ -78,7 +78,7 @@ public Map getCounters(Predicate filter) @Override public double getHistogram(String name, MetricValue value) { - Histogram histogram = metricsRegistry.getHistograms().get(name); + Histogram histogram = (Histogram) metricsRegistry.getMetrics().get(name); return getValue(histogram, value); } @@ -86,10 +86,11 @@ public double getHistogram(String name, MetricValue value) public Map getHistograms(Predicate filter, MetricValue value) { Map values = new HashMap<>(); - for (Map.Entry e : metricsRegistry.getHistograms().entrySet()) + for (Map.Entry e : metricsRegistry.getMetrics().entrySet()) { - if (filter.test(e.getKey())) - values.put(e.getKey(), getValue(e.getValue(), value)); + Metric metric = e.getValue(); + if (metric instanceof Histogram && filter.test(e.getKey())) + values.put(e.getKey(), getValue((Histogram) metric, value)); } return values; } @@ -115,17 +116,17 @@ public Map getGauges(Predicate filter) @Override public double getMeter(String name, Rate value) { - return getRate(metricsRegistry.getMeters().get(name), value); + return getRate((Meter) metricsRegistry.getMetrics().get(name), value); } @Override public Map getMeters(Predicate filter, Rate rate) { Map values = new HashMap<>(); - for (Map.Entry e : metricsRegistry.getMeters().entrySet()) + for (Map.Entry e : metricsRegistry.getMetrics().entrySet()) { - if (filter.test(e.getKey())) - values.put(e.getKey(), getRate(e.getValue(), rate)); + if (e.getValue() instanceof Meter && filter.test(e.getKey())) + values.put(e.getKey(), getRate((Meter) e.getValue(), rate)); } return values; } diff --git a/test/distributed/org/apache/cassandra/distributed/test/auth/MutualTlsCertificateValidityPeriodTest.java b/test/distributed/org/apache/cassandra/distributed/test/auth/MutualTlsCertificateValidityPeriodTest.java index b2247b5b4b76..e49e7cb9bf6b 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/auth/MutualTlsCertificateValidityPeriodTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/auth/MutualTlsCertificateValidityPeriodTest.java @@ -35,7 +35,7 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; -import com.codahale.metrics.Histogram; +import org.apache.cassandra.metrics.Histogram; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; diff --git a/test/microbench/org/apache/cassandra/test/microbench/MetersBench.java b/test/microbench/org/apache/cassandra/test/microbench/MetersBench.java new file mode 100644 index 000000000000..70081b8e0774 --- /dev/null +++ b/test/microbench/org/apache/cassandra/test/microbench/MetersBench.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.test.microbench; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLongArray; + +import org.apache.cassandra.metrics.Meter; +import org.apache.cassandra.metrics.ThreadLocalMeter; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +@Warmup(iterations = 4, time = 10, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 8, time = 10, timeUnit = TimeUnit.SECONDS) +@Fork(value = 2, +jvmArgsAppend = { "-Djmh.executor=CUSTOM", "-Djmh.executor.class=org.apache.cassandra.test.microbench.FastThreadExecutor"}) +@Threads(4) +@State(Scope.Benchmark) +public class MetersBench +{ + @Param({ "ThreadLocal", "Dropwizard"}) + private String type; + + @Param({"10"}) + private int metricsCount; + + @Param({"true"}) + private boolean polluteCpuCaches; + + private List meters; + + @Setup(Level.Trial) + public void setup() throws Throwable + { + meters = new ArrayList<>(metricsCount); + for (int i = 0; i < metricsCount; i++) + { + Meter meter; + switch (type) + { + case "ThreadLocal": + meter = new ThreadLocalMeter(); + break; + case "Dropwizard": + meter = new DropwizardMeter(); + break; + default: + throw new UnsupportedOperationException(); + } + meters.add(meter); + } + } + + private final AtomicLongArray anotherMemory = new AtomicLongArray(256 * 1024); + + @Setup(Level.Invocation) + public void polluteCpuCaches() { + if (polluteCpuCaches) + for (int i = 0; i < anotherMemory.length(); i++) + anotherMemory.incrementAndGet(i); + } + + @Benchmark + public void mark() { + for (Meter meter : meters) + meter.mark(); + } + + private static class DropwizardMeter implements Meter { + private final com.codahale.metrics.Meter meter = new com.codahale.metrics.Meter(); + + @Override + public void mark() + { + meter.mark(); + } + + @Override + public void mark(long n) + { + meter.mark(n); + } + + @Override + public long getCount() + { + return meter.getCount(); + } + + @Override + public double getFifteenMinuteRate() + { + return meter.getFifteenMinuteRate(); + } + + @Override + public double getFiveMinuteRate() + { + return meter.getFiveMinuteRate(); + } + + @Override + public double getMeanRate() + { + return meter.getMeanRate(); + } + + @Override + public double getOneMinuteRate() + { + return meter.getOneMinuteRate(); + } + } +} diff --git a/test/microbench/org/apache/cassandra/test/microbench/ThreadLocalMetricsBench.java b/test/microbench/org/apache/cassandra/test/microbench/ThreadLocalMetricsBench.java new file mode 100644 index 000000000000..f03662863ca6 --- /dev/null +++ b/test/microbench/org/apache/cassandra/test/microbench/ThreadLocalMetricsBench.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.test.microbench; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLongArray; +import java.util.concurrent.atomic.LongAdder; + +import org.apache.cassandra.metrics.Counter; +import org.apache.cassandra.metrics.ThreadLocalCounter; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +@Warmup(iterations = 4, time = 1, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 8, time = 2, timeUnit = TimeUnit.SECONDS) +@Fork(value = 2, + jvmArgsAppend = { "-Djmh.executor=CUSTOM", "-Djmh.executor.class=org.apache.cassandra.test.microbench.FastThreadExecutor"}) +@Threads(4) +@State(Scope.Benchmark) +public class ThreadLocalMetricsBench +{ + @Param({"LongAdder", "PlainArray"}) + private String type; + + @Param({"50", "100"}) + private int metricsCount; + + private List counters; + + + @Setup(Level.Trial) + public void setup() throws Throwable + { + counters = new ArrayList<>(metricsCount); + for (int i = 0; i < metricsCount; i++) + { + Counter counter; + switch (type) + { + case "LongAdder": + counter = new LongAdderCounter(); + break; + case "PlainArray": + counter = new ThreadLocalCounter(); + break; + default: + throw new UnsupportedOperationException(); + } + counters.add(counter); + } + } + + private final AtomicLongArray anotherMemory = new AtomicLongArray(256 * 1024); + + @Setup(Level.Invocation) + public void polluteCpuCaches() { + for (int i = 0; i < anotherMemory.length(); i++) + anotherMemory.incrementAndGet(i); + } + + @Benchmark + public void increment() { + for (Counter counter : counters) + counter.inc(); + } + + public static class LongAdderCounter implements Counter + { + private final LongAdder counter = new LongAdder(); + @Override + public void inc() + { + counter.increment(); + } + + @Override + public void inc(long n) + { + counter.add(n); + } + + @Override + public void dec() + { + counter.decrement(); + } + + @Override + public void dec(long n) + { + counter.add(-n); + } + + @Override + public long getCount() + { + return counter.sum(); + } + } +} diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyMetricTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyMetricTest.java index 75e311e13a6f..e9bc2f4f0f65 100644 --- a/test/unit/org/apache/cassandra/db/ColumnFamilyMetricTest.java +++ b/test/unit/org/apache/cassandra/db/ColumnFamilyMetricTest.java @@ -24,10 +24,7 @@ import org.junit.BeforeClass; import org.junit.Test; -import com.codahale.metrics.Counter; import com.codahale.metrics.Gauge; -import com.codahale.metrics.Histogram; -import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistryListener; import com.codahale.metrics.Timer; import org.apache.cassandra.SchemaLoader; @@ -251,8 +248,9 @@ public void onGaugeRemoved(String name) } + // TODO: clarify how the test is affected @Override - public void onCounterAdded(String name, Counter counter) + public void onCounterAdded(String name, com.codahale.metrics.Counter counter) { counter.getCount(); } @@ -263,8 +261,9 @@ public void onCounterRemoved(String name) } + // TODO: clarify how the test is affected but switching to thread local metrics @Override - public void onHistogramAdded(String name, Histogram histogram) + public void onHistogramAdded(String name, com.codahale.metrics.Histogram histogram) { histogram.getCount(); } @@ -275,8 +274,9 @@ public void onHistogramRemoved(String name) } + // TODO: clarify how the test is affected but switching to thread local metrics @Override - public void onMeterAdded(String name, Meter meter) + public void onMeterAdded(String name, com.codahale.metrics.Meter meter) { meter.getCount(); } diff --git a/test/unit/org/apache/cassandra/db/virtual/BatchMetricsTableTest.java b/test/unit/org/apache/cassandra/db/virtual/BatchMetricsTableTest.java index ed187324bfb2..74d73b9e9bd3 100644 --- a/test/unit/org/apache/cassandra/db/virtual/BatchMetricsTableTest.java +++ b/test/unit/org/apache/cassandra/db/virtual/BatchMetricsTableTest.java @@ -24,7 +24,7 @@ import org.junit.Before; import org.junit.Test; -import com.codahale.metrics.Histogram; +import org.apache.cassandra.metrics.Histogram; import com.codahale.metrics.Snapshot; import com.datastax.driver.core.ResultSet; import org.apache.cassandra.cql3.CQLTester; diff --git a/test/unit/org/apache/cassandra/metrics/CassandraMetricsRegistryTest.java b/test/unit/org/apache/cassandra/metrics/CassandraMetricsRegistryTest.java index b069d0330fd7..e32b2d604cca 100644 --- a/test/unit/org/apache/cassandra/metrics/CassandraMetricsRegistryTest.java +++ b/test/unit/org/apache/cassandra/metrics/CassandraMetricsRegistryTest.java @@ -35,9 +35,7 @@ import org.junit.Before; import org.junit.Test; -import com.codahale.metrics.Meter; import com.codahale.metrics.Metric; -import com.codahale.metrics.Timer; import com.codahale.metrics.jvm.BufferPoolMetricSet; import com.codahale.metrics.jvm.GarbageCollectorMetricSet; import com.codahale.metrics.jvm.MemoryUsageGaugeSet; @@ -148,7 +146,7 @@ public void testDeltaHistogramSizeChange() public void testTimer() { long[] offsets = EstimatedHistogram.newOffsets(DecayingEstimatedHistogramReservoir.LOW_BUCKET_COUNT, false); - Timer timer = new Timer(CassandraMetricsRegistry.createReservoir(TimeUnit.MICROSECONDS)); + Timer timer = new ThreadLocalTimer(CassandraMetricsRegistry.createReservoir(TimeUnit.MICROSECONDS)); timer.update(42, TimeUnit.NANOSECONDS); timer.update(100, TimeUnit.NANOSECONDS); timer.update(42, TimeUnit.MICROSECONDS); diff --git a/test/unit/org/apache/cassandra/metrics/ClientMetricsTest.java b/test/unit/org/apache/cassandra/metrics/ClientMetricsTest.java index 0f1cd26f5051..0607521723b7 100644 --- a/test/unit/org/apache/cassandra/metrics/ClientMetricsTest.java +++ b/test/unit/org/apache/cassandra/metrics/ClientMetricsTest.java @@ -24,7 +24,6 @@ import org.junit.Test; import com.codahale.metrics.Gauge; -import com.codahale.metrics.Meter; import com.datastax.driver.core.Cluster; import com.datastax.driver.core.HostDistance; import com.datastax.driver.core.PoolingOptions; diff --git a/test/unit/org/apache/cassandra/metrics/ClientRequestMetricsTest.java b/test/unit/org/apache/cassandra/metrics/ClientRequestMetricsTest.java index 88060ffd2717..46334f3222f3 100644 --- a/test/unit/org/apache/cassandra/metrics/ClientRequestMetricsTest.java +++ b/test/unit/org/apache/cassandra/metrics/ClientRequestMetricsTest.java @@ -24,7 +24,6 @@ import org.junit.BeforeClass; import org.junit.Test; -import com.codahale.metrics.Histogram; import com.datastax.driver.core.BatchStatement; import com.datastax.driver.core.BoundStatement; import com.datastax.driver.core.Cluster; diff --git a/test/unit/org/apache/cassandra/metrics/ClientRequestSizeMetricsTest.java b/test/unit/org/apache/cassandra/metrics/ClientRequestSizeMetricsTest.java index f19fca526be7..dc2432ac4bc8 100644 --- a/test/unit/org/apache/cassandra/metrics/ClientRequestSizeMetricsTest.java +++ b/test/unit/org/apache/cassandra/metrics/ClientRequestSizeMetricsTest.java @@ -26,8 +26,6 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import com.codahale.metrics.Counter; -import com.codahale.metrics.Histogram; import com.codahale.metrics.Snapshot; import com.datastax.driver.core.QueryOptions; import org.apache.cassandra.cql3.CQLTester; diff --git a/test/unit/org/apache/cassandra/metrics/JmxVirtualTableMetricsTest.java b/test/unit/org/apache/cassandra/metrics/JmxVirtualTableMetricsTest.java index 6f8c8b982261..e7eb5da75ad8 100644 --- a/test/unit/org/apache/cassandra/metrics/JmxVirtualTableMetricsTest.java +++ b/test/unit/org/apache/cassandra/metrics/JmxVirtualTableMetricsTest.java @@ -32,7 +32,6 @@ import org.junit.Test; import com.codahale.metrics.Metric; -import com.codahale.metrics.MetricRegistry; import org.apache.cassandra.cql3.CQLTester; import org.apache.cassandra.db.virtual.CollectionVirtualTableAdapter; @@ -63,13 +62,12 @@ public static void setup() throws Exception public void beforeTest() { metricToNameMap.clear(); - MetricRegistry registry = new MetricRegistry(); - metricToNameMap.put(MetricType.METER, registry.meter("meter")); - metricToNameMap.put(MetricType.COUNTER, registry.counter("counter")); - metricToNameMap.put(MetricType.HISTOGRAM, registry.histogram("histogram")); - metricToNameMap.put(MetricType.TIMER, registry.timer("timer")); - metricToNameMap.put(MetricType.GAUGE, registry.gauge("gauge", () -> gaugeValue::get)); + metricToNameMap.put(MetricType.METER, CassandraMetricsRegistry.Metrics.meter("meter")); + metricToNameMap.put(MetricType.COUNTER, CassandraMetricsRegistry.Metrics.counter("counter")); + metricToNameMap.put(MetricType.HISTOGRAM, CassandraMetricsRegistry.Metrics.histogram("histogram")); + metricToNameMap.put(MetricType.TIMER, CassandraMetricsRegistry.Metrics.timer("timer")); + metricToNameMap.put(MetricType.GAUGE, CassandraMetricsRegistry.Metrics.gauge("gauge", () -> gaugeValue::get)); CassandraMetricsRegistry.metricGroups.forEach(group -> { MetricNameFactory factory = new DefaultNameFactory(group, "jmx.virtual"); diff --git a/test/unit/org/apache/cassandra/metrics/ThreadLocalCounterTest.java b/test/unit/org/apache/cassandra/metrics/ThreadLocalCounterTest.java new file mode 100644 index 000000000000..e94f67564d80 --- /dev/null +++ b/test/unit/org/apache/cassandra/metrics/ThreadLocalCounterTest.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.metrics; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.concurrent.LocalAwareExecutorPlus; + +import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory; +import static org.junit.Assert.assertEquals; + +public class ThreadLocalCounterTest +{ + private static final Logger LOGGER = LoggerFactory.getLogger(ThreadLocalCounterTest.class); + + @Test + public void testLifecycleAndMultipleInstancesCreation() throws InterruptedException + { + final List> metricsPerIteration = new ArrayList<>(); + int METRICS_COUNT = 50; + int ITERATIONS_COUNT = 50; + long TASKS_COUNT = 100_000; + int THREADS = 10; + boolean DESTROY_COUNTERS_AT_THE_END_OF_ITERATION = true; + + for (int iteration = 0; iteration < ITERATIONS_COUNT; iteration++) + { + { + final List metrics = new ArrayList<>(); + for (int i = 0; i < METRICS_COUNT; i++) + metrics.add(new ThreadLocalCounter()); + metricsPerIteration.add(metrics); + } + + // note: these threads are FastThreadLocalThread instances + LocalAwareExecutorPlus executor = executorFactory() + .localAware() + .pooled("executor-" + iteration, THREADS); + + for (int i = 0; i < TASKS_COUNT; i++) + { + executor.submit(() -> { + for (List metricSet : metricsPerIteration) + for (Counter metric : metricSet) + metric.inc(); + }); + } + boolean allIncremented = false; + while (!allIncremented) + { + allIncremented = true; + for (int metricSetId = 0; metricSetId < metricsPerIteration.size(); metricSetId++) + for (Counter metric : metricsPerIteration.get(metricSetId)) + allIncremented &= TASKS_COUNT * (metricsPerIteration.size() - metricSetId) == metric.getCount(); + } + assertEquals(THREADS, ThreadLocalMetrics.getThreadLocalMetricsObjectsCount()); + executor.shutdown(); + executor.awaitTermination(30, TimeUnit.SECONDS); + for (int metricSetId = 0; metricSetId < metricsPerIteration.size(); metricSetId++) + for (Counter metric : metricsPerIteration.get(metricSetId)) + assertEquals(TASKS_COUNT * (metricsPerIteration.size() - metricSetId), metric.getCount()); + + if (DESTROY_COUNTERS_AT_THE_END_OF_ITERATION) + { + metricsPerIteration.clear(); + } + + LOGGER.info("id generator state: {}, free IDs: {}", + ThreadLocalMetrics.idGenerator.get(), + ThreadLocalMetrics.freeMetricIdSet); + LOGGER.info("iteration completed: {} / {}", iteration + 1, ITERATIONS_COUNT); + } + } + + @Test + public void testBasicOperations() + { + Counter counter = new ThreadLocalCounter(); + counter.inc(); + assertEquals(1, counter.getCount()); + counter.inc(15); + assertEquals(1 + 15, counter.getCount()); + counter.dec(10); + assertEquals(1 + 15 - 10, counter.getCount()); + counter.dec(); + assertEquals(1 + 15 - 10 - 1, counter.getCount()); + } +} diff --git a/test/unit/org/apache/cassandra/metrics/ThreadLocalHistogramTest.java b/test/unit/org/apache/cassandra/metrics/ThreadLocalHistogramTest.java new file mode 100644 index 000000000000..cf171587275c --- /dev/null +++ b/test/unit/org/apache/cassandra/metrics/ThreadLocalHistogramTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.metrics; + +import org.junit.Assert; +import org.junit.Test; + +public class ThreadLocalHistogramTest +{ + @Test + public void testBasicOperations() + { + Histogram histogram = new ThreadLocalHistogram(new DecayingEstimatedHistogramReservoir()); + histogram.update(10); + Assert.assertEquals(1, histogram.getCount()); + histogram.update(20); + Assert.assertEquals(2, histogram.getCount()); + histogram.update(100); + Assert.assertEquals(3, histogram.getCount()); + } + + @Test + public void testReset() + { + ClearableHistogram histogram = new ClearableHistogram(new DecayingEstimatedHistogramReservoir()); + histogram.update(1); + histogram.update(1); + histogram.update(1); + Assert.assertEquals(3, histogram.getCount()); + histogram.reset(); + Assert.assertEquals(0, histogram.getCount()); + } +} diff --git a/test/unit/org/apache/cassandra/metrics/ThreadLocalMeterTest.java b/test/unit/org/apache/cassandra/metrics/ThreadLocalMeterTest.java new file mode 100644 index 000000000000..96ace60ce4c2 --- /dev/null +++ b/test/unit/org/apache/cassandra/metrics/ThreadLocalMeterTest.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.metrics; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +import org.junit.Assert; +import org.junit.Test; + +import com.codahale.metrics.Clock; +import org.apache.cassandra.Util; +import org.apache.cassandra.utils.MonotonicClock; +import org.apache.cassandra.utils.MonotonicClockTranslation; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.offset; +import static org.junit.Assert.assertEquals; + +public class ThreadLocalMeterTest +{ + @Test + public void checkNoMark() + { + DeterministicClock clock = new DeterministicClock(0); + ThreadLocalMeter meter = new ThreadLocalMeter(clock); + com.codahale.metrics.Meter codahaleMeter = new com.codahale.metrics.Meter(clock); + clock.setTime(TimeUnit.SECONDS.toNanos(10)); + ThreadLocalMeter.tickAll(); + assertMeter(meter, codahaleMeter); + } + + @Test + public void constantRate() + { + DeterministicClock clock = new DeterministicClock(0); + ThreadLocalMeter meter = new ThreadLocalMeter(clock); + com.codahale.metrics.Meter codahaleMeter = new com.codahale.metrics.Meter(clock); + + long seconds = TimeUnit.MINUTES.toSeconds(15); + for (long i = 0; i < seconds; i++) + { + ThreadLocalMeter.tickAll(); + meter.mark(); + codahaleMeter.mark(); + clock.setTime(TimeUnit.SECONDS.toNanos(i + 1)); + } + + assertMeter(meter, codahaleMeter); + } + + @Test + public void marksEventsAndUpdatesRatesAndCount() + { + DeterministicClock clock = new DeterministicClock(0); + ThreadLocalMeter meter = new ThreadLocalMeter(clock); + com.codahale.metrics.Meter codahaleMeter = new com.codahale.metrics.Meter(clock); + + clock.setTime(TimeUnit.SECONDS.toNanos(10)); + ThreadLocalMeter.tickAll(); + meter.mark(); + meter.mark(2); + codahaleMeter.mark(); + codahaleMeter.mark(2); + + assertMeter(meter, codahaleMeter); + } + + @Test + public void pseudoRandomRateSimulation() + { + Random random = new Random(1); + DeterministicClock clock = new DeterministicClock(0); + ThreadLocalMeter meter = new ThreadLocalMeter(clock); + com.codahale.metrics.Meter codahaleMeter = new com.codahale.metrics.Meter(clock); + + int rounds = 1000; + for (int i = 0; i < rounds; i++) + { + long n = random.nextInt(); + ThreadLocalMeter.tickAll(); + meter.mark(n); + codahaleMeter.mark(n); + clock.setTime(clock.now() + random.nextInt()); + } + assertMeter(meter, codahaleMeter); + } + + @Test // CASSANDRA-19332 + public void testMaxTicks() + { + DeterministicClock clock = new DeterministicClock(0); + ThreadLocalMeter threadLocalMeter = new ThreadLocalMeter(clock); + clock.setTime(Long.MAX_VALUE); + threadLocalMeter.mark(Long.MAX_VALUE); + ThreadLocalMeter.tickAll(); + final long secondNanos = TimeUnit.SECONDS.toNanos(1); + assertEquals(threadLocalMeter.getOneMinuteRate(), Double.MIN_NORMAL * secondNanos, 0.0); + assertEquals(threadLocalMeter.getFiveMinuteRate(), Double.MIN_NORMAL * secondNanos, 0.0); + assertEquals(threadLocalMeter.getFifteenMinuteRate(), Double.MIN_NORMAL * secondNanos, 0.0); + } + + @Test + public void testAllocationAndDestroy() + { + DeterministicClock clock = new DeterministicClock(0); + Random random = new Random(42); + List meters = new ArrayList<>(); + for (int i = 0; i < 10_000; i++) + { + boolean create = random.nextBoolean(); + if (create) + { + MeterPair pair = new MeterPair(); + pair.meter = new ThreadLocalMeter(clock); + pair.codahaleMeter = new com.codahale.metrics.Meter(clock); + meters.add(pair); + } + else if (!meters.isEmpty()) + { + int meterToRemove = random.nextInt(meters.size()); + meters.remove(meterToRemove); + } + ThreadLocalMeter.tickAll(); + for (MeterPair meterPair : meters) + { + meterPair.meter.mark(); + meterPair.codahaleMeter.mark(); + assertMeter(meterPair.meter, meterPair.codahaleMeter); + } + // note: Random.nextLong(long) is not available in Java 11 + clock.setTime(clock.now() + random.nextInt((int)TimeUnit.SECONDS.toNanos(10))); + } + + int NUMBER_OF_COUNTERS_PER_METER = 2; + Util.spinAssertEquals(NUMBER_OF_COUNTERS_PER_METER * meters.size(), () -> { + System.gc(); // to trigger PhantomReferences queuing and recycle unused Meter thread local counters + return ThreadLocalMetrics.getAllocatedMetricsCount(); + }, 20); + + ThreadLocalMeter.tickAll(); // to force recycling of empty weak references + Assert.assertEquals(meters.size(), ThreadLocalMeter.getTickingMetersCount()); + Assert.assertEquals(1, ThreadLocalMetrics.getThreadLocalMetricsObjectsCount()); + + } + + private static class MeterPair + { + ThreadLocalMeter meter; + com.codahale.metrics.Meter codahaleMeter; + + } + + private static void assertMeter(ThreadLocalMeter checkingMeter, com.codahale.metrics.Meter standardMeter) + { + assertThat(checkingMeter.getCount()).isEqualTo(standardMeter.getCount()); + assertThat(checkingMeter.getMeanRate()).isEqualTo(standardMeter.getMeanRate(), offset(0.001)); + assertThat(checkingMeter.getOneMinuteRate()).isEqualTo(standardMeter.getOneMinuteRate(), offset(0.001)); + assertThat(checkingMeter.getFiveMinuteRate()).isEqualTo(standardMeter.getFiveMinuteRate(), offset(0.001)); + assertThat(checkingMeter.getFifteenMinuteRate()).isEqualTo(standardMeter.getFifteenMinuteRate(), offset(0.001)); + } + + private static class DeterministicClock extends Clock implements MonotonicClock + { + private volatile long tickNs; + + public DeterministicClock(long initialTime) + { + tickNs = initialTime; + } + + public void setTime(long tickNs) + { + this.tickNs = tickNs; + } + + @Override + public long getTick() + { + return tickNs; + } + + @Override + public long now() + { + return tickNs; + } + + @Override + public long error() + { + return 0; + } + + @Override + public MonotonicClockTranslation translate() + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isAfter(long instant) + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isAfter(long now, long instant) + { + throw new UnsupportedOperationException(); + } + } +} diff --git a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java index 95c72f4bc0b3..2475fb07eb1e 100644 --- a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java +++ b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java @@ -46,7 +46,7 @@ import org.junit.BeforeClass; import org.junit.Test; -import com.codahale.metrics.Timer; +import org.apache.cassandra.metrics.Timer; import org.apache.cassandra.auth.IInternodeAuthenticator; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions; diff --git a/test/unit/org/apache/cassandra/test/asserts/ExtendedAssertions.java b/test/unit/org/apache/cassandra/test/asserts/ExtendedAssertions.java index fc6040e1b99f..8e1c8caf5783 100644 --- a/test/unit/org/apache/cassandra/test/asserts/ExtendedAssertions.java +++ b/test/unit/org/apache/cassandra/test/asserts/ExtendedAssertions.java @@ -18,9 +18,9 @@ package org.apache.cassandra.test.asserts; -import com.codahale.metrics.Counter; +import org.apache.cassandra.metrics.Counter; import com.codahale.metrics.Counting; -import com.codahale.metrics.Histogram; +import org.apache.cassandra.metrics.Histogram; import com.codahale.metrics.Snapshot; import org.assertj.core.api.AbstractObjectAssert; diff --git a/test/unit/org/apache/cassandra/tools/OfflineToolUtils.java b/test/unit/org/apache/cassandra/tools/OfflineToolUtils.java index 9a5ae8e8b88d..195eb7cf0d91 100644 --- a/test/unit/org/apache/cassandra/tools/OfflineToolUtils.java +++ b/test/unit/org/apache/cassandra/tools/OfflineToolUtils.java @@ -78,6 +78,7 @@ public abstract class OfflineToolUtils // and may still be active when we check "Attach Listener", // spawned in intellij IDEA "JNA Cleaner", // spawned by JNA + "ThreadLocalMetrics-Cleaner", }; static final String[] NON_DEFAULT_MEMTABLE_THREADS = diff --git a/test/unit/org/apache/cassandra/transport/RateLimitingTest.java b/test/unit/org/apache/cassandra/transport/RateLimitingTest.java index 1e8e16a662ac..459bf8f3e27a 100644 --- a/test/unit/org/apache/cassandra/transport/RateLimitingTest.java +++ b/test/unit/org/apache/cassandra/transport/RateLimitingTest.java @@ -26,7 +26,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; -import com.codahale.metrics.Meter; +import org.apache.cassandra.metrics.Meter; import com.google.common.base.Ticker; import org.awaitility.Awaitility; @@ -294,7 +294,12 @@ private void testThrowOnOverload(int payloadSize, SimpleClient client) protected static Meter getRequestDispatchedMeter() { String metricName = "org.apache.cassandra.metrics.Client.RequestDispatched"; - Map metrics = CassandraMetricsRegistry.Metrics.getMeters((name, metric) -> name.equals(metricName)); + Map metrics = CassandraMetricsRegistry.Metrics.getMetrics() + .entrySet() + .stream() + .filter((entry) -> entry.getKey().equals(metricName) && entry.getValue() instanceof Meter) + .collect(Collectors.toMap(Map.Entry::getKey, element -> (Meter) element.getValue())); + if (metrics.size() != 1) fail(String.format("Expected a single registered metric for request dispatched, found %s",metrics.size())); return metrics.get(metricName);