Skip to content

Commit b08da36

Browse files
committed
Change the eager reference counting of compression dictionaries to lazy
patch by Stefan Miklosovic; reviewed by Yifan Cai, Jyothsna Konisa for CASSANDRA-21074
1 parent f0ae1af commit b08da36

File tree

6 files changed

+75
-20
lines changed

6 files changed

+75
-20
lines changed

CHANGES.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
5.1
2+
* Change the eager reference counting of compression dictionaries to lazy (CASSANDRA-21074)
23
* Add cursor based optimized compaction path (CASSANDRA-20918)
34
* Ensure peers with LEFT status are expired from gossip state (CASSANDRA-21035)
45
* Optimize UTF8Validator.validate for ASCII prefixed Strings (CASSANDRA-21075)

src/java/org/apache/cassandra/db/compression/CompressionDictionary.java

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,16 @@
3636
import org.apache.cassandra.utils.concurrent.Ref;
3737

3838
/**
39-
* Interface for compression dictionaries with reference-counted lifecycle management.
39+
* Interface for compression dictionaries with opt-in reference-counted lifecycle management.
40+
* <p>
41+
* Dictionaries can be used in two modes:
42+
* <ul>
43+
* <li><b>Lightweight mode</b>: No native resources allocated. Suitable for export, serialization,
44+
* or scenarios where only the raw dictionary bytes are needed.</li>
45+
* <li><b>Managed mode</b>: Native compression/decompression resources allocated on-demand and
46+
* managed via reference counting. Required for caching and active use.</li>
47+
* </ul>
48+
* Call {@link #initRefLazily()} or {@link #tryRef()} to transition from lightweight to managed mode.
4049
*
4150
* <h2>Reference Counting Model</h2>
4251
* Compression dictionaries hold native resources that must be explicitly managed. This interface
@@ -109,6 +118,13 @@ default Kind kind()
109118
return dictId().kind;
110119
}
111120

121+
/**
122+
* Returns a reference from lazily initialized reference counter.
123+
*
124+
* @return reference to this dictionary; once initialized, the reference is the same as self-reference
125+
*/
126+
Ref<? extends CompressionDictionary> initRefLazily();
127+
112128
/**
113129
* Try to acquire a new reference to this dictionary.
114130
* Returns null if the dictionary is already released.
@@ -125,9 +141,11 @@ default Kind kind()
125141
/**
126142
* Get the self-reference of this dictionary.
127143
* This is used to release the primary reference held by the cache.
144+
* Self-reference is initialized after initRefLazily or tryRef
128145
*
129-
* @return the self-reference
146+
* @return the self-reference or null if not yet initialized.
130147
*/
148+
@Nullable
131149
Ref<? extends CompressionDictionary> selfRef();
132150

