Skip to content
Draft

t #584

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dao-api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ dependencies {
compile externalDependency.commonsLang
implementation 'com.google.protobuf:protobuf-java:3.21.1'
implementation spec.product.pegasus.restliServer
compileOnly 'javax.persistence:javax.persistence-api:2.2'
dataModel project(':core-models')
dataModel project(':validators')

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.persistence.OptimisticLockException;
import javax.annotation.Nullable;
import lombok.AllArgsConstructor;
import lombok.Builder;
Expand Down Expand Up @@ -503,6 +504,7 @@ private <ASPECT extends RecordTemplate> AddResult<ASPECT> addCommon(@Nonnull URN
@Nonnull AuditStamp auditStamp, @Nonnull EqualityTester<ASPECT> equalityTester,
@Nullable IngestionTrackingContext trackingContext, @Nonnull IngestionParams ingestionParams) {

// ye main function h
final ASPECT oldValue = latest.getAspect() == null ? null : latest.getAspect();
final AuditStamp oldAuditStamp = latest.getExtraInfo() == null ? null : latest.getExtraInfo().getAudit();
final Long oldEmitTime = latest.getExtraInfo() == null ? null : latest.getExtraInfo().getEmitTime();
Expand Down Expand Up @@ -543,11 +545,15 @@ private <ASPECT extends RecordTemplate> AddResult<ASPECT> addCommon(@Nonnull URN
}
}

// yaha p existing row se timestamp aaya hamare paas
//
final AuditStamp optimisticLockAuditStamp = extractOptimisticLockForAspectFromIngestionParamsIfPossible(ingestionParams, aspectClass, urn);

// Logic determines whether an update to aspect should be persisted.
// ye banda galat h i feel
if (!shouldUpdateAspect(ingestionParams.getIngestionMode(), urn, oldValue, newValue, aspectClass, auditStamp, equalityTester,
oldAuditStamp, optimisticLockAuditStamp)) {
// throw new IllegalArgumentException("Should not update aspect");
return new AddResult<>(oldValue, oldValue, aspectClass);
}

Expand Down Expand Up @@ -810,6 +816,7 @@ private <ASPECT extends RecordTemplate> ASPECT unwrapAddResult(URN urn, @Nonnull
}

// return the new value for updates and the old value for deletions
// yaha se hamare liye to new value hi return hogi
return isDeletion ? oldValue : newValue;
}

Expand Down Expand Up @@ -868,6 +875,7 @@ public <ASPECT extends RecordTemplate> ASPECT add(@Nonnull URN urn, @Nonnull Cla
public <ASPECT extends RecordTemplate> ASPECT add(@Nonnull URN urn, @Nonnull Class<ASPECT> aspectClass,
@Nonnull Function<Optional<ASPECT>, ASPECT> updateLambda, @Nonnull AuditStamp auditStamp,
int maxTransactionRetry, @Nullable IngestionTrackingContext trackingContext, @Nonnull IngestionParams ingestionParams) {
// AspectUpdateLambda -> aspect class, updateLambda returns new value, ingestionmode -> live
return add(urn, new AspectUpdateLambda<>(aspectClass, updateLambda, ingestionParams), auditStamp, maxTransactionRetry, trackingContext);
}

Expand Down Expand Up @@ -925,6 +933,7 @@ public <ASPECT extends RecordTemplate> ASPECT add(@Nonnull URN urn, AspectUpdate

// default test mode is false being set in
// {@link #rawAdd(Urn, RecordTemplate, AuditStamp, IngestionTrackingContext, IngestionParams)}}
// yaha p retry limit reach nahi hogi kuki hum result return kar dete h same same
final AddResult<ASPECT> result =
runInTransactionWithRetry(() -> aspectUpdateHelper(urn, updateLambda, auditStamp, trackingContext, isRawUpdate),
maxTransactionRetry);
Expand Down Expand Up @@ -1187,6 +1196,7 @@ public <ASPECT extends RecordTemplate> ASPECT add(@Nonnull URN urn, @Nonnull Cla
public <ASPECT extends RecordTemplate> ASPECT add(@Nonnull URN urn, @Nonnull Class<ASPECT> aspectClass,
@Nonnull Function<Optional<ASPECT>, ASPECT> updateLambda, @Nonnull AuditStamp auditStamp,
@Nullable IngestionTrackingContext trackingContext, @Nonnull IngestionParams ingestionParams) {
// updateLambda returns newValue
return add(urn, aspectClass, updateLambda, auditStamp, DEFAULT_MAX_TRANSACTION_RETRY, trackingContext, ingestionParams);
}

Expand Down Expand Up @@ -1222,6 +1232,7 @@ public <ASPECT extends RecordTemplate> ASPECT add(@Nonnull URN urn, @Nonnull ASP
public <ASPECT extends RecordTemplate> ASPECT add(@Nonnull URN urn, @Nonnull ASPECT newValue,
@Nonnull AuditStamp auditStamp, @Nullable IngestionTrackingContext trackingContext,
@Nullable IngestionParams ingestionParams) {
// starting point
IngestionParams nonNullIngestionParams = ingestionParams == null
? new IngestionParams().setIngestionMode(IngestionMode.LIVE) : ingestionParams;
final IngestionParams nonNullIngestionParamsWithTestMode = !nonNullIngestionParams.hasTestMode()
Expand Down Expand Up @@ -2016,8 +2027,13 @@ private <ASPECT extends RecordTemplate> boolean shouldUpdateAspect(IngestionMode
AspectIngestionAnnotation annotation = findIngestionAnnotationForEntity(ingestionAnnotations, urn);
Mode mode = annotation == null || !annotation.hasMode() ? Mode.DEFAULT : annotation.getMode();

boolean isOutdatedEtag = aspectTimestampSkipWrite(eTagAuditStamp, oldValueAuditStamp);
if (isOutdatedEtag) {
throw new OptimisticLockException("Outdated Etag Exception");
// throw new OutdatedEtagException("Outdated Etag Exception");
}
final boolean shouldSkipBasedOnValueVersionAuditStamp =
oldAndNewEqual || aspectVersionSkipWrite(newValue, oldValue) || aspectTimestampSkipWrite(eTagAuditStamp, oldValueAuditStamp);
oldAndNewEqual || aspectVersionSkipWrite(newValue, oldValue) || isOutdatedEtag;

// Skip saving for the following scenarios
if (mode != Mode.FORCE_UPDATE
Expand Down
Loading