@@ -17,8 +17,8 @@ use std::io::{Read, Write};
17
17
use std:: ops:: Range ;
18
18
19
19
use bytes:: { Buf , BufMut , Bytes , BytesMut } ;
20
- use lz4:: Decoder ;
21
20
use risingwave_hummock_sdk:: VersionedComparator ;
21
+ use { lz4, zstd} ;
22
22
23
23
use super :: utils:: {
24
24
bytes_diff, var_u32_len, xxhash64_verify, BufExt , BufMutExt , CompressionAlgorithm ,
@@ -45,17 +45,25 @@ impl Block {
45
45
46
46
// Decompress.
47
47
let compression = CompressionAlgorithm :: decode ( & mut & buf[ buf. len ( ) - 9 ..buf. len ( ) - 8 ] ) ?;
48
+ let compressed_data = & buf[ ..buf. len ( ) - 9 ] ;
48
49
let buf = match compression {
49
50
CompressionAlgorithm :: None => buf. slice ( ..buf. len ( ) - 9 ) ,
50
51
CompressionAlgorithm :: Lz4 => {
51
- let mut decoder = Decoder :: new ( buf. reader ( ) )
52
- . map_err ( HummockError :: decode_error)
53
- . unwrap ( ) ;
52
+ let mut decoder = lz4:: Decoder :: new ( compressed_data. reader ( ) )
53
+ . map_err ( HummockError :: decode_error) ?;
54
54
let mut decoded = Vec :: with_capacity ( DEFAULT_BLOCK_SIZE ) ;
55
55
decoder
56
56
. read_to_end ( & mut decoded)
57
- . map_err ( HummockError :: decode_error)
58
- . unwrap ( ) ;
57
+ . map_err ( HummockError :: decode_error) ?;
58
+ Bytes :: from ( decoded)
59
+ }
60
+ CompressionAlgorithm :: Zstd => {
61
+ let mut decoder = zstd:: Decoder :: new ( compressed_data. reader ( ) )
62
+ . map_err ( HummockError :: decode_error) ?;
63
+ let mut decoded = Vec :: with_capacity ( DEFAULT_BLOCK_SIZE ) ;
64
+ decoder
65
+ . read_to_end ( & mut decoded)
66
+ . map_err ( HummockError :: decode_error) ?;
59
67
Bytes :: from ( decoded)
60
68
}
61
69
} ;
@@ -298,6 +306,21 @@ impl BlockBuilder {
298
306
result. map_err ( HummockError :: encode_error) . unwrap ( ) ;
299
307
writer. into_inner ( )
300
308
}
309
+ CompressionAlgorithm :: Zstd => {
310
+ let mut encoder =
311
+ zstd:: Encoder :: new ( BytesMut :: with_capacity ( self . buf . len ( ) ) . writer ( ) , 4 )
312
+ . map_err ( HummockError :: encode_error)
313
+ . unwrap ( ) ;
314
+ encoder
315
+ . write ( & self . buf [ ..] )
316
+ . map_err ( HummockError :: encode_error)
317
+ . unwrap ( ) ;
318
+ let writer = encoder
319
+ . finish ( )
320
+ . map_err ( HummockError :: encode_error)
321
+ . unwrap ( ) ;
322
+ writer. into_inner ( )
323
+ }
301
324
} ;
302
325
self . compression_algorithm . encode ( & mut buf) ;
303
326
let checksum = xxhash64_checksum ( & buf) ;
@@ -354,8 +377,13 @@ mod tests {
354
377
355
378
#[ test]
356
379
fn test_compressed_block_enc_dec ( ) {
380
+ inner_test_compressed ( CompressionAlgorithm :: Lz4 ) ;
381
+ inner_test_compressed ( CompressionAlgorithm :: Zstd ) ;
382
+ }
383
+
384
+ fn inner_test_compressed ( algo : CompressionAlgorithm ) {
357
385
let options = BlockBuilderOptions {
358
- compression_algorithm : CompressionAlgorithm :: Lz4 ,
386
+ compression_algorithm : algo ,
359
387
..Default :: default ( )
360
388
} ;
361
389
let mut builder = BlockBuilder :: new ( options) ;
0 commit comments