Skip to content

Commit 8896a70

Browse files
committed
Upgrade metrics library and remove depricated metrics
Patch by tjake; reviewed by aleksey for CASSANDRA-5657
1 parent bbb1592 commit 8896a70

File tree

82 files changed

+1532
-1746
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

82 files changed

+1532
-1746
lines changed

CHANGES.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
3.0
2+
* Upgrade Metrics library and remove depricated metrics (CASSANDRA-5657)
23
* Serializing Row cache alternative, fully off heap (CASSANDRA-7438)
34
* Duplicate rows returned when in clause has repeated values (CASSANDRA-6707)
45
* Make CassandraException unchecked, extend RuntimeException (CASSANDRA-8560)

build.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -368,7 +368,7 @@
368368

369369
<dependency groupId="org.apache.cassandra" artifactId="cassandra-all" version="${version}" />
370370
<dependency groupId="org.apache.cassandra" artifactId="cassandra-thrift" version="${version}" />
371-
<dependency groupId="com.yammer.metrics" artifactId="metrics-core" version="2.2.0" />
371+
<dependency groupId="io.dropwizard.metrics" artifactId="metrics-core" version="3.1.0" />
372372
<dependency groupId="com.addthis.metrics" artifactId="reporter-config" version="2.1.0" />
373373
<dependency groupId="org.mindrot" artifactId="jbcrypt" version="0.3m" />
374374
<dependency groupId="io.airlift" artifactId="airline" version="0.6" />
@@ -483,7 +483,7 @@
483483
<dependency groupId="com.boundary" artifactId="high-scale-lib"/>
484484
<dependency groupId="org.yaml" artifactId="snakeyaml"/>
485485
<dependency groupId="org.mindrot" artifactId="jbcrypt"/>
486-
<dependency groupId="com.yammer.metrics" artifactId="metrics-core"/>
486+
<dependency groupId="io.dropwizard.metrics" artifactId="metrics-core"/>
487487
<dependency groupId="com.addthis.metrics" artifactId="reporter-config"/>
488488
<dependency groupId="com.thinkaurelius.thrift" artifactId="thrift-server" version="0.3.5"/>
489489
<dependency groupId="com.clearspring.analytics" artifactId="stream" version="2.5.2" />

lib/metrics-core-2.2.0.jar

-80.2 KB
Binary file not shown.

lib/metrics-core-3.1.0.jar

109 KB
Binary file not shown.

src/java/org/apache/cassandra/concurrent/JMXEnabledScheduledThreadPoolExecutor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,12 +107,12 @@ public long getPendingTasks()
107107

108108
public int getTotalBlockedTasks()
109109
{
110-
return (int) metrics.totalBlocked.count();
110+
return (int) metrics.totalBlocked.getCount();
111111
}
112112

113113
public int getCurrentlyBlockedTasks()
114114
{
115-
return (int) metrics.currentBlocked.count();
115+
return (int) metrics.currentBlocked.getCount();
116116
}
117117

118118
public int getCoreThreads()

src/java/org/apache/cassandra/concurrent/JMXEnabledSharedExecutorPool.java

Lines changed: 0 additions & 113 deletions
This file was deleted.

