Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,13 @@ private ConfigKeys() {
public static final String CHILD_CONTROLLER_ADMIN_TOPIC_CONSUMPTION_ENABLED =
"child.controller.admin.topic.consumption.enabled";

/**
* Whether to enable admin operation system store or not.
* If yes, controller will register admin operation system store and process admin operations in rollback cases.
*/
public static final String CONTROLLER_ADMIN_OPERATION_SYSTEM_STORE_ENABLED =
"controller.admin.operation.system.store.enabled";

/**
* This config defines the source region of aggregate hybrid store real-time data when native replication is enabled
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ public <T extends SpecificRecord> InternalAvroSpecificSerializer<T> getSerialize
if (magicByte.isPresent() || protocolVersionStoredInHeader) {
return new InternalAvroSpecificSerializer<>(this);
}
return new InternalAvroSpecificSerializer<>(this, 0);
return new InternalAvroSpecificSerializer<>(this, 0, this.getCurrentProtocolVersionSchema());
}

public int getCurrentProtocolVersion() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,18 +104,26 @@ private static class ReusableObjects {

private final BiConsumer<Integer, Schema> newSchemaEncountered;

protected InternalAvroSpecificSerializer(
AvroProtocolDefinition protocolDef,
Integer payloadOffsetOverride,
Schema compiledProtocolSchema) {
this(protocolDef, payloadOffsetOverride, (schemaId, schema) -> {}, compiledProtocolSchema);
}

protected InternalAvroSpecificSerializer(AvroProtocolDefinition protocolDef) {
this(protocolDef, null);
}

protected InternalAvroSpecificSerializer(AvroProtocolDefinition protocolDef, Integer payloadOffsetOverride) {
this(protocolDef, payloadOffsetOverride, (schemaId, schema) -> {});
this(protocolDef, payloadOffsetOverride, (schemaId, schema) -> {}, protocolDef.getCurrentProtocolVersionSchema());
}

protected InternalAvroSpecificSerializer(
AvroProtocolDefinition protocolDef,
Integer payloadOffsetOverride,
BiConsumer<Integer, Schema> newSchemaEncountered) {
BiConsumer<Integer, Schema> newSchemaEncountered,
Schema compiledProtocolSchema) {
// Magic byte handling
if (protocolDef.getMagicByte().isPresent()) {
this.magicByte = protocolDef.getMagicByte().get();
Expand All @@ -125,6 +133,8 @@ protected InternalAvroSpecificSerializer(
this.MAGIC_BYTE_LENGTH = 0;
}

this.compiledProtocol = compiledProtocolSchema;

// Protocol version handling
this.PROTOCOL_VERSION_OFFSET = MAGIC_BYTE_OFFSET + MAGIC_BYTE_LENGTH;
if (protocolDef.protocolVersionStoredInHeader) {
Expand Down Expand Up @@ -156,9 +166,8 @@ protected InternalAvroSpecificSerializer(
}
this.PAYLOAD_OFFSET = payloadOffsetOverride;
}
this.compiledProtocol = protocolDef.getCurrentProtocolVersionSchema();

Map<Integer, Schema> protocolSchemaMap = Utils.getAllSchemasFromResources(protocolDef);
Map<Integer, Schema> protocolSchemaMap = Utils.getAllSchemasFromResources(protocolDef, compiledProtocolSchema);
int minimumSchemaId = protocolSchemaMap.keySet()
.stream()
.min(Integer::compareTo)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ public KafkaValueSerializer() {
}

public KafkaValueSerializer(BiConsumer<Integer, Schema> newSchemaEncountered) {
super(AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE, null, newSchemaEncountered);
super(
AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE,
null,
newSchemaEncountered,
AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE.getCurrentProtocolVersionSchema());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,12 @@ public static Schema getSchemaFromResource(String resourcePath) throws IOExcepti
}

public static Map<Integer, Schema> getAllSchemasFromResources(AvroProtocolDefinition protocolDef) {
return getAllSchemasFromResources(protocolDef, protocolDef.getCurrentProtocolVersionSchema());
}

public static Map<Integer, Schema> getAllSchemasFromResources(
AvroProtocolDefinition protocolDef,
Schema compiledSchema) {
final int SENTINEL_PROTOCOL_VERSION_USED_FOR_UNDETECTABLE_COMPILED_SCHEMA =
InternalAvroSpecificSerializer.SENTINEL_PROTOCOL_VERSION_USED_FOR_UNDETECTABLE_COMPILED_SCHEMA;
final int SENTINEL_PROTOCOL_VERSION_USED_FOR_UNVERSIONED_PROTOCOL =
Expand All @@ -461,7 +467,6 @@ public static Map<Integer, Schema> getAllSchemasFromResources(AvroProtocolDefini
}

byte compiledProtocolVersion = SENTINEL_PROTOCOL_VERSION_USED_FOR_UNDETECTABLE_COMPILED_SCHEMA;
String className = protocolDef.getClassName();
Map<Integer, Schema> protocolSchemaMap = new TreeMap<>();
int initialVersion;
if (currentProtocolVersion > 0) {
Expand All @@ -471,6 +476,7 @@ public static Map<Integer, Schema> getAllSchemasFromResources(AvroProtocolDefini
}
final String sep = "/"; // TODO: Make sure that jar resources are always forward-slash delimited, even on Windows
int version = initialVersion;
String className = protocolDef.getClassName();
while (true) {
String versionPath = "avro" + sep;
if (currentProtocolVersion != SENTINEL_PROTOCOL_VERSION_USED_FOR_UNVERSIONED_PROTOCOL) {
Expand All @@ -480,7 +486,7 @@ public static Map<Integer, Schema> getAllSchemasFromResources(AvroProtocolDefini
try {
Schema schema = Utils.getSchemaFromResource(versionPath);
protocolSchemaMap.put(version, schema);
if (schema.equals(protocolDef.getCurrentProtocolVersionSchema())) {
if (schema.equals(compiledSchema)) {
compiledProtocolVersion = (byte) version;
break;
}
Expand Down Expand Up @@ -523,7 +529,7 @@ public static Map<Integer, Schema> getAllSchemasFromResources(AvroProtocolDefini
if (intendedCurrentProtocol == null) {
throw new VeniceException(
"Failed to get schema for current version: " + currentProtocolVersion + " class: " + className);
} else if (!intendedCurrentProtocol.equals(protocolDef.getCurrentProtocolVersionSchema())) {
} else if (!intendedCurrentProtocol.equals(compiledSchema)) {
throw new VeniceException(
"The intended protocol version (" + currentProtocolVersion
+ ") does not match the compiled protocol version (" + compiledProtocolVersion + ").");
Expand Down
Loading
Loading