Skip to content

Commit 42aef13

Browse files
authored
Merge pull request #6 from mercari/set-max-buffer
Set max buffer for kryo to serialize
2 parents 7cfa752 + 0f2a2c6 commit 42aef13

File tree

6 files changed

+57
-15
lines changed

6 files changed

+57
-15
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,8 @@ Struct{
256256
| cipher_algorithm | cipher algorithm used for data encryption (currently supports only one AEAD cipher: AES/GCM/NoPadding) | string | AES/GCM/NoPadding | AES/GCM/NoPadding | low |
257257
| cipher_text_encoding | defines the encoding of the resulting ciphertext bytes (currently only supports 'base64') | string | base64 | base64 | low |
258258
| path_delimiter | path delimiter used as field name separator when referring to nested fields in the input record | string | . | non-empty string | low |
259+
| kryo_output_buffer_size | Initial buffer size for kryo to serialize. | int | 32 | int value | low |
260+
| kryo_output_buffer_size_max | Maximum buffer size for kryo to serialize. Default -1 corresponds to no upper limit (up to Integer.MAX_VALUE - 8 technically). | int | -1 | int value | low |
259261

260262
### Externalize configuration parameters
261263

kafka-connect-transform-kryptonite-gcp/src/main/java/com/github/hpgrahsl/kafka/connect/transforms/kryptonite/CipherField.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,8 @@ public enum KeySource {
9595
public static final String CIPHER_MODE = "cipher_mode";
9696
public static final String KEY_SOURCE = "key_source";
9797
public static final String KMS_KEY_NAME = "kms_key_name";
98+
public static final String KRYO_OUTPUT_BUFFER_SIZE = "kryo_output_buffer_size";
99+
public static final String KRYO_OUTPUT_BUFFER_SIZE_MAX = "kryo_output_buffer_size_max";
98100

99101
private static final String PATH_DELIMITER_DEFAULT = ".";
100102
private static final String FIELD_MODE_DEFAULT = "ELEMENT";
@@ -105,6 +107,8 @@ public enum KeySource {
105107
private static final String KEY_SOURCE_DEFAULT = "CONFIG";
106108
private static final String CIPHER_DATA_KEYS_DEFAULT = "[]";
107109
private static final String KMS_KEY_NAME_DEFAULT = null;
110+
private static final int KRYO_OUTPUT_BUFFER_SIZE_DEFAULT = 32;
111+
private static final int KRYO_OUTPUT_BUFFER_SIZE_MAX_DEFAULT = -1;
108112

109113
public static final ConfigDef CONFIG_DEF =
110114
new ConfigDef()
@@ -207,7 +211,19 @@ public enum KeySource {
207211
KMS_KEY_NAME_DEFAULT,
208212
Importance.MEDIUM,
209213
"The GCP Cloud KMS key name for decrypting a data encryption key (DEK), "
210-
+ "if the DEK is encrypted with a key encryption key (KEK)");
214+
+ "if the DEK is encrypted with a key encryption key (KEK)")
215+
.define(
216+
KRYO_OUTPUT_BUFFER_SIZE,
217+
Type.INT,
218+
KRYO_OUTPUT_BUFFER_SIZE_DEFAULT,
219+
Importance.LOW,
220+
"Initial buffer size for kryo to serialize. Default is 32")
221+
.define(
222+
KRYO_OUTPUT_BUFFER_SIZE_MAX,
223+
Type.INT,
224+
KRYO_OUTPUT_BUFFER_SIZE_MAX_DEFAULT,
225+
Importance.LOW,
226+
"Maximum buffer size for kryo to serialize. Default -1 corresponds to no upper limit (up to Integer.MAX_VALUE - 8 technically).");
211227

212228
private static final String PURPOSE = "(de)cipher record fields";
213229

@@ -269,7 +285,9 @@ public void configure(Map<String, ?> props) {
269285
.stream()
270286
.collect(Collectors.toMap(FieldConfig::getName, Function.identity()));
271287
Kryptonite kryptonite = configureKryptonite(config);
272-
SerdeProcessor serdeProcessor = new KryoSerdeProcessor();
288+
SerdeProcessor serdeProcessor =
289+
new KryoSerdeProcessor(
290+
config.getInt(KRYO_OUTPUT_BUFFER_SIZE), config.getInt(KRYO_OUTPUT_BUFFER_SIZE_MAX));
273291
recordHandlerWithSchema =
274292
new SchemaawareRecordHandler(
275293
config,

kafka-connect-transform-kryptonite-gcp/src/main/java/com/github/hpgrahsl/kafka/connect/transforms/kryptonite/DataKeyConfig.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,9 +74,6 @@ public int hashCode() {
7474

7575
@Override
7676
public String toString() {
77-
return "DataKeyConfig{"
78-
+ "identifier='"
79-
+ getIdentifier()
80-
+ "'}";
77+
return "DataKeyConfig{" + "identifier='" + getIdentifier() + "'}";
8178
}
8279
}

kafka-connect-transform-kryptonite-gcp/src/main/java/com/github/hpgrahsl/kafka/connect/transforms/kryptonite/RecordHandler.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -82,12 +82,7 @@ public Object processField(Object object, String matchedPath) {
8282
}
8383
} catch (Exception e) {
8484
throw new DataException(
85-
"error: "
86-
+ cipherMode
87-
+ " of field path '"
88-
+ matchedPath
89-
+ "' failed unexpectedly",
90-
e);
85+
"error: " + cipherMode + " of field path '" + matchedPath + "' failed unexpectedly", e);
9186
}
9287
}
9388

kafka-connect-transform-kryptonite-gcp/src/main/java/com/github/hpgrahsl/kafka/connect/transforms/kryptonite/serdes/KryoSerdeProcessor.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import com.esotericsoftware.kryo.Serializer;
2121
import com.esotericsoftware.kryo.io.Input;
2222
import com.esotericsoftware.kryo.io.Output;
23-
import java.io.ByteArrayOutputStream;
2423
import java.util.ArrayList;
2524
import java.util.List;
2625
import org.apache.kafka.connect.data.ConnectSchema;
@@ -34,15 +33,20 @@
3433
public class KryoSerdeProcessor implements SerdeProcessor {
3534

3635
private static final Logger LOGGER = LoggerFactory.getLogger(KryoSerdeProcessor.class);
36+
private final int outputBufSize;
37+
private final int outputBufMaxSize;
3738

38-
public KryoSerdeProcessor() {}
39+
public KryoSerdeProcessor(int outputBufSize, int outputBufMaxSize) {
40+
this.outputBufSize = outputBufSize;
41+
this.outputBufMaxSize = outputBufMaxSize;
42+
}
3943

4044
public byte[] objectToBytes(Object object, Class<?> clazz) {
4145
return objectToBytes(object);
4246
}
4347

4448
public byte[] objectToBytes(Object object) {
45-
Output output = new Output(new ByteArrayOutputStream());
49+
Output output = new Output(outputBufSize, outputBufMaxSize);
4650
KryoInstance.get().writeClassAndObject(output, object);
4751
return output.toBytes();
4852
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package com.github.hpgrahsl.kafka.connect.transforms.kryptonite.serdes;
2+
3+
import static org.junit.jupiter.api.Assertions.assertAll;
4+
import static org.junit.jupiter.api.Assertions.assertEquals;
5+
6+
import org.junit.jupiter.api.DisplayName;
7+
import org.junit.jupiter.api.Test;
8+
9+
class KryoSerdeProcessorTest {
10+
11+
@Test
12+
@DisplayName("serialize and deserialize with variable sized objects")
13+
void serializeAndDeserializeWithVariableSized() {
14+
KryoSerdeProcessor processor = new KryoSerdeProcessor(32, -1);
15+
16+
String nullString = null;
17+
String small = "test";
18+
String large = new String(new char[100000]).replace("\0", "a");
19+
20+
assertAll(
21+
() ->
22+
assertEquals(nullString, processor.bytesToObject(processor.objectToBytes(nullString))),
23+
() -> assertEquals(small, processor.bytesToObject(processor.objectToBytes(small))),
24+
() -> assertEquals(large, processor.bytesToObject(processor.objectToBytes(large))));
25+
}
26+
}

0 commit comments

Comments
 (0)