-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Bail out from barriers when barriers hit 410 Lease Not Found.
#47232
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
jeet1995
wants to merge
29
commits into
Azure:main
Choose a base branch
from
jeet1995:failFastBarrierLeaseNotFound
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
29 commits
Select commit
Hold shift + click to select a range
1a5cf3f
Enable 410-1022 on Head requests to bail out.
jeet1995 f571a5e
Bail fast on barrier on reads.
jeet1995 cf65b20
Enhance tests to ensure primary is contacted and barrier request coun…
jeet1995 617c961
Enhance tests to ensure barrier post the QuorumSelected phase is invo…
jeet1995 8899ea0
Merge branch 'main' of github.com:jeet1995/azure-sdk-for-java into fa…
jeet1995 f0d3331
Code comments
jeet1995 e3b0db0
Adding a way to run tests against a multi-region Strong account.
jeet1995 5d87a6f
Adding a way to run tests against a multi-region Strong account.
jeet1995 d46d999
Validate sub-status code too.
jeet1995 77c9c5a
Code cleanup.
jeet1995 6e98e04
Add CHANGELOG.md entry.
jeet1995 2ef0be6
Modify barrier hit criteria.
jeet1995 8206806
Merge branch 'main' of github.com:jeet1995/azure-sdk-for-java into fa…
jeet1995 42ef429
Modify barrier hit criteria.
jeet1995 ad13540
Add tests for barrier bail out in Bounded Staleness consistency.
jeet1995 41cec42
Add tests for barrier bail out in Bounded Staleness consistency.
jeet1995 e4f93b0
Refactoring
jeet1995 23a8dee
Addressing code comments.
jeet1995 cff675e
Verify write barrier criteria.
jeet1995 0fc232b
Verify barrier bail out criteria.
jeet1995 150f2d0
Verify barrier bail out criteria.
jeet1995 d3ee96e
Merge branch 'main' of https://github.com/jeet1995/azure-sdk-for-java…
jeet1995 84deb31
Managing merge.
jeet1995 a49950b
Fix compilation errors.
jeet1995 822ee0f
Fix tests.
jeet1995 bca781b
Fix tests.
jeet1995 510e9c4
Addressing comments.
jeet1995 a7683f2
Merge branch 'main' of https://github.com/jeet1995/azure-sdk-for-java…
jeet1995 6fce95b
Addressing comments.
jeet1995 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
543 changes: 543 additions & 0 deletions
543
sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/BailOutFromBarrierE2ETests.java
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
149 changes: 149 additions & 0 deletions
149
...ava/com/azure/cosmos/implementation/directconnectivity/StoreResponseInterceptorUtils.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,149 @@ | ||
| // Copyright (c) Microsoft Corporation. All rights reserved. | ||
| // Licensed under the MIT License. | ||
|
|
||
| package com.azure.cosmos.implementation.directconnectivity; | ||
|
|
||
| import com.azure.cosmos.ConsistencyLevel; | ||
| import com.azure.cosmos.implementation.OperationType; | ||
| import com.azure.cosmos.implementation.RxDocumentServiceRequest; | ||
| import com.azure.cosmos.implementation.Utils; | ||
|
|
||
| import java.util.concurrent.atomic.AtomicInteger; | ||
| import java.util.function.BiFunction; | ||
|
|
||
| public class StoreResponseInterceptorUtils { | ||
|
|
||
| public static BiFunction<RxDocumentServiceRequest, StoreResponse, StoreResponse> forceBarrierFollowedByBarrierFailure( | ||
| ConsistencyLevel operationConsistencyLevel, | ||
| String regionName, | ||
| int maxAllowedFailureCount, | ||
| AtomicInteger failureCount, | ||
| int statusCode, | ||
| int subStatusCode) { | ||
|
|
||
| return (request, storeResponse) -> { | ||
|
|
||
| if (OperationType.Create.equals(request.getOperationType()) && regionName.equals(request.requestContext.regionalRoutingContextToRoute.getGatewayRegionalEndpoint().toString())) { | ||
|
|
||
| long localLsn = Long.parseLong(storeResponse.getHeaderValue(WFConstants.BackendHeaders.LOCAL_LSN)); | ||
| long manipulatedGCLSN = localLsn - 1; | ||
|
|
||
| storeResponse.setHeaderValue(WFConstants.BackendHeaders.GLOBAL_COMMITTED_LSN, String.valueOf(manipulatedGCLSN)); | ||
|
|
||
| return storeResponse; | ||
| } | ||
|
|
||
| if (OperationType.Read.equals(request.getOperationType()) && regionName.equals(request.requestContext.regionalRoutingContextToRoute.getGatewayRegionalEndpoint().toString())) { | ||
|
|
||
| if (ConsistencyLevel.STRONG.equals(operationConsistencyLevel)) { | ||
|
|
||
| long globalLsn = Long.parseLong(storeResponse.getHeaderValue(WFConstants.BackendHeaders.LSN)); | ||
| long itemLsn = Long.parseLong(storeResponse.getHeaderValue(WFConstants.BackendHeaders.ITEM_LSN)); | ||
| long manipulatedGlobalCommittedLSN = Math.min(globalLsn, itemLsn) - 1; | ||
|
|
||
| storeResponse.setHeaderValue(WFConstants.BackendHeaders.GLOBAL_COMMITTED_LSN, String.valueOf(manipulatedGlobalCommittedLSN)); | ||
|
|
||
| return storeResponse; | ||
| } else if (ConsistencyLevel.BOUNDED_STALENESS.equals(operationConsistencyLevel)) { | ||
|
|
||
| long manipulatedItemLSN = -1; | ||
| long manipulatedGlobalLSN = 0; | ||
|
|
||
| storeResponse.setHeaderValue(WFConstants.BackendHeaders.LSN, String.valueOf(manipulatedGlobalLSN)); | ||
| storeResponse.setHeaderValue(WFConstants.BackendHeaders.LOCAL_LSN, String.valueOf(manipulatedGlobalLSN)); | ||
| storeResponse.setHeaderValue(WFConstants.BackendHeaders.ITEM_LSN, String.valueOf(manipulatedItemLSN)); | ||
| storeResponse.setHeaderValue(WFConstants.BackendHeaders.ITEM_LOCAL_LSN, String.valueOf(manipulatedItemLSN)); | ||
|
|
||
| return storeResponse; | ||
| } | ||
|
|
||
| return storeResponse; | ||
| } | ||
|
|
||
| if (OperationType.Head.equals(request.getOperationType()) && regionName.equals(request.requestContext.regionalRoutingContextToRoute.getGatewayRegionalEndpoint().toString())) { | ||
| if (failureCount.incrementAndGet() <= maxAllowedFailureCount) { | ||
| throw Utils.createCosmosException(statusCode, subStatusCode, new Exception("An intercepted exception occurred. Check status and substatus code for details."), null); | ||
| } | ||
| } | ||
|
|
||
| return storeResponse; | ||
| }; | ||
| } | ||
|
|
||
| public static BiFunction<RxDocumentServiceRequest, StoreResponse, StoreResponse> forceSuccessfulBarriersOnReadUntilQuorumSelectionThenForceBarrierFailures( | ||
| ConsistencyLevel operationConsistencyLevel, | ||
| String regionName, | ||
| int allowedSuccessfulHeadRequestsWithoutBarrierBeingMet, | ||
| AtomicInteger successfulHeadRequestCount, | ||
| int maxAllowedFailureCount, | ||
| AtomicInteger failureCount, | ||
| int statusCode, | ||
| int subStatusCode) { | ||
|
|
||
| return (request, storeResponse) -> { | ||
|
|
||
| if (OperationType.Read.equals(request.getOperationType()) && regionName.equals(request.requestContext.regionalRoutingContextToRoute.getGatewayRegionalEndpoint().toString())) { | ||
|
|
||
| if (ConsistencyLevel.STRONG.equals(operationConsistencyLevel)) { | ||
|
|
||
| long globalLsn = Long.parseLong(storeResponse.getHeaderValue(WFConstants.BackendHeaders.LSN)); | ||
| long itemLsn = Long.parseLong(storeResponse.getHeaderValue(WFConstants.BackendHeaders.ITEM_LSN)); | ||
| long manipulatedGlobalCommittedLSN = Math.min(globalLsn, itemLsn) - 1; | ||
|
|
||
| storeResponse.setHeaderValue(WFConstants.BackendHeaders.GLOBAL_COMMITTED_LSN, String.valueOf(manipulatedGlobalCommittedLSN)); | ||
|
|
||
| return storeResponse; | ||
| } else if (ConsistencyLevel.BOUNDED_STALENESS.equals(operationConsistencyLevel)) { | ||
|
|
||
| long manipulatedItemLSN = -1; | ||
| long manipulatedGlobalLSN = 0; | ||
|
|
||
| storeResponse.setHeaderValue(WFConstants.BackendHeaders.LSN, String.valueOf(manipulatedGlobalLSN)); | ||
| storeResponse.setHeaderValue(WFConstants.BackendHeaders.LOCAL_LSN, String.valueOf(manipulatedGlobalLSN)); | ||
| storeResponse.setHeaderValue(WFConstants.BackendHeaders.ITEM_LSN, String.valueOf(manipulatedItemLSN)); | ||
| storeResponse.setHeaderValue(WFConstants.BackendHeaders.ITEM_LOCAL_LSN, String.valueOf(manipulatedItemLSN)); | ||
|
|
||
| return storeResponse; | ||
| } | ||
|
|
||
| return storeResponse; | ||
| } | ||
|
|
||
| if (OperationType.Head.equals(request.getOperationType()) && regionName.equals(request.requestContext.regionalRoutingContextToRoute.getGatewayRegionalEndpoint().toString())) { | ||
|
|
||
| if (successfulHeadRequestCount.incrementAndGet() <= allowedSuccessfulHeadRequestsWithoutBarrierBeingMet) { | ||
|
|
||
| if (ConsistencyLevel.STRONG.equals(operationConsistencyLevel)) { | ||
|
|
||
| long localLsn = Long.parseLong(storeResponse.getHeaderValue(WFConstants.BackendHeaders.LOCAL_LSN)); | ||
| long itemLsn = Long.parseLong(storeResponse.getHeaderValue(WFConstants.BackendHeaders.ITEM_LSN)); | ||
| long manipulatedGCLSN = Math.min(localLsn, itemLsn) - 1; | ||
|
|
||
| storeResponse.setHeaderValue(WFConstants.BackendHeaders.GLOBAL_COMMITTED_LSN, String.valueOf(manipulatedGCLSN)); | ||
|
|
||
| return storeResponse; | ||
| } else if (ConsistencyLevel.BOUNDED_STALENESS.equals(operationConsistencyLevel)) { | ||
|
|
||
| long manipulatedItemLSN = -1; | ||
| long manipulatedGlobalLSN = -1; | ||
|
|
||
| storeResponse.setHeaderValue(WFConstants.BackendHeaders.LSN, String.valueOf(manipulatedGlobalLSN)); | ||
| storeResponse.setHeaderValue(WFConstants.BackendHeaders.LOCAL_LSN, String.valueOf(manipulatedGlobalLSN)); | ||
| storeResponse.setHeaderValue(WFConstants.BackendHeaders.ITEM_LSN, String.valueOf(manipulatedItemLSN)); | ||
| storeResponse.setHeaderValue(WFConstants.BackendHeaders.ITEM_LOCAL_LSN, String.valueOf(manipulatedItemLSN)); | ||
|
|
||
| return storeResponse; | ||
| } | ||
|
|
||
| return storeResponse; | ||
| } | ||
|
|
||
| if (failureCount.incrementAndGet() <= maxAllowedFailureCount) { | ||
| throw Utils.createCosmosException(statusCode, subStatusCode, new Exception("An intercepted exception occurred. Check status and substatus code for details."), null); | ||
| } | ||
| } | ||
|
|
||
| return storeResponse; | ||
| }; | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[blocking]: wire up Netty bytebuf allocation tracking configs. cc: @FabianMeiswinkel
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Has been merged now - so, should be easy to copy the same settings