Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 58 additions & 3 deletions dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.linkedin.metadata.annotations.Mode;
import com.linkedin.metadata.annotations.UrnFilter;
import com.linkedin.metadata.annotations.UrnFilterArray;
import com.linkedin.metadata.aspect.SoftDeletedAspect;
import com.linkedin.metadata.backfill.BackfillMode;
import com.linkedin.metadata.dao.builder.BaseLocalRelationshipBuilder.LocalRelationshipUpdates;
import com.linkedin.metadata.dao.equality.DefaultEqualityTester;
Expand All @@ -38,6 +39,7 @@
import com.linkedin.metadata.dao.tracking.TrackingUtils;
import com.linkedin.metadata.dao.urnpath.UrnPathExtractor;
import com.linkedin.metadata.dao.utils.ModelUtils;
import com.linkedin.metadata.dao.utils.RecordUtils;
import com.linkedin.metadata.events.ChangeType;
import com.linkedin.metadata.events.IngestionMode;
import com.linkedin.metadata.events.IngestionTrackingContext;
Expand All @@ -59,6 +61,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -1668,7 +1671,7 @@ public Map<URN, Map<Class<? extends RecordTemplate>, Optional<? extends RecordTe
aspectClasses == null ? getValidAspectTypes(_aspectUnionClass) : aspectClasses;
checkValidAspects(aspectToBackfill);
final Map<URN, Map<Class<? extends RecordTemplate>, Optional<? extends RecordTemplate>>> urnToAspects =
get(aspectToBackfill, urns);
get(aspectToBackfill, urns, true);
urnToAspects.forEach((urn, aspects) -> {
aspects.forEach((aspectClass, aspect) -> aspect.ifPresent(value -> backfill(mode, value, urn)));
});
Expand Down Expand Up @@ -1741,6 +1744,8 @@ public Map<URN, Map<Class<? extends RecordTemplate>, Optional<? extends RecordTe
private <ASPECT extends RecordTemplate> void backfill(@Nonnull BackfillMode mode, @Nonnull ASPECT aspect,
@Nonnull URN urn) {

RecordTemplate oldValue = probeSoftDeletedValue(aspect, urn.toString());

if (mode == BackfillMode.MAE_ONLY
|| mode == BackfillMode.BACKFILL_ALL
|| mode == BackfillMode.BACKFILL_INCLUDING_LIVE_INDEX) {
Expand All @@ -1749,9 +1754,9 @@ private <ASPECT extends RecordTemplate> void backfill(@Nonnull BackfillMode mode
IngestionTrackingContext trackingContext = buildIngestionTrackingContext(
TrackingUtils.getRandomUUID(), BACKFILL_EMITTER, System.currentTimeMillis());

_trackingProducer.produceAspectSpecificMetadataAuditEvent(urn, aspect, aspect, aspect.getClass(), null, trackingContext, ingestionMode);
_trackingProducer.produceAspectSpecificMetadataAuditEvent(urn, oldValue, aspect, aspect.getClass(), null, trackingContext, ingestionMode);
} else {
_producer.produceAspectSpecificMetadataAuditEvent(urn, aspect, aspect, aspect.getClass(), null, ingestionMode);
_producer.produceAspectSpecificMetadataAuditEvent(urn, oldValue, aspect, aspect.getClass(), null, ingestionMode);
}
}
}
Expand Down Expand Up @@ -1878,6 +1883,38 @@ public <ASPECT extends RecordTemplate> Optional<AspectWithExtraInfo<ASPECT>> get
return getWithExtraInfo(aspectClass, urn, LATEST_VERSION);
}

