Skip to content

Commit 13d60ce

Browse files
committed
Use a virtual threads friendly pool with Jackson
1 parent 35fcd1d commit 13d60ce

File tree

3 files changed

+344
-1
lines changed

3 files changed

+344
-1
lines changed
Lines changed: 243 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,243 @@
1+
package io.vertx.core.json.jackson;
2+
3+
import java.lang.invoke.MethodHandle;
4+
import java.lang.invoke.MethodHandles;
5+
import java.lang.invoke.MethodType;
6+
import java.util.concurrent.atomic.AtomicReferenceArray;
7+
import java.util.function.Predicate;
8+
import java.util.stream.IntStream;
9+
10+
import com.fasterxml.jackson.core.util.BufferRecycler;
11+
import com.fasterxml.jackson.core.util.JsonRecyclerPools;
12+
import com.fasterxml.jackson.core.util.RecyclerPool;
13+
14+
/**
15+
* This is a custom implementation of the Jackson's {@link RecyclerPool} intended to work equally well with both
16+
* platform and virtual threads. This pool works regardless of the version of the JVM in use and internally uses
17+
* 2 distinct pools one for platform threads (which is exactly the same {@link ThreadLocal} based one provided
18+
* by Jackson out of the box) and the other designed for being virtual threads friendly. It switches between
19+
* the 2 only depending on the nature of thread (virtual or not) requiring the acquisition of a pooled resource,
20+
* obtained via {@link MethodHandle} to guarantee compatibility also with old JVM versions. The pool also guarantees
21+
* that the pooled resource is always released to the same internal pool from where it has been acquired, regardless
22+
* if the releasing thread is different from the one that originally made the acquisition.
23+
* <p>
24+
* The virtual thread friendly inner pool is implemented with N striped linked lists using a simple lock free
25+
* algorithm based on CAS. The striping is performed shuffling the id of the thread requiring to acquire a pooled
26+
* resource with a xorshift based computation. The resulting of this computation is also stored in the pooled resource,
27+
* bringing the twofold advantage of always releasing the resource in the same bucket from where it has been taken
28+
* regardless if the releasing thread is different from the one that did the acquisition and avoiding the need of
29+
* recalculating the position of that bucket also during the release. The heads of the linked lists are hold in an
30+
* {@link AtomicReferenceArray} where each head has a distance of 16 positions from the adjacent ones to prevent
31+
* the false sharing problem.
32+
*/
33+
public class HybridJacksonPool implements RecyclerPool<BufferRecycler> {
34+
35+
private static final HybridJacksonPool INSTANCE = new HybridJacksonPool();
36+
37+
private static final Predicate<Thread> isVirtual = VirtualPredicate.findIsVirtualPredicate();
38+
39+
private final RecyclerPool<BufferRecycler> nativePool = JsonRecyclerPools.threadLocalPool();
40+
41+
private static class VirtualPoolHolder {
42+
// Lazy on-demand initialization
43+
private static final StripedLockFreePool virtualPool = new StripedLockFreePool(Runtime.getRuntime().availableProcessors());
44+
}
45+
46+
private HybridJacksonPool() {
47+
// prevent external instantiation
48+
}
49+
50+
public static HybridJacksonPool getInstance() {
51+
return INSTANCE;
52+
}
53+
54+
@Override
55+
public BufferRecycler acquirePooled() {
56+
return isVirtual.test(Thread.currentThread()) ?
57+
VirtualPoolHolder.virtualPool.acquirePooled() :
58+
nativePool.acquirePooled();
59+
}
60+
61+
@Override
62+
public void releasePooled(BufferRecycler bufferRecycler) {
63+
if (bufferRecycler instanceof VThreadBufferRecycler) {
64+
// if it is a PooledBufferRecycler it has been acquired by a virtual thread, so it has to be release to the same pool
65+
VirtualPoolHolder.virtualPool.releasePooled(bufferRecycler);
66+
}
67+
// the native thread pool is based on ThreadLocal, so it doesn't have anything to do on release
68+
}
69+
70+
static class StripedLockFreePool implements RecyclerPool<BufferRecycler> {
71+
72+
private static final int CACHE_LINE_SHIFT = 4;
73+
74+
private static final int CACHE_LINE_PADDING = 1 << CACHE_LINE_SHIFT;
75+
76+
private final XorShiftThreadProbe threadProbe;
77+
78+
private final AtomicReferenceArray<Node> topStacks;
79+
80+
private final int stripesCount;
81+
82+
public StripedLockFreePool(int stripesCount) {
83+
if (stripesCount <= 0) {
84+
throw new IllegalArgumentException("Expecting a stripesCount that is larger than 0");
85+
}
86+
87+
this.stripesCount = stripesCount;
88+
int size = roundToPowerOfTwo(stripesCount);
89+
this.topStacks = new AtomicReferenceArray<>(size * CACHE_LINE_PADDING);
90+
91+
int mask = (size - 1) << CACHE_LINE_SHIFT;
92+
this.threadProbe = new XorShiftThreadProbe(mask);
93+
}
94+
95+
public int size() {
96+
return stackSizes().sum();
97+
}
98+
99+
public int[] stackStats() {
100+
return stackSizes().toArray();
101+
}
102+
103+
private IntStream stackSizes() {
104+
return IntStream.range(0, stripesCount).map(i -> {
105+
Node node = topStacks.get(i * CACHE_LINE_PADDING);
106+
return node == null ? 0 : node.level;
107+
});
108+
}
109+
110+
@Override
111+
public BufferRecycler acquirePooled() {
112+
int index = threadProbe.index();
113+
114+
Node currentHead = topStacks.get(index);
115+
while (true) {
116+
if (currentHead == null) {
117+
return new VThreadBufferRecycler(index);
118+
}
119+
120+
if (topStacks.compareAndSet(index, currentHead, currentHead.next)) {
121+
currentHead.next = null;
122+
return currentHead.value;
123+
} else {
124+
currentHead = topStacks.get(index);
125+
}
126+
}
127+
}
128+
129+
@Override
130+
public void releasePooled(BufferRecycler recycler) {
131+
VThreadBufferRecycler vThreadBufferRecycler = (VThreadBufferRecycler) recycler;
132+
Node newHead = new Node(vThreadBufferRecycler);
133+
134+
Node next = topStacks.get(vThreadBufferRecycler.slot);
135+
while (true) {
136+
newHead.level = next == null ? 1 : next.level + 1;
137+
if (topStacks.compareAndSet(vThreadBufferRecycler.slot, next, newHead)) {
138+
newHead.next = next;
139+
return;
140+
} else {
141+
next = topStacks.get(vThreadBufferRecycler.slot);
142+
}
143+
}
144+
}
145+
146+
private static class Node {
147+
final VThreadBufferRecycler value;
148+
Node next;
149+
int level = 0;
150+
151+
Node(VThreadBufferRecycler value) {
152+
this.value = value;
153+
}
154+
}
155+
}
156+
157+
private static class VThreadBufferRecycler extends BufferRecycler {
158+
private final int slot;
159+
160+
VThreadBufferRecycler(int slot) {
161+
this.slot = slot;
162+
}
163+
}
164+
165+
private static class VirtualPredicate {
166+
private static final MethodHandle virtualMh = findVirtualMH();
167+
168+
private static MethodHandle findVirtualMH() {
169+
try {
170+
return MethodHandles.publicLookup().findVirtual(Thread.class, "isVirtual",
171+
MethodType.methodType(boolean.class));
172+
} catch (Exception e) {
173+
return null;
174+
}
175+
}
176+
177+
private static Predicate<Thread> findIsVirtualPredicate() {
178+
if (virtualMh != null) {
179+
return new Predicate<Thread>() {
180+
@Override
181+
public boolean test(Thread thread) {
182+
try {
183+
return (boolean) virtualMh.invokeExact(thread);
184+
} catch (Throwable e) {
185+
throw new RuntimeException(e);
186+
}
187+
}
188+
};
189+
}
190+
191+
return new Predicate<Thread>() {
192+
@Override
193+
public boolean test(Thread thread) {
194+
return false;
195+
}
196+
};
197+
}
198+
}
199+
200+
/**
201+
* This class is used to hash the thread requiring a pooled resource using a multiplicative
202+
* Fibonacci hashing implementation. The resulting hash is then used to calculate the
203+
* index of the bucket in the pool from where the pooled resource has to be retrieved.
204+
*/
205+
private static class XorShiftThreadProbe {
206+
207+
private final int mask;
208+
209+
XorShiftThreadProbe(int mask) {
210+
this.mask = mask;
211+
}
212+
213+
public int index() {
214+
return probe() & mask;
215+
}
216+
217+
private int probe() {
218+
// Multiplicative Fibonacci hashing implementation
219+
// 0x9e3779b9 is the integral part of the Golden Ratio's fractional part 0.61803398875… (sqrt(5)-1)/2
220+
// multiplied by 2^32, which has the best possible scattering properties.
221+
int probe = (int) ((Thread.currentThread().getId() * 0x9e3779b9) & Integer.MAX_VALUE);
222+
// xorshift
223+
probe ^= probe << 13;
224+
probe ^= probe >>> 17;
225+
probe ^= probe << 5;
226+
return probe;
227+
}
228+
}
229+
230+
private static final int MAX_POW2 = 1 << 30;
231+
232+
private static int roundToPowerOfTwo(final int value) {
233+
if (value > MAX_POW2) {
234+
throw new IllegalArgumentException(
235+
"There is no larger power of 2 int for value:" + value + " since it exceeds 2^31.");
236+
}
237+
if (value < 0) {
238+
throw new IllegalArgumentException("Given value:" + value + ". Expecting value >= 0.");
239+
}
240+
final int nextPow2 = 1 << (32 - Integer.numberOfLeadingZeros(value - 1));
241+
return nextPow2;
242+
}
243+
}

