Skip to content

Commit 9103d8e

Browse files
committed
1 parent 02ade1b commit 9103d8e

File tree

9 files changed

+766
-2
lines changed

9 files changed

+766
-2
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@
107107
<groupId>org.lz4</groupId>
108108
<artifactId>lz4-java</artifactId>
109109
<version>1.8.0</version>
110-
<scope>test</scope>
110+
<!-- TODO move to test scope once we have XxHash32 -->
111111
</dependency>
112112

113113
<dependency>
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.airlift.compress.lz4;
15+
16+
import io.airlift.compress.Compressor;
17+
18+
import java.nio.Buffer;
19+
import java.nio.ByteBuffer;
20+
21+
import static io.airlift.compress.lz4.Lz4RawCompressor.MAX_TABLE_SIZE;
22+
import static io.airlift.compress.lz4.UnsafeUtil.getAddress;
23+
import static java.lang.String.format;
24+
import static java.util.Objects.requireNonNull;
25+
import static sun.misc.Unsafe.ARRAY_BYTE_BASE_OFFSET;
26+
27+
/**
28+
* This class is not thread-safe
29+
*/
30+
public class Lz4FrameCompressor
31+
implements Compressor
32+
{
33+
private final int[] table = new int[MAX_TABLE_SIZE];
34+
35+
@Override
36+
public int maxCompressedLength(int uncompressedSize)
37+
{
38+
return Lz4FrameRawCompressor.maxCompressedLength(uncompressedSize);
39+
}
40+
41+
@Override
42+
public int compress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset, int maxOutputLength)
43+
{
44+
verifyRange(input, inputOffset, inputLength);
45+
verifyRange(output, outputOffset, maxOutputLength);
46+
47+
long inputAddress = ARRAY_BYTE_BASE_OFFSET + inputOffset;
48+
long outputAddress = ARRAY_BYTE_BASE_OFFSET + outputOffset;
49+
50+
return Lz4FrameRawCompressor.compress(
51+
input,
52+
inputAddress,
53+
inputLength,
54+
output,
55+
outputAddress,
56+
maxOutputLength,
57+
table);
58+
}
59+
60+
@Override
61+
public void compress(ByteBuffer inputBuffer, ByteBuffer outputBuffer)
62+
{
63+
if (true) {
64+
// TODO support byte buffers, see disabled tests
65+
throw new UnsupportedOperationException("This is disabled, does not work with direct buffers yet");
66+
}
67+
68+
// Java 9+ added an overload of various methods in ByteBuffer. When compiling with Java 11+ and targeting Java 8 bytecode
69+
// the resulting signatures are invalid for JDK 8, so accesses below result in NoSuchMethodError. Accessing the
70+
// methods through the interface class works around the problem
71+
// Sidenote: we can't target "javac --release 8" because Unsafe is not available in the signature data for that profile
72+
Buffer input = inputBuffer;
73+
Buffer output = outputBuffer;
74+
75+
Object inputBase;
76+
long inputAddress;
77+
int inputLimit;
78+
if (input.isDirect()) {
79+
inputBase = null;
80+
long address = getAddress(input);
81+
inputAddress = address + input.position();
82+
inputLimit = input.limit();
83+
}
84+
else if (input.hasArray()) {
85+
inputBase = input.array();
86+
inputAddress = ARRAY_BYTE_BASE_OFFSET + input.arrayOffset() + input.position();
87+
inputLimit = input.limit();
88+
}
89+
else {
90+
throw new IllegalArgumentException("Unsupported input ByteBuffer implementation " + input.getClass().getName());
91+
}
92+
93+
Object outputBase;
94+
long outputAddress;
95+
int outputLimit;
96+
if (output.isDirect()) {
97+
outputBase = null;
98+
long address = getAddress(output);
99+
outputAddress = address + output.position();
100+
outputLimit = output.limit();
101+
}
102+
else if (output.hasArray()) {
103+
outputBase = output.array();
104+
outputAddress = ARRAY_BYTE_BASE_OFFSET + output.arrayOffset() + output.position();
105+
outputLimit = output.limit();
106+
}
107+
else {
108+
throw new IllegalArgumentException("Unsupported output ByteBuffer implementation " + output.getClass().getName());
109+
}
110+
111+
// HACK: Assure JVM does not collect Slice wrappers while compressing, since the
112+
// collection may trigger freeing of the underlying memory resulting in a segfault
113+
// There is no other known way to signal to the JVM that an object should not be
114+
// collected in a block, and technically, the JVM is allowed to eliminate these locks.
115+
synchronized (input) {
116+
synchronized (output) {
117+
int written = Lz4FrameRawCompressor.compress(
118+
inputBase,
119+
inputAddress,
120+
inputLimit,
121+
outputBase,
122+
outputAddress,
123+
outputLimit,
124+
table);
125+
output.position(output.position() + written);
126+
}
127+
}
128+
}
129+
130+
private static void verifyRange(byte[] data, int offset, int length)
131+
{
132+
requireNonNull(data, "data is null");
133+
if (offset < 0 || length < 0 || offset + length > data.length) {
134+
throw new IllegalArgumentException(format("Invalid offset or length (%s, %s) in array of length %s", offset, length, data.length));
135+
}
136+
}
137+
}
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.airlift.compress.lz4;
15+
16+
import io.airlift.compress.Decompressor;
17+
import io.airlift.compress.MalformedInputException;
18+
19+
import java.nio.Buffer;
20+
import java.nio.ByteBuffer;
21+
22+
import static io.airlift.compress.lz4.UnsafeUtil.getAddress;
23+
import static java.lang.String.format;
24+
import static java.util.Objects.requireNonNull;
25+
import static sun.misc.Unsafe.ARRAY_BYTE_BASE_OFFSET;
26+
27+
public class Lz4FrameDecompressor
28+
implements Decompressor
29+
{
30+
@Override
31+
public int decompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset, int maxOutputLength)
32+
throws MalformedInputException
33+
{
34+
verifyRange(input, inputOffset, inputLength);
35+
verifyRange(output, outputOffset, maxOutputLength);
36+
37+
return Lz4FrameRawDecompressor.decompress(
38+
input,
39+
ARRAY_BYTE_BASE_OFFSET + inputOffset,
40+
inputLength,
41+
output,
42+
ARRAY_BYTE_BASE_OFFSET + outputOffset,
43+
maxOutputLength);
44+
}
45+
46+
@Override
47+
public void decompress(ByteBuffer inputBuffer, ByteBuffer outputBuffer)
48+
throws MalformedInputException
49+
{
50+
if (true) {
51+
// TODO support byte buffers, see disabled tests
52+
throw new UnsupportedOperationException("This is disabled, does not work with direct buffers yet");
53+
}
54+
55+
// Java 9+ added an overload of various methods in ByteBuffer. When compiling with Java 11+ and targeting Java 8 bytecode
56+
// the resulting signatures are invalid for JDK 8, so accesses below result in NoSuchMethodError. Accessing the
57+
// methods through the interface class works around the problem
58+
// Sidenote: we can't target "javac --release 8" because Unsafe is not available in the signature data for that profile
59+
Buffer input = inputBuffer;
60+
Buffer output = outputBuffer;
61+
62+
Object inputBase;
63+
long inputAddress;
64+
int inputLimit;
65+
if (input.isDirect()) {
66+
inputBase = null;
67+
long address = getAddress(input);
68+
inputAddress = address + input.position();
69+
inputLimit = input.limit();
70+
}
71+
else if (input.hasArray()) {
72+
inputBase = input.array();
73+
inputAddress = ARRAY_BYTE_BASE_OFFSET + input.arrayOffset() + input.position();
74+
inputLimit = input.limit();
75+
}
76+
else {
77+
throw new IllegalArgumentException("Unsupported input ByteBuffer implementation " + input.getClass().getName());
78+
}
79+
80+
Object outputBase;
81+
long outputAddress;
82+
int outputLimit;
83+
if (output.isDirect()) {
84+
outputBase = null;
85+
long address = getAddress(output);
86+
outputAddress = address + output.position();
87+
outputLimit = output.limit();
88+
}
89+
else if (output.hasArray()) {
90+
outputBase = output.array();
91+
outputAddress = ARRAY_BYTE_BASE_OFFSET + output.arrayOffset() + output.position();
92+
outputLimit = output.limit();
93+
}
94+
else {
95+
throw new IllegalArgumentException("Unsupported output ByteBuffer implementation " + output.getClass().getName());
96+
}
97+
98+
// HACK: Assure JVM does not collect Slice wrappers while decompressing, since the
99+
// collection may trigger freeing of the underlying memory resulting in a segfault
100+
// There is no other known way to signal to the JVM that an object should not be
101+
// collected in a block, and technically, the JVM is allowed to eliminate these locks.
102+
synchronized (input) {
103+
synchronized (output) {
104+
int written = Lz4FrameRawDecompressor.decompress(inputBase, inputAddress, inputLimit, outputBase, outputAddress, outputLimit);
105+
output.position(output.position() + written);
106+
}
107+
}
108+
}
109+
110+
public static long getDecompressedSize(byte[] input, int offset, int length)
111+
{
112+
int baseAddress = ARRAY_BYTE_BASE_OFFSET + offset;
113+
return Lz4FrameRawDecompressor.getDecompressedSize(input, baseAddress, length);
114+
}
115+
116+
private static void verifyRange(byte[] data, int offset, int length)
117+
{
118+
requireNonNull(data, "data is null");
119+
if (offset < 0 || length < 0 || offset + length > data.length) {
120+
throw new IllegalArgumentException(format("Invalid offset or length (%s, %s) in array of length %s", offset, length, data.length));
121+
}
122+
}
123+
}

0 commit comments

Comments
 (0)