/**
* Similar to {@link com.linkedin.metadata.dao.BaseReadDAO}'s {@link #get(Set, Set)} but includes a flag to be able to
* retrieve SoftDeletedAspect entries as well.
*
* <p>This is currently used to parse soft-deleted metadata in order to backfill the search index for deleted items.
*
* <p>This is not advised for general use cases since deleted metadata will be returned.
*
* @param aspectClasses the set of aspect classes to retrieve
* @param urns the set of URNs to retrieve
* @param includeSoftDeleted whether to include soft deleted aspects
* @return a map of URN to a map of aspect class to the corresponding aspect value
*/
@Nonnull
public Map<URN, Map<Class<? extends RecordTemplate>, Optional<? extends RecordTemplate>>> get(
@Nonnull Set<Class<? extends RecordTemplate>> aspectClasses, @Nonnull Set<URN> urns, boolean includeSoftDeleted) {
final Set<AspectKey<URN, ? extends RecordTemplate>> keys = getKeysForLatestVersion(urns, aspectClasses);

final Map<URN, Map<Class<? extends RecordTemplate>, Optional<? extends RecordTemplate>>> results = new HashMap<>();
get(keys, includeSoftDeleted).forEach((key, value) -> {
final URN urn = key.getUrn();
results.putIfAbsent(urn, new HashMap<>());
results.get(urn).put(key.getAspectClass(), value);
});

return results;
}

@Nonnull
public abstract Map<AspectKey<URN, ? extends RecordTemplate>, Optional<? extends RecordTemplate>> get(
@Nonnull Set<AspectKey<URN, ? extends RecordTemplate>> keys, boolean includeSoftDeleted);

/**
* Generates a new string ID that's guaranteed to be globally unique.
*/
Expand Down Expand Up @@ -2093,4 +2130,22 @@ protected <ASPECT extends RecordTemplate> AspectUpdateResult aspectCallbackHelpe
}
return new AspectUpdateResult(newAspectValue, false);
}

