
Commit fff2950

Support --only-ticks-over with async-profiler (#470)
1 parent 7222735 commit fff2950
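
For context, the flag this commit wires up is the one passed when starting a profile; a hedged example of the invocation (the exact subcommand syntax varies between spark versions, and the 100 ms threshold is just an illustrative value):

    /spark profiler start --only-ticks-over 100

Previously this flag forced spark onto the built-in Java sampler; with this change the async-profiler engine can be used, and only samples recorded during ticks longer than the threshold are kept.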

8 files changed: 338 additions & 20 deletions

spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java

Lines changed: 15 additions & 13 deletions
@@ -112,22 +112,21 @@ public Sampler start(SparkPlatform platform) throws UnsupportedOperationException
             throw new IllegalArgumentException("samplingInterval = " + this.samplingInterval);
         }
 
-        boolean canUseAsyncProfiler = AsyncProfilerAccess.getInstance(platform).checkSupported(platform);
+        AsyncProfilerAccess asyncProfiler = AsyncProfilerAccess.getInstance(platform);
+
         boolean onlyTicksOverMode = this.ticksOver != -1 && this.tickHook != null;
+        boolean canUseAsyncProfiler = asyncProfiler.checkSupported(platform) && (!onlyTicksOverMode || platform.getTickReporter() != null);
 
         if (this.mode == SamplerMode.ALLOCATION) {
-            if (!canUseAsyncProfiler || !AsyncProfilerAccess.getInstance(platform).checkAllocationProfilingSupported(platform)) {
+            if (!canUseAsyncProfiler || !asyncProfiler.checkAllocationProfilingSupported(platform)) {
                 throw new UnsupportedOperationException("Allocation profiling is not supported on your system. Check the console for more info.");
             }
             if (this.ignoreSleeping) {
                 platform.getPlugin().log(Level.WARNING, "Ignoring sleeping threads is not supported in allocation profiling mode. Sleeping threads will be included in the results.");
             }
-            if (onlyTicksOverMode) {
-                platform.getPlugin().log(Level.WARNING, "'Only-ticks-over' is not supported in allocation profiling mode.");
-            }
         }
 
-        if (onlyTicksOverMode || this.forceJavaSampler) {
+        if (this.forceJavaSampler) {
             canUseAsyncProfiler = false;
         }
 
@@ -139,14 +138,17 @@ public Sampler start(SparkPlatform platform) throws UnsupportedOperationException
         SamplerSettings settings = new SamplerSettings(interval, this.threadDumper, this.threadGrouper.get(), this.autoEndTime, this.background, this.ignoreSleeping);
 
         Sampler sampler;
-        if (this.mode == SamplerMode.ALLOCATION) {
-            sampler = new AsyncSampler(platform, settings, new SampleCollector.Allocation(interval, this.allocLiveOnly));
-        } else if (canUseAsyncProfiler) {
-            sampler = new AsyncSampler(platform, settings, new SampleCollector.Execution(interval));
-        } else if (onlyTicksOverMode) {
-            sampler = new JavaSampler(platform, settings, this.tickHook, this.ticksOver);
+        if (canUseAsyncProfiler) {
+            SampleCollector<?> collector = this.mode == SamplerMode.ALLOCATION
+                    ? new SampleCollector.Allocation(interval, this.allocLiveOnly)
+                    : new SampleCollector.Execution(interval);
+            sampler = onlyTicksOverMode
+                    ? new AsyncSampler(platform, settings, collector, this.ticksOver)
+                    : new AsyncSampler(platform, settings, collector);
         } else {
-            sampler = new JavaSampler(platform, settings);
+            sampler = onlyTicksOverMode
+                    ? new JavaSampler(platform, settings, this.tickHook, this.ticksOver)
+                    : new JavaSampler(platform, settings);
         }
 
         sampler.start();

spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java

Lines changed: 5 additions & 1 deletion
@@ -29,7 +29,7 @@
 /**
  * Data aggregator for {@link AsyncSampler}.
  */
-public class AsyncDataAggregator extends AbstractDataAggregator {
+public class AsyncDataAggregator extends AbstractDataAggregator implements AutoCloseable {
 
     /** A describer for async-profiler stack trace elements. */
     private static final StackTraceNode.Describer<AsyncStackTraceElement> STACK_TRACE_DESCRIBER = (element, parent) ->
@@ -79,4 +79,8 @@ private static boolean isSleeping(ProfileSegment element) {
         return false;
     }
 
+    @Override
+    public void close() {
+
+    }
 }

spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerJob.java

Lines changed: 7 additions & 1 deletion
@@ -84,6 +84,8 @@ static AsyncProfilerJob createNew(AsyncProfilerAccess access, AsyncProfiler prof
     private int window;
     /** If the profiler should run in quiet mode */
     private boolean quiet;
+    /** If the profiler needs to use the same clock as {@link System#nanoTime()} */
+    private boolean forceNanoTime;
 
     /** The file used by async-profiler to output data */
     private Path outputFile;
@@ -117,12 +119,13 @@ private void checkActive() {
     }
 
     // Initialise the job
-    public void init(SparkPlatform platform, SampleCollector<?> collector, ThreadDumper threadDumper, int window, boolean quiet) {
+    public void init(SparkPlatform platform, SampleCollector<?> collector, ThreadDumper threadDumper, int window, boolean quiet, boolean forceNanoTime) {
         this.platform = platform;
         this.sampleCollector = collector;
         this.threadDumper = threadDumper;
         this.window = window;
         this.quiet = quiet;
+        this.forceNanoTime = forceNanoTime;
     }
 
     /**
@@ -151,6 +154,9 @@ public void start() {
         if (this.threadDumper instanceof ThreadDumper.Specific) {
             command.add("filter");
         }
+        if (this.forceNanoTime) {
+            command.add("clock=monotonic");
+        }
 
         // start the profiler
         String resp = execute(command.build()).trim();

spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java

Lines changed: 17 additions & 3 deletions
@@ -27,6 +27,7 @@
 import me.lucko.spark.common.sampler.SamplerSettings;
 import me.lucko.spark.common.sampler.SamplerType;
 import me.lucko.spark.common.sampler.window.ProfilingWindowUtils;
+import me.lucko.spark.common.sampler.window.WindowStatisticsCollector;
 import me.lucko.spark.common.tick.TickHook;
 import me.lucko.spark.common.util.SparkThreadFactory;
 import me.lucko.spark.common.ws.ViewerSocket;
@@ -53,6 +54,9 @@ public class AsyncSampler extends AbstractSampler {
     /** Responsible for aggregating and then outputting collected sampling data */
     private final AsyncDataAggregator dataAggregator;
 
+    /** Whether to force the sampler to use monotonic/nano time */
+    private final boolean forceNanoTime;
+
     /** Mutex for the current profiler job */
     private final Object[] currentJobMutex = new Object[0];
 
@@ -66,10 +70,19 @@ public class AsyncSampler extends AbstractSampler {
     private ScheduledFuture<?> socketStatisticsTask;
 
     public AsyncSampler(SparkPlatform platform, SamplerSettings settings, SampleCollector<?> collector) {
+        this(platform, settings, collector, new AsyncDataAggregator(settings.threadGrouper(), settings.ignoreSleeping()), false);
+    }
+
+    public AsyncSampler(SparkPlatform platform, SamplerSettings settings, SampleCollector<?> collector, int tickLengthThreshold) {
+        this(platform, settings, collector, new TickedAsyncDataAggregator(settings.threadGrouper(), settings.ignoreSleeping(), platform.getTickReporter(), tickLengthThreshold), true);
+    }
+
+    private AsyncSampler(SparkPlatform platform, SamplerSettings settings, SampleCollector<?> collector, AsyncDataAggregator dataAggregator, boolean forceNanoTime) {
         super(platform, settings);
         this.sampleCollector = collector;
+        this.dataAggregator = dataAggregator;
+        this.forceNanoTime = forceNanoTime;
         this.profilerAccess = AsyncProfilerAccess.getInstance(platform);
-        this.dataAggregator = new AsyncDataAggregator(settings.threadGrouper(), settings.ignoreSleeping());
         this.scheduler = Executors.newSingleThreadScheduledExecutor(
                 new ThreadFactoryBuilder()
                         .setNameFormat("spark-async-sampler-worker-thread")
@@ -93,7 +106,7 @@ public void start() {
         int window = ProfilingWindowUtils.windowNow();
 
         AsyncProfilerJob job = this.profilerAccess.startNewProfilerJob();
-        job.init(this.platform, this.sampleCollector, this.threadDumper, window, this.background);
+        job.init(this.platform, this.sampleCollector, this.threadDumper, window, this.background, this.forceNanoTime);
         job.start();
         this.windowStatisticsCollector.recordWindowStartTime(window);
         this.currentJob = job;
@@ -131,7 +144,7 @@ private void rotateProfilerJob() {
         // start a new job
         int window = previousJob.getWindow() + 1;
         AsyncProfilerJob newJob = this.profilerAccess.startNewProfilerJob();
-        newJob.init(this.platform, this.sampleCollector, this.threadDumper, window, this.background);
+        newJob.init(this.platform, this.sampleCollector, this.threadDumper, window, this.background, this.forceNanoTime);
         newJob.start();
         this.windowStatisticsCollector.recordWindowStartTime(window);
         this.currentJob = newJob;
@@ -204,6 +217,7 @@ public void stop(boolean cancelled) {
             this.scheduler.shutdown();
             this.scheduler = null;
         }
+        this.dataAggregator.close();
     }
 
     @Override
spark-common/src/main/java/me/lucko/spark/common/sampler/async/ExceedingTicksFilter.java

Lines changed: 97 additions & 0 deletions

@@ -0,0 +1,97 @@
/*
 * This file is part of spark.
 *
 * Copyright (c) lucko (Luck) <[email protected]>
 * Copyright (c) contributors
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

package me.lucko.spark.common.sampler.async;

import me.lucko.spark.common.tick.TickReporter;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongSupplier;

class ExceedingTicksFilter implements TickReporter.Callback {

    /** The ticks that exceeded the threshold, cleared one-by-one when inserting data */
    private final Queue<ExceededTick> ticksOver = new ConcurrentLinkedQueue<>();

    /** Counts the number of ticks aggregated */
    private final AtomicInteger tickCounter = new AtomicInteger();

    /** Tick durations under this threshold will not be inserted, measured in milliseconds */
    private final int tickLengthThreshold;

    /** The source to get the current nano time from */
    private final LongSupplier nanoTimeSource;

    ExceedingTicksFilter(int tickLengthThreshold, LongSupplier nanoTimeSource) {
        this.tickLengthThreshold = tickLengthThreshold;
        this.nanoTimeSource = nanoTimeSource;
    }

    public ExceedingTicksFilter(int tickLengthThreshold) {
        this(tickLengthThreshold, System::nanoTime);
    }

    @Override
    public void onTick(double duration) {
        if (duration > this.tickLengthThreshold) {
            long end = this.nanoTimeSource.getAsLong();
            long start = (long) (end - (duration * 1_000_000)); // ms to ns
            this.ticksOver.add(new ExceededTick(start, end));
            this.tickCounter.getAndIncrement();
        }
    }

    public int exceedingTicksCount() {
        return this.tickCounter.get();
    }

    public boolean duringExceedingTick(long time) {
        while (true) {
            ExceededTick earliestExceeding = this.ticksOver.peek();
            if (earliestExceeding == null) {
                // no tick over threshold anymore
                return false;
            } else if (time - earliestExceeding.start < 0) {
                // segment happened before current exceeding
                return false;
            } else if (earliestExceeding.end - time < 0) {
                // segment happened after current exceeding,
                // but it might fall into the next one
                this.ticksOver.remove();
            } else {
                // segment falls exactly into exceeding, record it
                return true;
            }
        }
    }

    private static final class ExceededTick {
        // times are in nanoseconds from System.nanoTime()
        private final long start;
        private final long end;

        ExceededTick(long start, long end) {
            this.start = start;
            this.end = end;
        }
    }
}
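
The queue-draining comparison in duringExceedingTick(..) is easiest to see with a worked example. Below is a minimal sketch, not part of the commit, that drives the filter with a fake clock through the package-private constructor (so it assumes same-package access); all values are hypothetical:

    // a fake nano-time source we can control from the test
    long[] fakeNow = {0L};
    ExceedingTicksFilter filter = new ExceedingTicksFilter(50, () -> fakeNow[0]);

    fakeNow[0] = 120_000_000L;   // pretend a tick just ended at t = 120ms
    filter.onTick(100.0);        // 100ms > 50ms threshold -> records the window [20ms, 120ms]

    boolean inside = filter.duringExceedingTick(60_000_000L);  // true: 60ms lies within [20ms, 120ms]
    boolean before = filter.duringExceedingTick(10_000_000L);  // false: 10ms precedes the recorded tick
    int count = filter.exceedingTicksCount();                  // 1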

spark-common/src/main/java/me/lucko/spark/common/sampler/async/ProfileSegment.java

Lines changed: 9 additions & 2 deletions
@@ -45,13 +45,16 @@ public class ProfileSegment {
     private final long value;
     /** The state of the thread. {@value #UNKNOWN_THREAD_STATE} if state is unknown */
     private final String threadState;
+    /** The time at which this segment was recorded, as if it was produced by {@link System#nanoTime()} */
+    private final long time;
 
-    private ProfileSegment(int nativeThreadId, String threadName, AsyncStackTraceElement[] stackTrace, long value, String threadState) {
+    private ProfileSegment(int nativeThreadId, String threadName, AsyncStackTraceElement[] stackTrace, long value, String threadState, long time) {
         this.nativeThreadId = nativeThreadId;
         this.threadName = threadName;
         this.stackTrace = stackTrace;
         this.value = value;
         this.threadState = threadState;
+        this.time = time;
     }
 
     public int getNativeThreadId() {
@@ -74,6 +77,10 @@ public String getThreadState() {
         return this.threadState;
     }
 
+    public long getTime() {
+        return this.time;
+    }
+
     public static ProfileSegment parseSegment(JfrReader reader, JfrReader.Event sample, String threadName, long value) {
         JfrReader.StackTrace stackTrace = reader.stackTraces.get(sample.stackTraceId);
         int len = stackTrace != null ? stackTrace.methods.length : 0;
@@ -90,7 +97,7 @@ public static ProfileSegment parseSegment(JfrReader reader, JfrReader.Event sample, String threadName, long value) {
             threadState = threadStateLookup.getOrDefault(executionSample.threadState, UNKNOWN_THREAD_STATE);
         }
 
-        return new ProfileSegment(sample.tid, threadName, stack, value, threadState);
+        return new ProfileSegment(sample.tid, threadName, stack, value, threadState, sample.time);
     }
 
     private static AsyncStackTraceElement parseStackFrame(JfrReader reader, long methodId) {
spark-common/src/main/java/me/lucko/spark/common/sampler/async/TickedAsyncDataAggregator.java

Lines changed: 74 additions & 0 deletions

@@ -0,0 +1,74 @@
/*
 * This file is part of spark.
 *
 * Copyright (c) lucko (Luck) <[email protected]>
 * Copyright (c) contributors
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

package me.lucko.spark.common.sampler.async;

import me.lucko.spark.common.sampler.ThreadGrouper;
import me.lucko.spark.common.tick.TickReporter;
import me.lucko.spark.proto.SparkSamplerProtos;

import java.util.concurrent.TimeUnit;

public class TickedAsyncDataAggregator extends AsyncDataAggregator {

    /** The callback called when this aggregator is closed, to clean up resources */
    private final Runnable closeCallback;

    /** Tick durations under this threshold will not be inserted, measured in milliseconds */
    private final long tickLengthThreshold;

    private final ExceedingTicksFilter filter;

    protected TickedAsyncDataAggregator(ThreadGrouper threadGrouper, boolean ignoreSleeping, TickReporter tickReporter, int tickLengthThreshold) {
        super(threadGrouper, ignoreSleeping);
        this.tickLengthThreshold = TimeUnit.MILLISECONDS.toMicros(tickLengthThreshold);
        this.filter = new ExceedingTicksFilter(tickLengthThreshold);
        tickReporter.addCallback(this.filter);
        this.closeCallback = () -> tickReporter.removeCallback(this.filter);
    }

    @Override
    public void insertData(ProfileSegment element, int window) {
        // with async-profiler clock=monotonic, the event time uses the same clock
        // as System.nanoTime(), so we can compare it directly
        long time = element.getTime();
        if (!this.filter.duringExceedingTick(time)) {
            return;
        }
        super.insertData(element, window);
    }

    @Override
    public SparkSamplerProtos.SamplerMetadata.DataAggregator getMetadata() {
        return SparkSamplerProtos.SamplerMetadata.DataAggregator.newBuilder()
                .setType(SparkSamplerProtos.SamplerMetadata.DataAggregator.Type.TICKED)
                .setThreadGrouper(this.threadGrouper.asProto())
                .setTickLengthThreshold(this.tickLengthThreshold)
                .setNumberOfIncludedTicks(this.filter.exceedingTicksCount())
                .build();

    }

    @Override
    public void close() {
        this.closeCallback.run();
    }

}
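
Putting the pieces together, the flow established by the diffs above can be summarised as an illustrative sketch (using the execution collector as an example; the real builder code also handles allocation profiling):

    // SamplerBuilder.start(..): when --only-ticks-over is set and async-profiler is usable,
    // the new ticked AsyncSampler constructor is chosen
    Sampler sampler = new AsyncSampler(platform, settings, new SampleCollector.Execution(interval), this.ticksOver);

    // AsyncSampler builds a TickedAsyncDataAggregator, which registers an
    // ExceedingTicksFilter with platform.getTickReporter() and makes AsyncProfilerJob
    // start async-profiler with clock=monotonic, so ProfileSegment.getTime() values are
    // comparable to the System.nanoTime() timestamps the filter records.
    sampler.start();

    // TickedAsyncDataAggregator.insertData(..) then drops every ProfileSegment whose
    // timestamp does not fall inside a tick longer than the threshold; stop(..) closes
    // the aggregator, which unregisters the filter from the tick reporter.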
