Skip to content

Commit 6c291ab

Browse files
author
Minh Nguyen
committed
Add test
Change the protocolMap to ConcurrentHashMap Fix spotbug Fix spotbug
1 parent ac09987 commit 6c291ab

File tree

16 files changed

+529
-28
lines changed

16 files changed

+529
-28
lines changed

internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -526,6 +526,13 @@ private ConfigKeys() {
526526
public static final String CHILD_CONTROLLER_ADMIN_TOPIC_CONSUMPTION_ENABLED =
527527
"child.controller.admin.topic.consumption.enabled";
528528

529+
/**
530+
* Whether to enable admin operation system store or not.
531+
* If yes, controller will register admin operation system store and process admin operations in rollback cases.
532+
*/
533+
public static final String CONTROLLER_ADMIN_OPERATION_SYSTEM_STORE_ENABLED =
534+
"controller.admin.operation.system.store.enabled";
535+
529536
/**
530537
* This config defines the source region of aggregate hybrid store real-time data when native replication is enabled
531538
*/

internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/AvroProtocolDefinition.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,7 @@ public <T extends SpecificRecord> InternalAvroSpecificSerializer<T> getSerialize
303303
if (magicByte.isPresent() || protocolVersionStoredInHeader) {
304304
return new InternalAvroSpecificSerializer<>(this);
305305
}
306-
return new InternalAvroSpecificSerializer<>(this, 0);
306+
return new InternalAvroSpecificSerializer<>(this, 0, this.getCurrentProtocolVersionSchema());
307307
}
308308