src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java

Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
public class JMXEnabledThreadPoolExecutor extends DebuggableThreadPoolExecutor implements JMXEnabledThreadPoolExecutorMBean
3737
{
3838
private final String mbeanName;
39-
private final ThreadPoolMetrics metrics;
39+
public final ThreadPoolMetrics metrics;
4040

4141
public JMXEnabledThreadPoolExecutor(String threadPoolName)
4242
{
@@ -132,30 +132,17 @@ public synchronized List<Runnable> shutdownNow()
132132
return super.shutdownNow();
133133
}
134134

135-
/**
136-
* Get the number of completed tasks
137-
*/
138-
public long getCompletedTasks()
139-
{
140-
return getCompletedTaskCount();
141-
}
142135

143-
/**
144-
* Get the number of tasks waiting to be executed
145-
*/
146-
public long getPendingTasks()
147-
{
148-
return getTaskCount() - getCompletedTaskCount();
149-
}
136+
150137

151138
public int getTotalBlockedTasks()
152139
{
153-
return (int) metrics.totalBlocked.count();
140+
return (int) metrics.totalBlocked.getCount();
154141
}
155142

156143
public int getCurrentlyBlockedTasks()
157144
{
158-
return (int) metrics.currentBlocked.count();
145+
return (int) metrics.currentBlocked.getCount();
159146
}
160147

161148
public int getCoreThreads()

src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutorMBean.java

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -17,24 +17,9 @@
1717
*/
1818
package org.apache.cassandra.concurrent;
1919

20-
/**
21-
* @see org.apache.cassandra.metrics.ThreadPoolMetrics
22-
*/
23-
@Deprecated
24-
public interface JMXEnabledThreadPoolExecutorMBean extends IExecutorMBean
25-
{
26-
/**
27-
* Get the number of tasks that had blocked before being accepted (or
28-
* rejected).
29-
*/
30-
public int getTotalBlockedTasks();
31-
32-
/**
33-
* Get the number of tasks currently blocked, waiting to be accepted by
34-
* the executor (because all threads are busy and the backing queue is full).
35-
*/
36-
public int getCurrentlyBlockedTasks();
3720

21+
public interface JMXEnabledThreadPoolExecutorMBean
22+
{
3823
/**
3924
* Returns core pool size of thread pool.
4025
*/

src/java/org/apache/cassandra/concurrent/SEPExecutor.java

Lines changed: 11 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.concurrent.atomic.AtomicInteger;
2525
import java.util.concurrent.atomic.AtomicLong;
2626

27+
import org.apache.cassandra.metrics.SEPMetrics;
2728
import org.apache.cassandra.utils.concurrent.SimpleCondition;
2829
import org.apache.cassandra.utils.concurrent.WaitQueue;
2930

@@ -35,6 +36,7 @@ public class SEPExecutor extends AbstractTracingAwareExecutorService
3536

3637
public final int maxWorkers;
3738
private final int maxTasksQueued;
39+
private final SEPMetrics metrics;
3840

3941
// stores both a set of work permits and task permits:
4042
// bottom 32 bits are number of queued tasks, in the range [0..maxTasksQueued] (initially 0)
@@ -43,8 +45,6 @@ public class SEPExecutor extends AbstractTracingAwareExecutorService
4345

4446
// producers wait on this when there is no room on the queue
4547
private final WaitQueue hasRoom = new WaitQueue();
46-
private final AtomicLong totalBlocked = new AtomicLong();
47-
private final AtomicInteger currentlyBlocked = new AtomicInteger();
4848
private final AtomicLong completedTasks = new AtomicLong();
4949

5050
volatile boolean shuttingDown = false;
@@ -53,12 +53,13 @@ public class SEPExecutor extends AbstractTracingAwareExecutorService
5353
// TODO: see if other queue implementations might improve throughput
5454
protected final ConcurrentLinkedQueue<FutureTask<?>> tasks = new ConcurrentLinkedQueue<>();
5555

56-
SEPExecutor(SharedExecutorPool pool, int maxWorkers, int maxTasksQueued)
56+
SEPExecutor(SharedExecutorPool pool, int maxWorkers, int maxTasksQueued, String jmxPath, String name)
5757
{
5858
this.pool = pool;
5959
this.maxWorkers = maxWorkers;
6060
this.maxTasksQueued = maxTasksQueued;
6161
this.permits.set(combine(0, maxWorkers));
62+
this.metrics = new SEPMetrics(this, jmxPath, name);
6263
}
6364

6465
protected void onCompletion()
@@ -116,10 +117,11 @@ else if (taskPermits >= maxTasksQueued)
116117
// if we're blocking, we might as well directly schedule a worker if we aren't already at max
117118
if (takeWorkPermit(true))
118119
pool.schedule(new Work(this));
119-
totalBlocked.incrementAndGet();
120-
currentlyBlocked.incrementAndGet();
120+
121+
metrics.totalBlocked.inc();
122+
metrics.currentBlocked.inc();
121123
s.awaitUninterruptibly();
122-
currentlyBlocked.decrementAndGet();
124+
metrics.currentBlocked.dec();
123125
}
124126
else // don't propagate our signal when we cancel, just cancel
125127
s.cancel();
@@ -207,6 +209,9 @@ public synchronized void shutdown()
207209
pool.executors.remove(this);
208210
if (getActiveCount() == 0)
209211
shutdown.signalAll();
212+
213+
// release metrics
214+
metrics.release();
210215
}
211216

212217
public synchronized List<Runnable> shutdownNow()
@@ -249,21 +254,6 @@ public int getActiveCount()
249254
return maxWorkers - workPermits(permits.get());
250255
}
251256

