package com.linkedin.venice.controller.kafka.consumer;

import static com.linkedin.venice.ConfigKeys.*;
+ import static com.linkedin.venice.utils.AvroSchemaUtils.*;

+ import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
+ import com.linkedin.avroutil1.compatibility.FieldBuilder;
import com.linkedin.venice.controller.VeniceParentHelixAdmin;
import com.linkedin.venice.controller.kafka.AdminTopicUtils;
import com.linkedin.venice.controller.kafka.protocol.admin.AdminOperation;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
- import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;
public class AdminConsumptionWithProtocolRollbackIntegrationTest {
  private static final int TIMEOUT = 1 * Time.MS_PER_MINUTE;

-   private final AdminOperationSerializer adminOperationSerializer = new AdminOperationSerializer();
+   private AdminOperationSerializer adminOperationSerializer;

  private static final String owner = "test_owner";
  private static final String keySchema = "\"string\"";
  private static final String valueSchema = "\"string\"";
  private static final int adminConsumptionMaxWorkerPoolSize = 3;

  private VeniceTwoLayerMultiRegionMultiClusterWrapper venice;
-   private AdminConsumerService adminConsumerService;
  private ControllerClient parentControllerClient;
  private VeniceParentHelixAdmin parentHelixAdmin;
  private VeniceWriter<byte[], byte[], byte[]> writer;
@@ -102,7 +103,6 @@ public void setUp() {
    VeniceControllerWrapper parentController = venice.getParentControllers().get(0);
    parentControllerClient = new ControllerClient(venice.getClusterNames()[0], parentController.getControllerUrl());
    clusterName = venice.getClusterNames()[0];
-     adminConsumerService = parentController.getAdminConsumerServiceByCluster(clusterName);
    parentHelixAdmin = (VeniceParentHelixAdmin) parentController.getVeniceAdmin();
    PubSubTopicRepository pubSubTopicRepository = parentHelixAdmin.getPubSubTopicRepository();
    TopicManager topicManager = parentHelixAdmin.getTopicManager();
@@ -114,6 +114,7 @@ public void setUp() {
        pubSubBrokerWrapper.getPubSubClientsFactory().getProducerAdapterFactory();
    writer = IntegrationTestPushUtils.getVeniceWriterFactory(pubSubBrokerWrapper, pubSubProducerAdapterFactory)
        .createVeniceWriter(new VeniceWriterOptions.Builder(adminTopic.getName()).build());
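+     // Reuse the parent admin's serializer so schemas registered later in the test are visible to it.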
+     adminOperationSerializer = parentHelixAdmin.getAdminOperationSerializer();
  }

  @AfterClass
@@ -193,51 +194,48 @@ private void addSchemaToSchemaSystemStore(int schemaId, Schema schema) {
   * and register it to the schema system store with a new schema id
   */
  private int createNewSchemaAndRegister(int originalSchemaId) {
-     Schema schema = AdminOperationSerializer.getSchema(originalSchemaId);
+     Schema schema = adminOperationSerializer.getSchema(originalSchemaId);

    int newSchemaId = originalSchemaId + 1;
    Schema newSchema = createNewSchema(schema);
    // We reuse (a copy of) the same schema and bypass the compatibility check for testing purposes
    addSchemaToSchemaSystemStore(newSchemaId, newSchema);

-     AdminOperationSerializer adminOperationSerializer = parentHelixAdmin.getAdminOperationSerializer();
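+     // Register with the shared serializer as well, so admin messages encoded with the new id can be deserialized.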
    adminOperationSerializer.addSchema(newSchemaId, newSchema);
    return newSchemaId;
  }

  /**
   * Create a new schema by adding a new field to the original schema
   */
-   private Schema createNewSchema(Schema originalSchema) {
-     // Convert schema to string
-     Schema newSchema = Schema.createRecord(
+   private Schema createNewSchema(final Schema originalSchema) {
+     if (originalSchema == null) {
+       throw new IllegalArgumentException("Original schema cannot be null");
+     }
+
+     // Create a new, empty record schema carrying over the original's name, doc, namespace, and error flag
+     final Schema newSchema = Schema.createRecord(
        originalSchema.getName(),
        originalSchema.getDoc(),
        originalSchema.getNamespace(),
        originalSchema.isError());

-     // Create a deep copy of all fields
-     List<Schema.Field> newFields = new ArrayList<>();
+     // Deep-copy every field from the original schema
+     final List<Schema.Field> newFields = new ArrayList<>();
    for (Schema.Field field : originalSchema.getFields()) {
-       Schema.Field newField = new Schema.Field(
-           field.name(),
-           field.schema(), // This creates a new field with the same schema
-           field.doc(),
-           field.defaultVal());
-       // Copy all properties
-       for (Map.Entry<String, Object> prop : field.getObjectProps().entrySet()) {
-         newField.addProp(prop.getKey(), prop.getValue());
-       }
-       newFields.add(newField);
+       // Reuse the field's schema directly; Avro schemas are immutable, so sharing them is safe
+       FieldBuilder newField = deepCopySchemaFieldWithoutFieldProps(field);
+       copyFieldProperties(newField, field);
+       newFields.add(newField.build());
    }

-     // Add the new field
-     Schema.Field testField =
-         new Schema.Field("testField", Schema.create(Schema.Type.INT), "Documentation for testField", 0);
+     // Add the extra field that makes the new schema differ from the original
+     final Schema.Field testField = AvroCompatibilityHelper
+         .createSchemaField("testField", Schema.create(Schema.Type.INT), "Documentation for testField", 0);
    newFields.add(testField);

-     // Set all fields at once
-     newSchema.setFields(newFields);
+     // Attach an unmodifiable defensive copy of the field list
+     newSchema.setFields(Collections.unmodifiableList(new ArrayList<>(newFields)));

    return newSchema;
  }
@@ -302,4 +300,26 @@ private byte[] getStoreUpdateMessage(
  private int nextExecutionId() {
    return executionId++;
  }
+
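+   /**
+    * Copies a field's name, schema, doc, order, and default into a new {@link FieldBuilder},
+    * deliberately leaving field-level props to {@link #copyFieldProperties}.
+    */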
+   private static FieldBuilder deepCopySchemaFieldWithoutFieldProps(Schema.Field field) {
+     FieldBuilder fieldBuilder = AvroCompatibilityHelper.newField(null)
+         .setName(field.name())
+         .setSchema(field.schema())
+         .setDoc(field.doc())
+         .setOrder(field.order());
+     // Set the default explicitly, since the AvroCompatibilityHelper builder may drop defaults on a type mismatch
+     if (field.hasDefaultValue()) {
+       fieldBuilder.setDefault(getFieldDefault(field));
+     }
+     return fieldBuilder;
+   }
+
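+   /**
+    * Copies every field-level prop onto the builder as a JSON string, which round-trips
+    * prop values consistently across Avro versions.
+    */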
+   private static void copyFieldProperties(FieldBuilder fieldBuilder, Schema.Field field) {
+     AvroCompatibilityHelper.getAllPropNames(field).forEach(k -> {
+       String propValue = AvroCompatibilityHelper.getFieldPropAsJsonString(field, k);
+       if (propValue != null) {
+         fieldBuilder.addProp(k, propValue);
+       }
+     });
+   }
}
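
For reference, a minimal standalone sketch of the same copy-and-extend approach used by createNewSchema above. The class name SchemaEvolutionSketch and the inline base schema are illustrative only, not part of this change:

import java.util.ArrayList;
import java.util.List;

import org.apache.avro.Schema;

import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;

public final class SchemaEvolutionSketch {
  public static void main(String[] args) {
    // Illustrative base schema; the test starts from a real AdminOperation protocol schema instead.
    Schema base = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"AdminOp\",\"fields\":"
            + "[{\"name\":\"id\",\"type\":\"long\",\"default\":0}]}");

    // Empty record shell carrying over the base schema's identity.
    Schema evolved = Schema.createRecord(base.getName(), base.getDoc(), base.getNamespace(), base.isError());

    List<Schema.Field> fields = new ArrayList<>();
    for (Schema.Field f : base.getFields()) {
      // createSchemaField rebuilds a field (default included) portably across Avro versions.
      Object defaultValue = AvroCompatibilityHelper.fieldHasDefault(f)
          ? AvroCompatibilityHelper.getGenericDefaultValue(f)
          : null;
      fields.add(AvroCompatibilityHelper.createSchemaField(f.name(), f.schema(), f.doc(), defaultValue));
    }
    // Append the extra field that makes the evolved schema differ from the base.
    fields.add(AvroCompatibilityHelper.createSchemaField(
        "testField", Schema.create(Schema.Type.INT), "Documentation for testField", 0));

    evolved.setFields(fields);
    System.out.println(evolved.toString(true));
  }
}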