diff --git a/internal/venice-client-common/src/main/java/com/linkedin/venice/schema/rmd/RmdUtils.java b/internal/venice-client-common/src/main/java/com/linkedin/venice/schema/rmd/RmdUtils.java index f58c7520606..4cbf0fad1e2 100644 --- a/internal/venice-client-common/src/main/java/com/linkedin/venice/schema/rmd/RmdUtils.java +++ b/internal/venice-client-common/src/main/java/com/linkedin/venice/schema/rmd/RmdUtils.java @@ -155,16 +155,23 @@ static public long getLastUpdateTimestamp(Object object) { for (Schema.Field field: ((GenericRecord) timestampRecord).getSchema().getFields()) { // if the field is a record, then we need to iterate through the fields of the record if (field.schema().getType().equals(Schema.Type.RECORD)) { - lastUpdatedTimestamp = Math.max( - lastUpdatedTimestamp, - (Long) ((GenericRecord) ((GenericRecord) timestampRecord).get(field.name())).get(TOP_LEVEL_TS_FIELD_NAME)); - for (long timestamp: (long[]) ((GenericRecord) ((GenericRecord) timestampRecord).get(field.name())) - .get(DELETED_ELEM_TS_FIELD_NAME)) { - lastUpdatedTimestamp = Math.max(lastUpdatedTimestamp, timestamp); + GenericRecord fieldRecord = (GenericRecord) ((GenericRecord) timestampRecord).get(field.name()); + lastUpdatedTimestamp = Math.max(lastUpdatedTimestamp, (Long) fieldRecord.get(TOP_LEVEL_TS_FIELD_NAME)); + + // Handle DELETED_ELEM_TS_FIELD_NAME as a List/Collection + Object deletedTimestamps = fieldRecord.get(DELETED_ELEM_TS_FIELD_NAME); + if (deletedTimestamps instanceof List) { + for (Object ts: (List) deletedTimestamps) { + lastUpdatedTimestamp = Math.max(lastUpdatedTimestamp, ((Number) ts).longValue()); + } } - for (long timestamp: (long[]) ((GenericRecord) ((GenericRecord) timestampRecord).get(field.name())) - .get(ACTIVE_ELEM_TS_FIELD_NAME)) { - lastUpdatedTimestamp = Math.max(lastUpdatedTimestamp, timestamp); + + // Handle ACTIVE_ELEM_TS_FIELD_NAME as a List/Collection + Object activeTimestamps = fieldRecord.get(ACTIVE_ELEM_TS_FIELD_NAME); + if (activeTimestamps instanceof List) { + for (Object ts: (List) activeTimestamps) { + lastUpdatedTimestamp = Math.max(lastUpdatedTimestamp, ((Number) ts).longValue()); + } } } else if (field.schema().getType().equals(Schema.Type.LONG)) { lastUpdatedTimestamp = diff --git a/internal/venice-client-common/src/test/java/com/linkedin/venice/schema/rmd/TestRmdUtils.java b/internal/venice-client-common/src/test/java/com/linkedin/venice/schema/rmd/TestRmdUtils.java index 124c5214a0f..22fe658afe9 100644 --- a/internal/venice-client-common/src/test/java/com/linkedin/venice/schema/rmd/TestRmdUtils.java +++ b/internal/venice-client-common/src/test/java/com/linkedin/venice/schema/rmd/TestRmdUtils.java @@ -168,7 +168,56 @@ public void testExtractOffsetVectorFromRmd() { @Test public void testExtractLatestTimestampFromRmd() { - Assert.assertEquals(30L, RmdUtils.getLastUpdateTimestamp(rmdRecordWithValidPerFieldLevelTimestamp)); - Assert.assertEquals(0L, RmdUtils.getLastUpdateTimestamp(rmdRecordWithOnlyRootLevelTimestamp)); + Assert.assertEquals(RmdUtils.getLastUpdateTimestamp(rmdRecordWithValidPerFieldLevelTimestamp), 30L); + Assert.assertEquals(RmdUtils.getLastUpdateTimestamp(rmdRecordWithOnlyRootLevelTimestamp), 0L); + } + + @Test + public void testGetLastUpdateTimestampWithCollectionFields() { + RmdSchemaGeneratorV1 rmdSchemaGeneratorV1 = new RmdSchemaGeneratorV1(); + Schema timestampSchema = rmdSchemaGeneratorV1.generateMetadataSchema(TestWriteUtils.USER_WITH_STRING_MAP_SCHEMA); + Schema mapFieldSchema = timestampSchema.getField("timestamp").schema().getTypes().get(1).getField("value").schema(); + + GenericRecord mapFieldRecord = new GenericData.Record(mapFieldSchema); + long[] activeElemTs = { 100L, 200L, 150L }; + long[] deletedElemTs = { 50L, 75L, 300L }; // 300L should be the max + mapFieldRecord.put(ACTIVE_ELEM_TS_FIELD_NAME, activeElemTs); + mapFieldRecord.put(TOP_LEVEL_TS_FIELD_NAME, 25L); + mapFieldRecord.put(DELETED_ELEM_TS_FIELD_NAME, deletedElemTs); + + GenericRecord timestampRecord = + new GenericData.Record(timestampSchema.getField("timestamp").schema().getTypes().get(1)); + timestampRecord.put("key", 10L); + timestampRecord.put("age", 50L); + timestampRecord.put("value", mapFieldRecord); + + GenericRecord rmdRecord = new GenericData.Record(timestampSchema); + rmdRecord.put(TIMESTAMP_FIELD_NAME, timestampRecord); + + Assert.assertEquals(RmdUtils.getLastUpdateTimestamp(rmdRecord), 300L); + } + + @Test + public void testGetLastUpdateTimestampWithEmptyCollections() { + RmdSchemaGeneratorV1 rmdSchemaGeneratorV1 = new RmdSchemaGeneratorV1(); + Schema timestampSchema = rmdSchemaGeneratorV1.generateMetadataSchema(TestWriteUtils.USER_WITH_STRING_MAP_SCHEMA); + Schema mapFieldSchema = timestampSchema.getField("timestamp").schema().getTypes().get(1).getField("value").schema(); + + GenericRecord mapFieldRecord = new GenericData.Record(mapFieldSchema); + mapFieldRecord.put(ACTIVE_ELEM_TS_FIELD_NAME, Collections.emptyList()); + mapFieldRecord.put(TOP_LEVEL_TS_FIELD_NAME, 25L); + mapFieldRecord.put(DELETED_ELEM_TS_FIELD_NAME, Collections.emptyList()); + + GenericRecord timestampRecord = + new GenericData.Record(timestampSchema.getField("timestamp").schema().getTypes().get(1)); + timestampRecord.put("key", 100L); // This should be the max + timestampRecord.put("age", 50L); + timestampRecord.put("value", mapFieldRecord); + + GenericRecord rmdRecord = new GenericData.Record(timestampSchema); + rmdRecord.put(TIMESTAMP_FIELD_NAME, timestampRecord); + + // Should return 100L (max from key field since collections are empty) + Assert.assertEquals(RmdUtils.getLastUpdateTimestamp(rmdRecord), 100L); } }