309309
public int getCurrentProtocolVersion() {

internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/InternalAvroSpecificSerializer.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -104,18 +104,26 @@ private static class ReusableObjects {
104104

105105
private final BiConsumer<Integer, Schema> newSchemaEncountered;
106106

107+
protected InternalAvroSpecificSerializer(
108+
AvroProtocolDefinition protocolDef,
109+
Integer payloadOffsetOverride,
110+
Schema compiledProtocolSchema) {
111+
this(protocolDef, payloadOffsetOverride, (schemaId, schema) -> {}, compiledProtocolSchema);
112+
}
113+
107114
protected InternalAvroSpecificSerializer(AvroProtocolDefinition protocolDef) {
108115
this(protocolDef, null);
109116
}
110117

111118
protected InternalAvroSpecificSerializer(AvroProtocolDefinition protocolDef, Integer payloadOffsetOverride) {
112-
this(protocolDef, payloadOffsetOverride, (schemaId, schema) -> {});
119+
this(protocolDef, payloadOffsetOverride, (schemaId, schema) -> {}, protocolDef.getCurrentProtocolVersionSchema());
113120
}
114121

115122
protected InternalAvroSpecificSerializer(
116123
AvroProtocolDefinition protocolDef,
117124
Integer payloadOffsetOverride,
118-
BiConsumer<Integer, Schema> newSchemaEncountered) {
125+
BiConsumer<Integer, Schema> newSchemaEncountered,
126+
Schema compiledProtocolSchema) {
119127
// Magic byte handling
120128
if (protocolDef.getMagicByte().isPresent()) {
121129
this.magicByte = protocolDef.getMagicByte().get();
@@ -125,6 +133,8 @@ protected InternalAvroSpecificSerializer(
125133
this.MAGIC_BYTE_LENGTH = 0;
126134
}
127135

136+
this.compiledProtocol = compiledProtocolSchema;
137+
128138
// Protocol version handling
129139
this.PROTOCOL_VERSION_OFFSET = MAGIC_BYTE_OFFSET + MAGIC_BYTE_LENGTH;
130140
if (protocolDef.protocolVersionStoredInHeader) {
@@ -156,9 +166,8 @@ protected InternalAvroSpecificSerializer(
156166
}
157167
this.PAYLOAD_OFFSET = payloadOffsetOverride;
158168
}
159-
this.compiledProtocol = protocolDef.getCurrentProtocolVersionSchema();
160169

161-
Map<Integer, Schema> protocolSchemaMap = Utils.getAllSchemasFromResources(protocolDef);
170+
Map<Integer, Schema> protocolSchemaMap = Utils.getAllSchemasFromResources(protocolDef, compiledProtocolSchema);
162171
int minimumSchemaId = protocolSchemaMap.keySet()
163172
.stream()
164173
.min(Integer::compareTo)

internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/KafkaValueSerializer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,6 @@ public KafkaValueSerializer() {
2020
}
2121

2222
public KafkaValueSerializer(BiConsumer<Integer, Schema> newSchemaEncountered) {
23-
super(AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE, null, newSchemaEncountered);
23+
super(AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE, null, newSchemaEncountered, null);
2424
}
2525
}

internal/venice-common/src/main/java/com/linkedin/venice/utils/Utils.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -442,6 +442,12 @@ public static Schema getSchemaFromResource(String resourcePath) throws IOExcepti
442442
}
443443

444444
public static Map<Integer, Schema> getAllSchemasFromResources(AvroProtocolDefinition protocolDef) {
445+
return getAllSchemasFromResources(protocolDef, protocolDef.getCurrentProtocolVersionSchema());
446+
}
447+
448+
public static Map<Integer, Schema> getAllSchemasFromResources(
449+
AvroProtocolDefinition protocolDef,
450+
Schema compiledSchema) {
445451
final int SENTINEL_PROTOCOL_VERSION_USED_FOR_UNDETECTABLE_COMPILED_SCHEMA =
446452
InternalAvroSpecificSerializer.SENTINEL_PROTOCOL_VERSION_USED_FOR_UNDETECTABLE_COMPILED_SCHEMA;
447453
final int SENTINEL_PROTOCOL_VERSION_USED_FOR_UNVERSIONED_PROTOCOL =
@@ -461,7 +467,6 @@ public static Map<Integer, Schema> getAllSchemasFromResources(AvroProtocolDefini
461467
}
462468

463469
byte compiledProtocolVersion = SENTINEL_PROTOCOL_VERSION_USED_FOR_UNDETECTABLE_COMPILED_SCHEMA;
464-
String className = protocolDef.getClassName();
465470
Map<Integer, Schema> protocolSchemaMap = new TreeMap<>();
466471
int initialVersion;
467472
if (currentProtocolVersion > 0) {
@@ -471,6 +476,7 @@ public static Map<Integer, Schema> getAllSchemasFromResources(AvroProtocolDefini
471476
}
472477
final String sep = "/"; // TODO: Make sure that jar resources are always forward-slash delimited, even on Windows
473478
int version = initialVersion;
479+
String className = protocolDef.getClassName();
474480
while (true) {
475481
String versionPath = "avro" + sep;
476482
if (currentProtocolVersion != SENTINEL_PROTOCOL_VERSION_USED_FOR_UNVERSIONED_PROTOCOL) {
@@ -480,7 +486,7 @@ public static Map<Integer, Schema> getAllSchemasFromResources(AvroProtocolDefini
480486
try {
481487
Schema schema = Utils.getSchemaFromResource(versionPath);
482488
protocolSchemaMap.put(version, schema);
483-
if (schema.equals(protocolDef.getCurrentProtocolVersionSchema())) {
489+
if (schema.equals(compiledSchema)) {
484490
compiledProtocolVersion = (byte) version;
485491
break;
486492
}
@@ -523,7 +529,7 @@ public static Map<Integer, Schema> getAllSchemasFromResources(AvroProtocolDefini
523529
if (intendedCurrentProtocol == null) {
524530
throw new VeniceException(
525531
"Failed to get schema for current version: " + currentProtocolVersion + " class: " + className);
526-
} else if (!intendedCurrentProtocol.equals(protocolDef.getCurrentProtocolVersionSchema())) {
532+
} else if (!intendedCurrentProtocol.equals(compiledSchema)) {
527533
throw new VeniceException(
528534
"The intended protocol version (" + currentProtocolVersion
529535
+ ") does not match the compiled protocol version (" + compiledProtocolVersion + ").");

0 commit comments

Comments
 (0)