Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@
import com.linkedin.data.schema.RecordDataSchema;
import com.linkedin.data.template.DataTemplateUtil;
import com.linkedin.data.template.StringArray;
import com.linkedin.data.template.StringArrayMap;
import com.linkedin.data.template.StringArrayMapMap;
import com.linkedin.data.template.StringArrayMapMapArray;
import com.linkedin.testing.AnnotatedAspectBar;
import com.linkedin.testing.AnnotatedAspectFoo;
import com.linkedin.testing.BarAspect;
import com.linkedin.testing.CommonAspect;
import com.linkedin.testing.CollectionAnnotatedAspectBar;
import com.linkedin.testing.SearchAnnotatedAspectBar;

import java.util.Optional;
Expand Down Expand Up @@ -116,4 +120,30 @@ public void parseAspectWithOnlySearchIndexAnnotations() {
}

// TODO: if add support for disallowing certain search annotations, add tests for them

@Test
public void parseAspectWithCollectionAnnotations() {
final Optional<GmaAnnotation> gma =
new GmaAnnotationParser().parse((RecordDataSchema) DataTemplateUtil.getSchema(CollectionAnnotatedAspectBar.class));

StringArray fooPaths = new StringArray("x.y", "x.z");
StringArrayMap fooMap = new StringArrayMap();
fooMap.put("paths", fooPaths);
StringArrayMapMap foo = new StringArrayMapMap();
foo.put("foo", fooMap);

StringArray barPaths = new StringArray("abc", "def");
StringArrayMap barMap = new StringArrayMap();
barMap.put("paths", barPaths);
StringArrayMapMap bar = new StringArrayMapMap();
bar.put("bar", barMap);

StringArrayMapMapArray primaryKeys = new StringArrayMapMapArray(foo, bar);
assertThat(gma).contains(new GmaAnnotation().setCollection(
new CollectionAnnotation()
.setIsCollection(true)
.setPrimaryKeys(primaryKeys)
.setDefaultUpdateBehavior(UpdateBehavior.REPLACE_BY_ACTOR)
));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
namespace com.linkedin.metadata.annotations

/**
* Annotation for collections-related attributes
*/
record CollectionAnnotation {
/**
* Whether a model is a collection
*/
isCollection: optional boolean

/**
* A list of key-values representing the primary keys to use for updating collection-typed fields. The keys are
* strings representing collections field names. The values are lists of field masks indicating the key(s) to be used for updates on that field.
*
* e.g. [{"my_struct.foo": {"paths": ["x.y", "x.z"]}}] -> the foo field of my_struct is a list, which has primary keys of x.y (where x is a struct
* and y is a field of x) and x.z (where z is also a field of x). A real world example could be lineage where x is an Upstream struct,
* x.y is the dataset urn, and x.z is the lastmodifiedby value.
*/
primaryKeys: optional array[map[string, map[string, array[string]]]]

/**
* How to handle updates. The default/implicit behavior is REPLACE_BY_KEY
*/
defaultUpdateBehavior: optional enum UpdateBehavior {
REPLACE_BY_KEY,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how do you plan to support delete?

Copy link
Contributor Author

@jsdonn jsdonn Aug 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we can do it here at the model level, since all AssetLabel instances will have to have the same UpdateBehavior. Instead, I'm thinking to use the IngestionMode field of IngestionParams to specify what kind of update is to be performed. So something like:

LabelService
    -> update(new label)
             -> call assetService.update(mode=LIVE) w/ default UpdateBehavior
    -> delete(existing label)
             -> call assetService.update(mode=DELETE_FROM_COLLECTION) which will override the UpdateBehavior

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or even more generic than the LabelService, maybe we can have a CollectionsService just for handling Collections-type aspects

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we do the collection, let's not have a separate service for it then. Probably you can explore the existing delete api we already have in MMG. And saying that we have default update behavior and default delete behavior etc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, I can also update the delete API to include IngestionParams

REPLACE_BY_ACTOR,
REPLACE_ALL
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,10 @@ record GmaAnnotation {
* Information about GMA Search functionality.
*/
search: optional SearchAnnotation

/**
* Information about collections
*/
collection: optional CollectionAnnotation
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we have another annotation defined in the plugin, how will that work? can we consolidate the two here? I will prefer us to deprecate the ppl annotation here at all.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The annotation on the internal model will get translated to this pdl annotation (there will be logic in metadata-models-plugin to do this). We cannot fully deprecate the pdl annotation without either moving datahub-gma into a LI MP or starting to rely on proto in datahub-gma.

Essentially, the proto annotation (i.e. (.proto.mg.collection={...}) will get translated into this pdl annotation @gma.collection = {...}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we need to process the annotation and do some customization on the annotation, can we just do it in MGA (i.e. a layer up of DAO). I think you will anyway need to check the update method and the collection type before we enter DAO layer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the main tradeoff for where we put the annotation processing logic is how many db operations need to be done?

if we put in MGA, we need to do

  1. get() to get the latest value of the collection so we may manipulate it before calling dao.add
    2a) getLatest() as part of the dao.add
    2b) saveLatest()

if we put in DAO, we just don't need the first get() so it will just be 1 read, 1 write.

but agree with you that in the second case, we will have to process the annotation in both MGA and in DAO.

I admit that the performance gained by going with this approach is insignificant, let's go just put the annotation and processing in MGA instead. thanks for the comment.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I just saw this message, thanks for taking the suggestion. But for the part where we process the annotation on MGA, I would suggest you to call the add with preprocessFunction method to avoid the duplicate get call and make sure the update is atomic.


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
namespace com.linkedin.testing

/**
* For unit tests
*/
@gma.collection = {
"isCollection" : true,
"primaryKeys" : [ {
"foo" : {
"paths" : [ "x.y", "x.z" ]
}
}, {
"bar" : {
"paths" : [ "abc", "def" ]
}
} ],
"defaultUpdateBehavior" : "REPLACE_BY_ACTOR"
}
record CollectionAnnotatedAspectBar {

/** For unit tests */
stringField: string

/** For unit tests */
boolField: boolean

/** For unit tests */
longField: long

/** For unit tests */
arrayField: array[string]
}