Skip to content

Commit c287744

Browse files
committed
Add native Zstd input and output streams
1 parent ddfd001 commit c287744

17 files changed

+1676
-350
lines changed

src/main/java/io/airlift/compress/v3/zstd/ZstdHadoopInputStream.java

Lines changed: 26 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -24,45 +24,67 @@ class ZstdHadoopInputStream
2424
extends HadoopInputStream
2525
{
2626
private final InputStream in;
27+
private final boolean useNative;
2728
private ZstdInputStream zstdInputStream;
2829

29-
public ZstdHadoopInputStream(InputStream in)
30+
public ZstdHadoopInputStream(InputStream in, boolean useNative)
3031
{
3132
this.in = requireNonNull(in, "in is null");
32-
zstdInputStream = new ZstdInputStream(in);
33+
this.useNative = useNative;
34+
}
35+
36+
private void createDecompressingStreamIfNecessary()
37+
throws IOException
38+
{
39+
if (zstdInputStream == null) {
40+
if (useNative) {
41+
zstdInputStream = new ZstdNativeInputStream(in);
42+
}
43+
else {
44+
zstdInputStream = new ZstdJavaInputStream(in);
45+
}
46+
}
3347
}
3448

3549
@Override
3650
public int read()
3751
throws IOException
3852
{
53+
createDecompressingStreamIfNecessary();
3954
return zstdInputStream.read();
4055
}
4156

4257
@Override
4358
public int read(byte[] b)
4459
throws IOException
4560
{
61+
createDecompressingStreamIfNecessary();
4662
return zstdInputStream.read(b);
4763
}
4864

4965
@Override
5066
public int read(byte[] outputBuffer, int outputOffset, int outputLength)
5167
throws IOException
5268
{
69+
createDecompressingStreamIfNecessary();
5370
return zstdInputStream.read(outputBuffer, outputOffset, outputLength);
5471
}
5572

5673
@Override
5774
public void resetState()
5875
{
59-
zstdInputStream = new ZstdInputStream(in);
76+
zstdInputStream = null;
6077
}
6178

6279
@Override
6380
public void close()
6481
throws IOException
6582
{
66-
zstdInputStream.close();
83+
if (zstdInputStream != null) {
84+
zstdInputStream.close();
85+
}
86+
else {
87+
in.close();
88+
}
6789
}
6890
}

src/main/java/io/airlift/compress/v3/zstd/ZstdHadoopOutputStream.java

