Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ namespace com.linkedin.metadata.internal

import com.linkedin.metadata.events.IngestionMode
import com.linkedin.metadata.events.IngestionTrackingContext
import pegasus.com.linkedin.metadata.events.IngestionAspectETag
import pegasus.com.linkedin.metadata.aspect.RequestMetadata

/**
* Record defining ingestion-related parameters that can be passed into DAO API calls.
Expand All @@ -25,7 +25,7 @@ record IngestionParams {
testMode: boolean = false

/**
* eTag for aspect ingestion optimistic locking
* Metadata for API request
*/
ingestionETags: optional array[IngestionAspectETag] = [ ]
request_metadata: optional RequestMetadata
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
namespace pegasus.com.linkedin.metadata.aspect

/**
* *
* Metadata associated with an aspect
*/
record AspectMetadata {

/**
* aspect alias name
*/
alias: optional string = ""

/**
* used for optimistic locking in update API
*/
etag: optional string = ""
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
namespace pegasus.com.linkedin.metadata.aspect

/**
* *
* Metadata for API request
*/
record RequestMetadata {

/**
* * Aspect metadata map
*/
aspect_metadata: optional array[AspectMetadata] = [ ]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
namespace pegasus.com.linkedin.metadata.aspect

/**
* *
* Metadata for API response
*/
record ResponseMetadata {

/**
* * Aspect metadata map
*/
aspect_metadata: optional array[AspectMetadata] = [ ]
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@
import javax.persistence.Table;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import pegasus.com.linkedin.metadata.events.IngestionAspectETag;
import pegasus.com.linkedin.metadata.events.IngestionAspectETagArray;
import pegasus.com.linkedin.metadata.aspect.AspectMetadata;
import pegasus.com.linkedin.metadata.aspect.RequestMetadata;

import static com.linkedin.metadata.dao.EbeanLocalAccess.*;
import static com.linkedin.metadata.dao.EbeanMetadataAspect.*;
Expand Down Expand Up @@ -619,10 +619,10 @@ public <ASPECT extends RecordTemplate> AuditStamp extractOptimisticLockForAspect

AuditStamp optimisticLockAuditStamp = null;

final IngestionAspectETagArray ingestionAspectETags = ingestionParams.getIngestionETags();
final RequestMetadata requestMetadata = ingestionParams.getRequest_metadata();

if (ingestionAspectETags != null) {
for (IngestionAspectETag ingestionAspectETag: ingestionAspectETags) {
if (requestMetadata != null && requestMetadata.getAspect_metadata() != null) {
for (AspectMetadata aspectMetadata: requestMetadata.getAspect_metadata()) {

final String aspectAlias;

Expand All @@ -632,8 +632,8 @@ public <ASPECT extends RecordTemplate> AuditStamp extractOptimisticLockForAspect
continue;
}

if (aspectAlias != null && aspectAlias.equalsIgnoreCase(ingestionAspectETag.getAspect_alias())) {
Long decryptedETag = getDecryptedETag(ingestionAspectETag);
if (aspectAlias != null && aspectAlias.equalsIgnoreCase(aspectMetadata.getAlias())) {
Long decryptedETag = getDecryptedETag(aspectMetadata);
if (decryptedETag != null) {
optimisticLockAuditStamp = new AuditStamp();
optimisticLockAuditStamp.setTime(decryptedETag);
Expand All @@ -649,12 +649,12 @@ public <ASPECT extends RecordTemplate> AuditStamp extractOptimisticLockForAspect
* When eTag is null, it means this is a regular ingestion request, no read-modify-write consistency guarantee.
*/
@Nullable
private Long getDecryptedETag(@Nonnull IngestionAspectETag ingestionAspectETag) {
private Long getDecryptedETag(@Nonnull AspectMetadata aspectMetadata) {
try {
if (ingestionAspectETag.getEtag() == null) {
if (aspectMetadata.getEtag() == null) {
return null;
}
return ETagUtils.decrypt(ingestionAspectETag.getEtag());
return ETagUtils.decrypt(aspectMetadata.getEtag());
} catch (Exception e) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,9 @@
import org.testng.annotations.DataProvider;
import org.testng.annotations.Factory;
import org.testng.annotations.Test;
import pegasus.com.linkedin.metadata.events.IngestionAspectETag;
import pegasus.com.linkedin.metadata.events.IngestionAspectETagArray;
import pegasus.com.linkedin.metadata.aspect.AspectMetadata;
import pegasus.com.linkedin.metadata.aspect.AspectMetadataArray;
import pegasus.com.linkedin.metadata.aspect.RequestMetadata;

import static com.linkedin.common.AuditStamps.*;
import static com.linkedin.metadata.dao.internal.BaseGraphWriterDAO.RemovalOption.*;
Expand Down Expand Up @@ -4190,13 +4191,17 @@ public void testExtractOptimisticLockForAspectFromIngestionParamsIfPossible()

FooUrn urn = new FooUrn(1);

IngestionAspectETag ingestionAspectETag = new IngestionAspectETag();
ingestionAspectETag.setAspect_alias("aspectFoo");
long timestamp = 1750796203701L;
ingestionAspectETag.setEtag("KsFkRXtjaBGQf37HjdEjDQ==");

AspectMetadata aspectMetadata = new AspectMetadata();
aspectMetadata.setAlias("aspectFoo");
aspectMetadata.setEtag("KsFkRXtjaBGQf37HjdEjDQ==");

RequestMetadata requestMetadata = new RequestMetadata();
requestMetadata.setAspect_metadata(new AspectMetadataArray(aspectMetadata));

IngestionParams ingestionParams = new IngestionParams();
ingestionParams.setIngestionETags(new IngestionAspectETagArray(ingestionAspectETag));
ingestionParams.setRequest_metadata(requestMetadata);

AuditStamp result = dao.extractOptimisticLockForAspectFromIngestionParamsIfPossible(ingestionParams, AspectFoo.class,
urn);
Expand Down Expand Up @@ -4224,12 +4229,15 @@ public void testExtractOptimisticLockForAspectFromIngestionParamsIfPossibleAspec

FooUrn urn = new FooUrn(1);

IngestionAspectETag ingestionAspectETag = new IngestionAspectETag();
ingestionAspectETag.setAspect_alias("aspectBar");
ingestionAspectETag.setEtag("KsFkRXtjaBGQf37HjdEjDQ==");
AspectMetadata aspectMetadata = new AspectMetadata();
aspectMetadata.setAlias("aspectBar");
aspectMetadata.setEtag("KsFkRXtjaBGQf37HjdEjDQ==");

RequestMetadata requestMetadata = new RequestMetadata();
requestMetadata.setAspect_metadata(new AspectMetadataArray(aspectMetadata));

IngestionParams ingestionParams = new IngestionParams();
ingestionParams.setIngestionETags(new IngestionAspectETagArray(ingestionAspectETag));
ingestionParams.setRequest_metadata(requestMetadata);

AuditStamp result = dao.extractOptimisticLockForAspectFromIngestionParamsIfPossible(ingestionParams, AspectFoo.class,
urn);
Expand Down