252-
public int getTotalBlockedTasks()
253-
{
254-
return (int) totalBlocked.get();
255-
}
256-
257-
public int getMaximumThreads()
258-
{
259-
return maxWorkers;
260-
}
261-
262-
public int getCurrentlyBlockedTasks()
263-
{
264-
return currentlyBlocked.get();
265-
}
266-
267257
private static int taskPermits(long both)
268258
{
269259
return (int) both;

src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@
5454
public class SharedExecutorPool
5555
{
5656

57+
public static final SharedExecutorPool SHARED = new SharedExecutorPool("SharedPool");
58+
5759
// the name assigned to workers in the pool, and the id suffix
5860
final String poolName;
5961
final AtomicLong workerId = new AtomicLong();
@@ -100,4 +102,11 @@ void maybeStartSpinningWorker()
100102
if (current == 0 && spinningCount.compareAndSet(0, 1))
101103
schedule(Work.SPINNING);
102104
}
105+
106+
public TracingAwareExecutorService newExecutor(int maxConcurrency, int maxQueuedTasks, String jmxPath, String name)
107+
{
108+
SEPExecutor executor = new SEPExecutor(this, maxConcurrency, maxQueuedTasks, jmxPath, name);
109+
executors.add(executor);
110+
return executor;
111+
}
103112
}

src/java/org/apache/cassandra/concurrent/Stage.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,11 @@
1717
*/
1818
package org.apache.cassandra.concurrent;
1919

20+
import java.util.Arrays;
21+
22+
import com.google.common.base.Predicate;
23+
import com.google.common.collect.Iterables;
24+
2025
public enum Stage
2126
{
2227
READ,
@@ -31,6 +36,17 @@ public enum Stage
3136
INTERNAL_RESPONSE,
3237
READ_REPAIR;
3338

39+
public static Iterable<Stage> jmxEnabledStages()
40+
{
41+
return Iterables.filter(Arrays.asList(values()), new Predicate<Stage>()
42+
{
43+
public boolean apply(Stage stage)
44+
{
45+
return stage != TRACING;
46+
}
47+
});
48+
}
49+
3450
public String getJmxType()
3551
{
3652
switch (this)

src/java/org/apache/cassandra/concurrent/StageManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ private static JMXEnabledThreadPoolExecutor multiThreadedStage(Stage stage, int
8989

9090
private static TracingAwareExecutorService multiThreadedLowSignalStage(Stage stage, int numThreads)
9191
{
92-
return JMXEnabledSharedExecutorPool.SHARED.newExecutor(numThreads, Integer.MAX_VALUE, stage.getJmxName(), stage.getJmxType());
92+
return SharedExecutorPool.SHARED.newExecutor(numThreads, Integer.MAX_VALUE, stage.getJmxType(), stage.getJmxName());
9393
}
9494

9595
/**

0 commit comments

Comments
 (0)