src/main/java/io/vertx/core/json/jackson/JacksonCodec.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@
5353
*/
5454
public class JacksonCodec implements JsonCodec {
5555

56-
private static final JsonFactory factory = new JsonFactory();
56+
private static final JsonFactory factory = JsonFactory.builder().recyclerPool(HybridJacksonPool.getInstance()).build();
5757

5858
static {
5959
// Non-standard JSON but we allow C style comments in our JSON
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
package io.vertx.core.json.jackson;
2+
3+
import java.lang.invoke.MethodHandle;
4+
import java.lang.invoke.MethodHandles;
5+
import java.lang.invoke.MethodType;
6+
import java.util.concurrent.CountDownLatch;
7+
8+
import com.fasterxml.jackson.core.util.BufferRecycler;
9+
import org.junit.Assume;
10+
import org.junit.Test;
11+
12+
import static org.junit.Assert.assertEquals;
13+
import static org.junit.Assert.assertSame;
14+
import static org.junit.Assert.assertTrue;
15+
16+
public class HybridJacksonPoolTest {
17+
18+
@Test
19+
public void testVirtualThreadPoolWithSingleThread() {
20+
HybridJacksonPool.StripedLockFreePool virtualPool = new HybridJacksonPool.StripedLockFreePool(4);
21+
BufferRecycler pooledResource = virtualPool.acquirePooled();
22+
assertEquals(0, virtualPool.size());
23+
virtualPool.releasePooled(pooledResource);
24+
assertEquals(1, virtualPool.size());
25+
26+
// The same thread should get the same pooled resource
27+
assertSame(pooledResource, virtualPool.acquirePooled());
28+
assertEquals(0, virtualPool.size());
29+
}
30+
31+
@Test
32+
public void testVirtualThreadPoolWithMultipleThreads() {
33+
// this test can run only on a jdk version that supports virtual threads
34+
Assume.assumeTrue(VirtualThreadRunner.hasVirtualThread());
35+
36+
int stripesCount = 4;
37+
HybridJacksonPool.StripedLockFreePool virtualPool = new HybridJacksonPool.StripedLockFreePool(stripesCount);
38+
int nThreads = 100;
39+
BufferRecycler[] resources = new BufferRecycler[nThreads];
40+
CountDownLatch latch = new CountDownLatch(nThreads);
41+
42+
for (int i = 0; i < nThreads; i++) {
43+
int threadIndex = i;
44+
VirtualThreadRunner.runOnVirtualThread(() -> {
45+
resources[threadIndex] = virtualPool.acquirePooled();
46+
latch.countDown();
47+
});
48+
}
49+
50+
try {
51+
latch.await();
52+
} catch (InterruptedException e) {
53+
throw new RuntimeException(e);
54+
}
55+
assertEquals(0, virtualPool.size());
56+
57+
for (int i = 0; i < nThreads; i++) {
58+
virtualPool.releasePooled(resources[i]);
59+
}
60+
61+
// check that all resources have been released back to the pool
62+
assertEquals(nThreads, virtualPool.size());
63+
64+
int avgResourcesNrPerStripe = nThreads / stripesCount;
65+
int minResourcesNrPerStripe = avgResourcesNrPerStripe / 2;
66+
int maxResourcesNrPerStripe = avgResourcesNrPerStripe * 2;
67+
68+
// check that all the stripes in the pool are reasonably balanced
69+
int[] poolStats = virtualPool.stackStats();
70+
for (int i = 0; i < stripesCount; i++) {
71+
assertTrue(poolStats[i] >= minResourcesNrPerStripe);
72+
assertTrue(poolStats[i] <= maxResourcesNrPerStripe);
73+
}
74+
}
75+
76+
private static class VirtualThreadRunner {
77+
static final MethodHandle virtualMh = findVirtualMH();
78+
79+
static MethodHandle findVirtualMH() {
80+
try {
81+
return MethodHandles.publicLookup().findStatic(Thread.class, "startVirtualThread",
82+
MethodType.methodType(Thread.class, Runnable.class));
83+
} catch (Exception e) {
84+
return null;
85+
}
86+
}
87+
88+
static boolean hasVirtualThread() {
89+
return virtualMh != null;
90+
}
91+
92+
static void runOnVirtualThread(Runnable runnable) {
93+
try {
94+
VirtualThreadRunner.virtualMh.invoke(runnable);
95+
} catch (Throwable e) {
96+
throw new RuntimeException(e);
97+
}
98+
}
99+
}
100+
}

0 commit comments

Comments
 (0)