Skip to content

Commit 1c5a295

Browse files
authored
[FLINK-37463][state/forst] make serialzer in ForStMapState Threadlocal (#26286)
1 parent 0b75ad1 commit 1c5a295

File tree

2 files changed

+10
-10
lines changed

2 files changed

+10
-10
lines changed

flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBBunchPutRequest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
public class ForStDBBunchPutRequest<K, N, UK, UV> extends ForStDBPutRequest<K, N, Map<UK, UV>> {
4545

4646
/** Serializer for the user values. */
47-
final TypeSerializer<UV> userValueSerializer;
47+
final ThreadLocal<TypeSerializer<UV>> userValueSerializer;
4848

4949
/** The data outputStream used for value serializer, which should be thread-safe. */
5050
final ThreadLocal<DataOutputSerializer> valueSerializerView;
@@ -100,7 +100,7 @@ public byte[] buildSerializedKey(UK userKey) throws IOException {
100100
public byte[] buildSerializedValue(UV singleValue) throws IOException {
101101
DataOutputSerializer outputView = valueSerializerView.get();
102102
outputView.clear();
103-
userValueSerializer.serialize(singleValue, outputView);
103+
userValueSerializer.get().serialize(singleValue, outputView);
104104
return outputView.getCopyOfBuffer();
105105
}
106106

flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStMapState.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -73,10 +73,10 @@ public class ForStMapState<K, N, UK, UV> extends AbstractMapState<K, N, UK, UV>
7373
final ThreadLocal<DataInputDeserializer> valueDeserializerView;
7474

7575
/** Serializer for the user keys. */
76-
final TypeSerializer<UK> userKeySerializer;
76+
final ThreadLocal<TypeSerializer<UK>> userKeySerializer;
7777

7878
/** Serializer for the user values. */
79-
final TypeSerializer<UV> userValueSerializer;
79+
final ThreadLocal<TypeSerializer<UV>> userValueSerializer;
8080

8181
/** Number of bytes required to prefix the key groups. */
8282
private final int keyGroupPrefixBytes;
@@ -100,8 +100,8 @@ public ForStMapState(
100100
this.valueSerializerView = ThreadLocal.withInitial(valueSerializerViewInitializer);
101101
this.keyDeserializerView = ThreadLocal.withInitial(keyDeserializerViewInitializer);
102102
this.valueDeserializerView = ThreadLocal.withInitial(valueDeserializerViewInitializer);
103-
this.userKeySerializer = stateDescriptor.getUserKeySerializer();
104-
this.userValueSerializer = stateDescriptor.getSerializer();
103+
this.userKeySerializer = ThreadLocal.withInitial(stateDescriptor::getUserKeySerializer);
104+
this.userValueSerializer = ThreadLocal.withInitial(stateDescriptor::getSerializer);
105105
this.keyGroupPrefixBytes = keyGroupPrefixBytes;
106106
}
107107

@@ -124,15 +124,15 @@ public byte[] serializeKey(ContextKey<K, N> contextKey) throws IOException {
124124
return builder.build();
125125
}
126126
UK userKey = (UK) contextKey.getUserKey(); // map get
127-
return builder.buildCompositeKeyUserKey(userKey, userKeySerializer);
127+
return builder.buildCompositeKeyUserKey(userKey, userKeySerializer.get());
128128
}
129129

130130
@Override
131131
public byte[] serializeValue(UV value) throws IOException {
132132
DataOutputSerializer outputView = valueSerializerView.get();
133133
outputView.clear();
134134
outputView.writeBoolean(false);
135-
userValueSerializer.serialize(value, outputView);
135+
userValueSerializer.get().serialize(value, outputView);
136136
return outputView.getCopyOfBuffer();
137137
}
138138

@@ -141,13 +141,13 @@ public UV deserializeValue(byte[] valueBytes) throws IOException {
141141
DataInputDeserializer inputView = valueDeserializerView.get();
142142
inputView.setBuffer(valueBytes);
143143
boolean isNull = inputView.readBoolean();
144-
return isNull ? null : userValueSerializer.deserialize(inputView);
144+
return isNull ? null : userValueSerializer.get().deserialize(inputView);
145145
}
146146

147147
public UK deserializeUserKey(byte[] userKeyBytes, int userKeyOffset) throws IOException {
148148
DataInputDeserializer inputView = keyDeserializerView.get();
149149
inputView.setBuffer(userKeyBytes, userKeyOffset, userKeyBytes.length - userKeyOffset);
150-
return userKeySerializer.deserialize(inputView);
150+
return userKeySerializer.get().deserialize(inputView);
151151
}
152152

153153
@Override

0 commit comments

Comments
 (0)