133151
/**
@@ -137,9 +155,12 @@ default Kind kind()
137155
* This method is idempotent - calling it multiple times is safe and will only
138156
* release the self-reference once. Subsequent calls have no effect.
139157
* <p>
140-
* This method is typically used when creating a dictionary outside the cache
141-
* (e.g., in tests or temporary usage) and needing to clean it up. For dictionaries
142-
* managed by the cache, the cache's removal listener handles cleanup via
158+
* There is no need to call this method in the context of releasing references when an instance
159+
* is never put to a cache. That might be the case when we are constructing a dictionary object
160+
* just for the purpose of passing it to a user (e.g. on exporting and similar) where reference counting
161+
* is not necessary nor needed.
162+
* <p>
163+
* For dictionaries managed by the cache, the cache's removal listener handles cleanup via
143164
* {@code selfRef().release()}.
144165
*
145166
* @see #selfRef()
@@ -148,7 +169,9 @@ default Kind kind()
148169
@VisibleForTesting
149170
default void close()
150171
{
151-
selfRef().close();
172+
Ref<?> selfRef = selfRef();
173+
if (selfRef != null)
174+
selfRef.close();
152175
}
153176

154177
/**

src/java/org/apache/cassandra/db/compression/CompressionDictionaryCache.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import com.github.benmanes.caffeine.cache.RemovalCause;
3333
import org.apache.cassandra.config.DatabaseDescriptor;
3434
import org.apache.cassandra.db.compression.CompressionDictionary.DictId;
35+
import org.apache.cassandra.utils.concurrent.Ref;
3536

3637
/**
3738
* Manages caching and current dictionary state for compression dictionaries.
@@ -68,7 +69,9 @@ public CompressionDictionaryCache()
6869
{
6970
try
7071
{
71-
dictionary.selfRef().release();
72+
// The dictionary's selfRef should never be null when evicting from cache
73+
// but using close() to have better resiliency
74+
dictionary.close();
7275
}
7376
catch (Exception e)
7477
{
@@ -102,7 +105,14 @@ public void add(@Nullable CompressionDictionary compressionDictionary)
102105

103106
// Only update cache if not already in the cache
104107
DictId newDictId = compressionDictionary.dictId();
105-
cache.get(newDictId, id -> compressionDictionary);
108+
cache.get(newDictId, id -> {
109+
Ref<?> ref = compressionDictionary.initRefLazily();
110+
if (ref == null)
111+
{
112+
throw new IllegalStateException("Failed to acquire reference to compression dictionary");
113+
}
114+
return compressionDictionary;
115+
});
106116

107117
// Update current dictionary if we don't have one or the new one has a higher ID (newer)
108118
DictId currentId = currentDictId.get();

src/java/org/apache/cassandra/db/compression/ZstdCompressionDictionary.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public class ZstdCompressionDictionary implements CompressionDictionary, SelfRef
4343
// One ZstdDictDecompress and multiple ZstdDictCompress (per level) can be derived from the same raw dictionary content
4444
private final ConcurrentHashMap<Integer, ZstdDictCompress> zstdDictCompressPerLevel = new ConcurrentHashMap<>();
4545
private final AtomicReference<ZstdDictDecompress> dictDecompress = new AtomicReference<>();
46-
private final Ref<ZstdCompressionDictionary> selfRef;
46+
private volatile Ref<ZstdCompressionDictionary> selfRef;
4747

4848
@VisibleForTesting
4949
public ZstdCompressionDictionary(DictId dictId, byte[] rawDictionary)
@@ -58,7 +58,7 @@ public ZstdCompressionDictionary(DictId dictId, byte[] rawDictionary, int checks
5858
this.dictId = dictId;
5959
this.rawDictionary = rawDictionary;
6060
this.checksum = checksum;
61-
this.selfRef = new Ref<>(this, new Tidy(zstdDictCompressPerLevel, dictDecompress));
61+
this.selfRef = null;
6262
}
6363

6464
@Override
@@ -180,7 +180,7 @@ public ZstdDictDecompress dictionaryForDecompression()
180180
@Override
181181
public Ref<ZstdCompressionDictionary> tryRef()
182182
{
183-
return selfRef.tryRef();
183+
return initRefLazily().tryRef();
184184
}
185185

186186
@Override
@@ -192,11 +192,31 @@ public Ref<ZstdCompressionDictionary> selfRef()
192192
@Override
193193
public Ref<ZstdCompressionDictionary> ref()
194194
{
195-
return selfRef.ref();
195+
return initRefLazily().ref();
196+
}
197+
198+
@Override
199+
public Ref<ZstdCompressionDictionary> initRefLazily()
200+
{
201+
if (selfRef == null)
202+
{
203+
synchronized (this)
204+
{
205+
if (selfRef == null)
206+
{
207+
selfRef = new Ref<>(this, new Tidy(zstdDictCompressPerLevel, dictDecompress));
208+
}
209+
}
210+
}
211+
return selfRef;
196212
}
197213

198214
private void ensureNotReleased()
199215
{
216+
if (selfRef == null)
217+
throw new IllegalStateException("Dictionary ref is not initialized. " +
218+
"Call initRefLazily() or tryRef() first: " + dictId);
219+
200220
if (selfRef.globalCount() <= 0)
201221
throw new IllegalStateException("Dictionary has been released: " + dictId);
202222
}

test/unit/org/apache/cassandra/db/compression/CompressionDictionaryCacheTest.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
import com.github.luben.zstd.ZstdDictTrainer;
3434
import org.apache.cassandra.config.DatabaseDescriptor;
35+
import org.apache.cassandra.utils.concurrent.Ref;
3536

3637
import static org.apache.cassandra.db.compression.CompressionDictionary.DictId;
3738
import static org.apache.cassandra.db.compression.CompressionDictionary.Kind;
@@ -427,7 +428,11 @@ private static void closeQuietly(ZstdCompressionDictionary resource)
427428
{
428429
try
429430
{
430-
resource.selfRef().release();
431+
Ref<ZstdCompressionDictionary> selfRef = resource.selfRef();
432+
if (selfRef != null && selfRef.globalCount() > 0)
433+
{
434+
selfRef.release();
435+
}
431436
}
432437
catch (Exception e)
433438
{

test/unit/org/apache/cassandra/db/compression/ZstdCompressionDictionaryTest.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ public static void setUpClass()
6161
public void setUp()
6262
{
6363
dictionary = new ZstdCompressionDictionary(SAMPLE_DICT_ID, SAMPLE_DICT_DATA);
64+
dictionary.initRefLazily();
6465
}
6566

6667
@Test
@@ -81,9 +82,6 @@ public void testEqualsAndHashCode()
8182
assertThat(dictionary)
8283
.as("Dictionaries with different IDs should not be equal")
8384
.isNotEqualTo(differentIdDict);
84-
85-
dictionary2.selfRef().release();
86-
differentIdDict.selfRef().release();
8785
}
8886

8987
@Test
@@ -187,6 +185,8 @@ public void testDictionaryAccessWithoutReference()
187185
SAMPLE_DICT_DATA
188186
);
189187

188+
testDict.initRefLazily();
189+
190190
// Access some dictionaries first to initialize them
191191
testDict.dictionaryForCompression(3);
192192
testDict.dictionaryForDecompression();
@@ -260,7 +260,6 @@ public void testMultipleReferences()
260260
@Test
261261
public void testReferenceAfterClose()
262262
{
263-
// Release the self-reference
264263
dictionary.selfRef().release();
265264

266265
assertThatThrownBy(() -> dictionary.ref())
@@ -386,9 +385,6 @@ public void testSerializeDeserializeWithManager() throws Exception
386385
.as("Both deserializations should return identical dictionary")
387386
.isNotNull()
388387
.isEqualTo(dict2);
389-
390-
dict1.selfRef().release();
391-
dict2.selfRef().release();
392388
}
393389

394390
@Test

0 commit comments

Comments
 (0)