Lines changed: 11 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -24,12 +24,13 @@ class ZstdHadoopOutputStream
2424
extends HadoopOutputStream
2525
{
2626
private final OutputStream out;
27-
private boolean initialized;
27+
private final boolean useNative;
2828
private ZstdOutputStream zstdOutputStream;
2929

30-
public ZstdHadoopOutputStream(OutputStream out)
30+
public ZstdHadoopOutputStream(OutputStream out, boolean useNative)
3131
{
3232
this.out = requireNonNull(out, "out is null");
33+
this.useNative = useNative;
3334
}
3435

3536
@Override
@@ -70,10 +71,8 @@ public void close()
7071
throws IOException
7172
{
7273
try {
73-
// it the stream has never been initialized, create a valid empty file
74-
if (!initialized) {
75-
openStreamIfNecessary();
76-
}
74+
// If the stream has never been initialized, create a valid empty file
75+
openStreamIfNecessary();
7776
finish();
7877
}
7978
finally {
@@ -85,8 +84,12 @@ private void openStreamIfNecessary()
8584
throws IOException
8685
{
8786
if (zstdOutputStream == null) {
88-
initialized = true;
89-
zstdOutputStream = new ZstdOutputStream(out);
87+
if (useNative) {
88+
zstdOutputStream = new ZstdNativeOutputStream(out);
89+
}
90+
else {
91+
zstdOutputStream = new ZstdJavaOutputStream(out);
92+
}
9093
}
9194
}
9295
}

src/main/java/io/airlift/compress/v3/zstd/ZstdHadoopStreams.java

Lines changed: 14 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -27,6 +27,18 @@
2727
public class ZstdHadoopStreams
2828
implements HadoopStreams
2929
{
30+
private final boolean useNative;
31+
32+
public ZstdHadoopStreams()
33+
{
34+
this(true); // Default to native when available
35+
}
36+
37+
public ZstdHadoopStreams(boolean useNative)
38+
{
39+
this.useNative = useNative && ZstdNative.isEnabled();
40+
}
41+
3042
@Override
3143
public String getDefaultFileExtension()
3244
{
@@ -43,13 +55,13 @@ public List<String> getHadoopCodecName()
4355
public HadoopInputStream createInputStream(InputStream in)
4456
throws IOException
4557
{
46-
return new ZstdHadoopInputStream(in);
58+
return new ZstdHadoopInputStream(in, useNative);
4759
}
4860

4961
@Override
5062
public HadoopOutputStream createOutputStream(OutputStream out)
5163
throws IOException
5264
{
53-
return new ZstdHadoopOutputStream(out);
65+
return new ZstdHadoopOutputStream(out, useNative);
5466
}
5567
}

src/main/java/io/airlift/compress/v3/zstd/ZstdInputStream.java

Lines changed: 8 additions & 133 deletions
Original file line number | Diff line number | Diff line change
@@ -13,139 +13,14 @@
1313
*/
1414
package io.airlift.compress.v3.zstd;
1515

16-
import java.io.IOException;
1716
import java.io.InputStream;
18-
import java.util.Arrays;
1917

20-
import static io.airlift.compress.v3.zstd.Util.checkPositionIndexes;
21-
import static io.airlift.compress.v3.zstd.Util.checkState;
22-
import static java.lang.Math.max;
23-
import static java.util.Objects.requireNonNull;
24-
import static sun.misc.Unsafe.ARRAY_BYTE_BASE_OFFSET;
25-
26-
public class ZstdInputStream
18+
/**
19+
* Abstract base class for zstd decompressing input streams.
20+
* <p>
21+
* This class provides a common type for both the pure Java implementation ({@link ZstdJavaInputStream})
22+
* and the native implementation ({@link ZstdNativeInputStream}).
23+
*/
24+
public abstract sealed class ZstdInputStream
2725
extends InputStream
28-
{
29-
private static final int MIN_BUFFER_SIZE = 4096;
30-
31-
private final InputStream inputStream;
32-
private final ZstdIncrementalFrameDecompressor decompressor = new ZstdIncrementalFrameDecompressor();
33-
34-
private byte[] inputBuffer = new byte[decompressor.getInputRequired()];
35-
private int inputBufferOffset;
36-
private int inputBufferLimit;
37-
38-
private byte[] singleByteOutputBuffer;
39-
40-
private boolean closed;
41-
42-
public ZstdInputStream(InputStream inputStream)
43-
{
44-
this.inputStream = requireNonNull(inputStream, "inputStream is null");
45-
}
46-
47-
@Override
48-
public int read()
49-
throws IOException
50-
{
51-
if (singleByteOutputBuffer == null) {
52-
singleByteOutputBuffer = new byte[1];
53-
}
54-
int readSize = read(singleByteOutputBuffer, 0, 1);
55-
checkState(readSize != 0, "A zero read size should never be returned");
56-
if (readSize != 1) {
57-
return -1;
58-
}
59-
return singleByteOutputBuffer[0] & 0xFF;
60-
}
61-
62-
@Override
63-
public int read(final byte[] outputBuffer, final int outputOffset, final int outputLength)
64-
throws IOException
65-
{
66-
if (closed) {
67-
throw new IOException("Stream is closed");
68-
}
69-
70-
if (outputBuffer == null) {
71-
throw new NullPointerException();
72-
}
73-
checkPositionIndexes(outputOffset, outputOffset + outputLength, outputBuffer.length);
74-
if (outputLength == 0) {
75-
return 0;
76-
}
77-
78-
final int outputLimit = outputOffset + outputLength;
79-
int outputUsed = 0;
80-
while (outputUsed < outputLength) {
81-
boolean enoughInput = fillInputBufferIfNecessary(decompressor.getInputRequired());
82-
if (!enoughInput) {
83-
if (decompressor.isAtStoppingPoint()) {
84-
return outputUsed > 0 ? outputUsed : -1;
85-
}
86-
throw new IOException("Not enough input bytes");
87-
}
88-
89-
decompressor.partialDecompress(
90-
inputBuffer,
91-
inputBufferOffset + ARRAY_BYTE_BASE_OFFSET,
92-
inputBufferLimit + ARRAY_BYTE_BASE_OFFSET,
93-
outputBuffer,
94-
outputOffset + outputUsed,
95-
outputLimit);
96-
97-
inputBufferOffset += decompressor.getInputConsumed();
98-
outputUsed += decompressor.getOutputBufferUsed();
99-
}
100-
return outputUsed;
101-
}
102-
103-
private boolean fillInputBufferIfNecessary(int requiredSize)
104-
throws IOException
105-
{
106-
if (inputBufferLimit - inputBufferOffset >= requiredSize) {
107-
return true;
108-
}
109-
110-
// compact existing buffered data to the front of the buffer
111-
if (inputBufferOffset > 0) {
112-
int copySize = inputBufferLimit - inputBufferOffset;
113-
System.arraycopy(inputBuffer, inputBufferOffset, inputBuffer, 0, copySize);
114-
inputBufferOffset = 0;
115-
inputBufferLimit = copySize;
116-
}
117-
118-
if (inputBuffer.length < requiredSize) {
119-
inputBuffer = Arrays.copyOf(inputBuffer, max(requiredSize, MIN_BUFFER_SIZE));
120-
}
121-
122-
while (inputBufferLimit < inputBuffer.length) {
123-
int readSize = inputStream.read(inputBuffer, inputBufferLimit, inputBuffer.length - inputBufferLimit);
124-
if (readSize < 0) {
125-
break;
126-
}
127-
inputBufferLimit += readSize;
128-
}
129-
return inputBufferLimit >= requiredSize;
130-
}
131-
132-
@Override
133-
public int available()
134-
throws IOException
135-
{
136-
if (closed) {
137-
return 0;
138-
}
139-
return decompressor.getRequestedOutputSize();
140-
}
141-
142-
@Override
143-
public void close()
144-
throws IOException
145-
{
146-
if (!closed) {
147-
closed = true;
148-
inputStream.close();
149-
}
150-
}
151-
}
26+
permits ZstdJavaInputStream, ZstdNativeInputStream {}

0 commit comments

Comments
 (0)