@VisibleForTesting
@Nonnull
protected static <ASPECT extends RecordTemplate> RecordTemplate probeSoftDeletedValue(
@Nonnull ASPECT aspect, @Nonnull String urn) {
if (aspect instanceof SoftDeletedAspect) {
SoftDeletedAspect softDeletedAspect = (SoftDeletedAspect) aspect;
if (!softDeletedAspect.hasGma_deleted_content()) {
throw new NoSuchElementException(
String.format("SoftDeletedAspect found for urn <%s> does not have gma_deleted_content field, cannot be backfilled.", urn));
}
return RecordUtils.toRecordTemplate(
softDeletedAspect.getGma_deleted_content().getCanonicalName(),
softDeletedAspect.getGma_deleted_content().getAspect());
}
// if not a soft deleted aspect, return the original aspect as-is
return aspect;
}
}
16 changes: 8 additions & 8 deletions dao-api/src/main/java/com/linkedin/metadata/dao/BaseReadDAO.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import com.linkedin.metadata.validator.AspectValidator;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -78,13 +77,7 @@ public <ASPECT extends RecordTemplate> Optional<ASPECT> get(@Nonnull Class<ASPEC
public Map<URN, Map<Class<? extends RecordTemplate>, Optional<? extends RecordTemplate>>> get(
@Nonnull Set<Class<? extends RecordTemplate>> aspectClasses, @Nonnull Set<URN> urns) {


final Set<AspectKey<URN, ? extends RecordTemplate>> keys = new HashSet<>();
for (URN urn : urns) {
for (Class<? extends RecordTemplate> aspect : aspectClasses) {
keys.add(new AspectKey<>(aspect, urn, LATEST_VERSION));
}
}
final Set<AspectKey<URN, ? extends RecordTemplate>> keys = getKeysForLatestVersion(urns, aspectClasses);

final Map<URN, Map<Class<? extends RecordTemplate>, Optional<? extends RecordTemplate>>> results = new HashMap<>();
get(keys).entrySet().forEach(entry -> {
Expand Down Expand Up @@ -134,4 +127,11 @@ protected void checkValidAspect(@Nonnull Class<? extends RecordTemplate> aspectC
protected void checkValidAspects(@Nonnull Set<Class<? extends RecordTemplate>> aspectClasses) {
aspectClasses.forEach(aspectClass -> checkValidAspect(aspectClass));
}

protected Set<AspectKey<URN, ? extends RecordTemplate>> getKeysForLatestVersion(
@Nonnull Set<URN> urns, @Nonnull Set<Class<? extends RecordTemplate>> aspectClasses) {
return urns.stream()
.flatMap(urn -> aspectClasses.stream().map(aspectClass -> new AspectKey<>(aspectClass, urn, LATEST_VERSION)))
.collect(Collectors.toSet());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,18 @@ public static RecordTemplate toRecordTemplate(@Nonnull String className, @Nonnul
return toRecordTemplate(clazz, dataMap);
}

/**
* The string-only version of the toRecordTemplate methods.
*
* @param className FQCN of the record class extending RecordTemplate
* @param jsonString a JSON string serialized using {@link JacksonDataTemplateCodec}
* @return the created {@link RecordTemplate}
*/
@Nonnull
public static RecordTemplate toRecordTemplate(@Nonnull String className, @Nonnull String jsonString) {
return toRecordTemplate(className, toDataMap(jsonString));
}

/**
* Gets the aspect from the aspect class.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.data.template.SetMode;
import com.linkedin.data.template.UnionTemplate;
import com.linkedin.metadata.aspect.AuditedAspect;
import com.linkedin.metadata.aspect.SoftDeletedAspect;
import com.linkedin.metadata.dao.builder.BaseLocalRelationshipBuilder.LocalRelationshipUpdates;
import com.linkedin.metadata.dao.ingestion.AspectCallbackMapKey;
import com.linkedin.metadata.dao.ingestion.AspectCallbackRoutingClient;
Expand Down Expand Up @@ -41,6 +43,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -227,6 +230,13 @@ public long newNumericId(String namespace, int maxTransactionRetry) {
return Collections.emptyMap();
}

@Override
@Nonnull
public Map<AspectKey<FooUrn, ? extends RecordTemplate>, Optional<? extends RecordTemplate>> get(
Set<AspectKey<FooUrn, ? extends RecordTemplate>> aspectKeys, boolean includeSoftDeleted) {
return Collections.emptyMap();
}

@Override
@Nonnull
public Map<AspectKey<FooUrn, ? extends RecordTemplate>, AspectWithExtraInfo<? extends RecordTemplate>> getWithExtraInfo(
Expand Down Expand Up @@ -802,4 +812,28 @@ public void testAspectCallbackHelperWithUnregisteredAspect() throws URISyntaxExc
// Verify that the result is the same as the input aspect since it's not registered
assertEquals(result.getUpdatedAspect(), foo);
}

@Test
public void testProbeSoftDeletedValue() throws URISyntaxException {
FooUrn urn = new FooUrn(1);

// bypass (non-soft-deleted) pathway
AspectBar barExpected = new AspectBar().setValue("foo");
RecordTemplate fooActual = BaseLocalDAO.probeSoftDeletedValue(barExpected, urn.toString());
assertEquals(fooActual, barExpected);

// missing gma_deleted_content pathway
SoftDeletedAspect softDeletedAspectExpected = new SoftDeletedAspect();
assertThrows(NoSuchElementException.class,
() -> BaseLocalDAO.probeSoftDeletedValue(softDeletedAspectExpected, urn.toString()));

// ideal soft-delete pathway
AuditedAspect softDeletedContent = new AuditedAspect();
softDeletedContent.setAspect("{\"value\": \"foo\"}");
softDeletedContent.setCanonicalName("com.linkedin.testing.AspectBar");

softDeletedAspectExpected.setGma_deleted_content(softDeletedContent);
RecordTemplate softDeletedAspectIdealActual = BaseLocalDAO.probeSoftDeletedValue(softDeletedAspectExpected, urn.toString());
assertEquals(softDeletedAspectIdealActual, barExpected);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ public void testToRecordTemplate() throws IOException {

assertEquals(actual2.getClass(), AspectFoo.class);
assertEquals(actual2, expected);

RecordTemplate actual3 = RecordUtils.toRecordTemplate("com.linkedin.testing.AspectFoo", jsonString);
assertEquals(actual3, expected);
}

@Test(expectedExceptions = ModelConversionException.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.data.template.SetMode;
import com.linkedin.data.template.UnionTemplate;
import com.linkedin.metadata.aspect.SoftDeletedAspect;
import com.linkedin.metadata.dao.builder.BaseLocalRelationshipBuilder.LocalRelationshipUpdates;
import com.linkedin.metadata.dao.builder.LocalRelationshipBuilderRegistry;
import com.linkedin.metadata.dao.exception.ModelConversionException;
Expand Down Expand Up @@ -791,7 +792,7 @@ protected <ASPECT extends RecordTemplate> AspectEntry<ASPECT> getLatest(@Nonnull
}
final ExtraInfo extraInfo = toExtraInfo(latest);

if (isSoftDeletedAspect(latest, aspectClass)) {
if (isSoftDeletedAspect(latest)) {
return new AspectEntry<>(null, extraInfo, true);
}

Expand Down Expand Up @@ -1077,6 +1078,13 @@ protected <ASPECT extends RecordTemplate> void applyTimeBasedRetention(@Nonnull
@Nonnull
public Map<AspectKey<URN, ? extends RecordTemplate>, Optional<? extends RecordTemplate>> get(
@Nonnull Set<AspectKey<URN, ? extends RecordTemplate>> keys) {
return get(keys, false);
}

@Override
@Nonnull
public Map<AspectKey<URN, ? extends RecordTemplate>, Optional<? extends RecordTemplate>> get(
@Nonnull Set<AspectKey<URN, ? extends RecordTemplate>> keys, boolean includeSoftDeleted) {
if (keys.isEmpty()) {
return Collections.emptyMap();
}
Expand All @@ -1095,7 +1103,8 @@ protected <ASPECT extends RecordTemplate> void applyTimeBasedRetention(@Nonnull
.collect(Collectors.toMap(Function.identity(), key -> records.stream()
.filter(record -> matchKeys(key, record.getKey()))
.findFirst()
.flatMap(record -> toRecordTemplate(key.getAspectClass(), record))));
.flatMap(record -> includeSoftDeleted ? (Optional<RecordTemplate>) toRecordTemplateIncludeSoftDelete(key.getAspectClass(), record)
: (Optional<RecordTemplate>) toRecordTemplate(key.getAspectClass(), record))));
}

@Override
Expand Down Expand Up @@ -1259,14 +1268,14 @@ List<EbeanMetadataAspect> batchGetHelper(@Nonnull List<AspectKey<URN, ? extends
}

if (_schemaConfig == SchemaConfig.NEW_SCHEMA_ONLY) {
return _localAccess.batchGetUnion(keys, keysCount, position, false, false);
return _localAccess.batchGetUnion(keys, keysCount, position, true, false);
}

if (_schemaConfig == SchemaConfig.DUAL_SCHEMA) {
// Compare results from both new and old schemas
final List<EbeanMetadataAspect> resultsOldSchema = batchGetUnion(keys, keysCount, position);
final List<EbeanMetadataAspect> resultsNewSchema =
_localAccess.batchGetUnion(keys, keysCount, position, false, false);
_localAccess.batchGetUnion(keys, keysCount, position, false, false); // TODO: should soft-deleted aspects be included?
EBeanDAOUtils.compareResults(resultsOldSchema, resultsNewSchema, "batchGet");
return resultsOldSchema;
}
Expand Down Expand Up @@ -1466,16 +1475,25 @@ URN getUrn(@Nonnull String urn) {
@Nonnull
static <ASPECT extends RecordTemplate> Optional<ASPECT> toRecordTemplate(@Nonnull Class<ASPECT> aspectClass,
@Nonnull EbeanMetadataAspect aspect) {
if (isSoftDeletedAspect(aspect, aspectClass)) {
if (isSoftDeletedAspect(aspect)) {
return Optional.empty();
}
return Optional.of(RecordUtils.toRecordTemplate(aspectClass, aspect.getMetadata()));
}

@Nonnull
static <ASPECT extends RecordTemplate> Optional<? extends RecordTemplate> toRecordTemplateIncludeSoftDelete(
@Nonnull Class<ASPECT> aspectClass, @Nonnull EbeanMetadataAspect aspect) {
if (isSoftDeletedAspect(aspect)) {
return Optional.of(RecordUtils.toRecordTemplate(SoftDeletedAspect.class, aspect.getMetadata()));
}
return Optional.of(RecordUtils.toRecordTemplate(aspectClass, aspect.getMetadata()));
}

@Nonnull
static <ASPECT extends RecordTemplate> Optional<AspectWithExtraInfo<ASPECT>> toRecordTemplateWithExtraInfo(
@Nonnull Class<ASPECT> aspectClass, @Nonnull EbeanMetadataAspect aspect) {
if (aspect.getMetadata() == null || isSoftDeletedAspect(aspect, aspectClass)) {
if (aspect.getMetadata() == null || isSoftDeletedAspect(aspect)) {
return Optional.empty();
}
final ExtraInfo extraInfo = toExtraInfo(aspect);
Expand Down
Loading