Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,9 @@ private static RxDocumentServiceResponse mockRxDocumentServiceResponse(String co
HttpResponseStatus.OK.code(),
headers,
new ByteBufInputStream(Unpooled.wrappedBuffer(blob), true),
blob.length);
blob.length,
null,
null);

RxDocumentServiceResponse documentServiceResponse = new RxDocumentServiceResponse(new DiagnosticsClientContext() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ public StoreResponseBuilder withContent(String content) {
public StoreResponse build() {
ByteBuf buffer = getUTF8BytesOrNull(content);
if (buffer == null) {
return new StoreResponse(null, status, headers, null, 0);
return new StoreResponse(null, status, headers, null, 0, null, null);
}
return new StoreResponse(null, status, headers, new ByteBufInputStream(buffer, true), buffer.readableBytes());
return new StoreResponse(null, status, headers, new ByteBufInputStream(buffer, true), buffer.readableBytes(), null, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,9 @@ public void validateAllSetValuesInCosmosBulkItemResponse() {
HttpResponseStatus.OK.code(),
headers,
new ByteBufInputStream(Unpooled.wrappedBuffer(blob), true),
blob.length);
blob.length,
null,
null);

CosmosBatchResponse batchResponse = BatchResponseParser.fromDocumentServiceResponse(
new RxDocumentServiceResponse(null, storeResponse),
Expand Down Expand Up @@ -166,7 +168,9 @@ public void validateEmptyHeaderInCosmosBulkItemResponse() {
HttpResponseStatus.OK.code(),
new HashMap<>(),
new ByteBufInputStream(Unpooled.wrappedBuffer(blob), true),
blob.length);
blob.length,
null,
null);

CosmosBatchResponse batchResponse = BatchResponseParser.fromDocumentServiceResponse(
new RxDocumentServiceResponse(null, storeResponse),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ public void validateAllSetValuesInResponse() {
HttpResponseStatus.OK.code(),
headers,
new ByteBufInputStream(Unpooled.wrappedBuffer(blob), true),
blob.length);
blob.length,
null,
null);

CosmosBatchResponse batchResponse = BatchResponseParser.fromDocumentServiceResponse(
new RxDocumentServiceResponse(null, storeResponse),
Expand Down Expand Up @@ -143,7 +145,9 @@ public void validateEmptyHeaderInResponse() {
HttpResponseStatus.OK.code(),
new HashMap<>(),
new ByteBufInputStream(Unpooled.wrappedBuffer(blob), true),
blob.length);
blob.length,
null,
null);

CosmosBatchResponse batchResponse = BatchResponseParser.fromDocumentServiceResponse(
new RxDocumentServiceResponse(null, storeResponse),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ public void getLsnAndGlobalCommittedLsn() {
headers.put(WFConstants.BackendHeaders.LSN, "3");
headers.put(WFConstants.BackendHeaders.GLOBAL_COMMITTED_LSN, "2");

StoreResponse sr = new StoreResponse(null, 0, headers, null, 0);
StoreResponse sr = new StoreResponse(null, 0, headers, null, 0, null, null);
Utils.ValueHolder<Long> lsn = Utils.ValueHolder.initialize(-2l);
Utils.ValueHolder<Long> globalCommittedLsn = Utils.ValueHolder.initialize(-2l);
ConsistencyWriter.getLsnAndGlobalCommittedLsn(sr, lsn, globalCommittedLsn);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public void parsingBytesWithInvalidUT8Bytes() {
try {
byte[] bytes = hexStringToByteArray(invalidHexString);
ByteBuf byteBuf = Unpooled.wrappedBuffer(bytes);
JsonNodeStorePayload jsonNodeStorePayload = new JsonNodeStorePayload(new ByteBufInputStream(byteBuf), bytes.length, new HashMap<>());
JsonNodeStorePayload jsonNodeStorePayload = new JsonNodeStorePayload(new ByteBufInputStream(byteBuf), bytes.length, new HashMap<>(), null, null);
jsonNodeStorePayload.getPayload().toString();
} finally {
System.clearProperty("COSMOS.CHARSET_DECODER_ERROR_ACTION_ON_MALFORMED_INPUT");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public void stringContent() {
headerMap.put("key2", "value2");

ByteBuf buffer = getUTF8BytesOrNull(jsonContent);
StoreResponse sp = new StoreResponse(null, 200, headerMap, new ByteBufInputStream(buffer, true), buffer.readableBytes());
StoreResponse sp = new StoreResponse(null, 200, headerMap, new ByteBufInputStream(buffer, true), buffer.readableBytes(), null, null);

assertThat(sp.getStatus()).isEqualTo(200);
assertThat(sp.getResponseBodyAsJson().get("id").asText()).isEqualTo(content);
Expand All @@ -39,7 +39,7 @@ public void headerNamesAreCaseInsensitive() {
headerMap.put("KEY3", "value3");

ByteBuf buffer = getUTF8BytesOrNull(jsonContent);
StoreResponse sp = new StoreResponse(null, 200, headerMap, new ByteBufInputStream(buffer, true), buffer.readableBytes());
StoreResponse sp = new StoreResponse(null, 200, headerMap, new ByteBufInputStream(buffer, true), buffer.readableBytes(), null, null);

assertThat(sp.getStatus()).isEqualTo(200);
assertThat(sp.getResponseBodyAsJson().get("id").asText()).isEqualTo(content);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public StoreResultDiagnosticsSerializerTests() throws IOException {
//TODO: add more test cases
@Test(groups = "unit")
public void storeResultDiagnosticsSerializerTests() {
StoreResponse storeResponse = new StoreResponse(null, 200, new HashMap<>(), null, 0);
StoreResponse storeResponse = new StoreResponse(null, 200, new HashMap<>(), null, 0, null, null);
StoreResult storeResult = new StoreResult(
storeResponse,
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import com.azure.cosmos.implementation.DatabaseAccountLocation;
import com.azure.cosmos.implementation.GlobalEndpointManager;
import com.azure.cosmos.implementation.InternalObjectNode;
import com.azure.cosmos.implementation.OperationType;
import com.azure.cosmos.implementation.ResourceType;
import com.azure.cosmos.implementation.RxDocumentClientImpl;
import com.azure.cosmos.implementation.TestConfigurations;
import com.azure.cosmos.implementation.Utils;
Expand Down Expand Up @@ -53,6 +55,7 @@
import com.azure.cosmos.test.faultinjection.FaultInjectionRuleBuilder;
import com.azure.cosmos.test.faultinjection.FaultInjectionServerErrorResult;
import com.azure.cosmos.test.faultinjection.FaultInjectionServerErrorType;
import com.azure.cosmos.test.faultinjection.JsonParseInterceptorHelper;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -2101,6 +2104,72 @@ public void verifyLeasesOnRestart_AfterSplit() throws InterruptedException {
}
}

/**
* Tests that ChangeFeedProcessor gracefully handles StreamConstraintsException during JSON parsing.
*
* <p>This test uses a GLOBAL interceptor to inject a StreamConstraintsException on the first
* JSON parse operation across all threads (including CFP worker threads from thread pools).</p>
*
* <p><strong>IMPORTANT:</strong> This test should NOT run in parallel with other tests that use
* the JSON parse interceptor, as the interceptor is global and would cause interference.</p>
*
* <p>Expected behavior: The ChangeFeedProcessor should retry with reduced maxItemCount and
* eventually process all documents successfully.</p>
*/
@Test(groups = { "long-emulator" }, timeOut = 50000, singleThreaded = true)
public void changeFeedProcessorHandlesStreamConstraintsException() throws Exception {
CosmosAsyncContainer feedContainer = createFeedCollection(FEED_COLLECTION_THROUGHPUT);
CosmosAsyncContainer leaseContainer = createLeaseCollection(LEASE_COLLECTION_THROUGHPUT);

try {
List<InternalObjectNode> createdDocuments = new ArrayList<>();
Map<String, JsonNode> receivedDocuments = new ConcurrentHashMap<>();

// Create documents
setupReadFeedDocuments(createdDocuments, feedContainer, FEED_COUNT);

// Set up GLOBAL interceptor to throw StreamConstraintsException once
// Works across all threads (main thread + CFP worker threads from thread pools)
// NOTE: Test marked as singleThreaded to prevent interference with parallel tests
try (AutoCloseable interceptor = JsonParseInterceptorHelper.injectStreamConstraintsExceptionOnce(OperationType.ReadFeed, ResourceType.Document)) {

ChangeFeedProcessorOptions options = new ChangeFeedProcessorOptions();
options.setMaxItemCount(100);
options.setStartFromBeginning(true);

ChangeFeedProcessor changeFeedProcessor = new ChangeFeedProcessorBuilder()
.hostName(hostName)
.feedContainer(feedContainer)
.leaseContainer(leaseContainer)
.options(options)
.handleLatestVersionChanges(changeFeedProcessorHandler(receivedDocuments))
.buildChangeFeedProcessor();

try {
startChangeFeedProcessor(changeFeedProcessor);

// Wait for documents to be processed
Thread.sleep(10000);

safeStopChangeFeedProcessor(changeFeedProcessor);

// Verify all documents were processed despite the StreamConstraintsException
assertThat(receivedDocuments.size()).isEqualTo(FEED_COUNT);

logger.info("Successfully processed {} documents after handling StreamConstraintsException",
receivedDocuments.size());

} finally {
Thread.sleep(2000);
}
}

} finally {
safeDeleteCollection(feedContainer);
safeDeleteCollection(leaseContainer);
}
}

private void startChangeFeedProcessor(ChangeFeedProcessor changeFeedProcessor) {
changeFeedProcessor
.start()
Expand Down Expand Up @@ -2321,21 +2390,21 @@ private Consumer<List<ChangeFeedProcessorItem>> leasesChangeFeedProcessorHandler
};
}

@BeforeMethod(groups = { "query", "cfp-split" }, timeOut = 2 * SETUP_TIMEOUT, alwaysRun = true)
@BeforeMethod(groups = { "query", "cfp-split", "long-emulator" }, timeOut = 2 * SETUP_TIMEOUT, alwaysRun = true)
public void beforeMethod() {
}

@BeforeClass(groups = { "query", "cfp-split" }, timeOut = SETUP_TIMEOUT, alwaysRun = true)
@BeforeClass(groups = { "query", "cfp-split", "long-emulator" }, timeOut = SETUP_TIMEOUT, alwaysRun = true)
public void before_ChangeFeedProcessorTest() {
client = getClientBuilder().buildAsyncClient();
createdDatabase = getSharedCosmosDatabase(client);
}

@AfterMethod(groups = { "query", "cfp-split" }, timeOut = 3 * SHUTDOWN_TIMEOUT, alwaysRun = true)
@AfterMethod(groups = { "query", "cfp-split", "long-emulator" }, timeOut = 3 * SHUTDOWN_TIMEOUT, alwaysRun = true)
public void afterMethod() {
}

@AfterClass(groups = { "query", "cfp-split" }, timeOut = 2 * SHUTDOWN_TIMEOUT, alwaysRun = true)
@AfterClass(groups = { "query", "cfp-split", "long-emulator" }, timeOut = 2 * SHUTDOWN_TIMEOUT, alwaysRun = true)
public void afterClass() {
safeClose(client);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import com.azure.cosmos.implementation.DatabaseAccountLocation;
import com.azure.cosmos.implementation.GlobalEndpointManager;
import com.azure.cosmos.implementation.InternalObjectNode;
import com.azure.cosmos.implementation.OperationType;
import com.azure.cosmos.implementation.ResourceType;
import com.azure.cosmos.implementation.RxDocumentClientImpl;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.changefeed.pkversion.ServiceItemLease;
Expand All @@ -39,6 +41,7 @@
import com.azure.cosmos.models.ThroughputProperties;
import com.azure.cosmos.models.ThroughputResponse;
import com.azure.cosmos.rx.TestSuiteBase;
import com.azure.cosmos.test.faultinjection.JsonParseInterceptorHelper;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -86,8 +89,7 @@ public class IncrementalChangeFeedProcessorTest extends TestSuiteBase {
private static final ObjectMapper OBJECT_MAPPER = Utils.getSimpleObjectMapper();

private CosmosAsyncDatabase createdDatabase;
// private final String databaseId = "testdb1";
// private final String hostName = "TestHost1";

private final String hostName = RandomStringUtils.randomAlphabetic(6);
private final int FEED_COUNT = 10;
private final int CHANGE_FEED_PROCESSOR_TIMEOUT = 5000;
Expand Down Expand Up @@ -1762,6 +1764,77 @@ public void readFeedDocumentsWithThroughputControl() throws InterruptedException
}
}

/**
* Tests that ChangeFeedProcessor gracefully handles StreamConstraintsException during JSON parsing.
*
* <p>This test uses a GLOBAL interceptor to inject a StreamConstraintsException on the first
* JSON parse operation across all threads (including CFP worker threads from thread pools).</p>
*
* <p><strong>IMPORTANT:</strong> This test should NOT run in parallel with other tests that use
* the JSON parse interceptor, as the interceptor is global and would cause interference.</p>
*
* <p>Expected behavior: The ChangeFeedProcessor should retry with reduced maxItemCount and
* eventually process all documents successfully.</p>
*/
@Test(groups = { "long-emulator" }, timeOut = 50000, singleThreaded = true)
public void changeFeedProcessorHandlesStreamConstraintsException() throws Exception {
CosmosAsyncContainer feedContainer = createFeedCollection(FEED_COLLECTION_THROUGHPUT);
CosmosAsyncContainer leaseContainer = createLeaseCollection(LEASE_COLLECTION_THROUGHPUT);

try {
List<InternalObjectNode> createdDocuments = new ArrayList<>();
Map<String, JsonNode> receivedDocuments = new ConcurrentHashMap<>();

// Create documents
setupReadFeedDocuments(createdDocuments, feedContainer, FEED_COUNT);

// Set up GLOBAL interceptor to throw StreamConstraintsException once
// Works across all threads (main thread + CFP worker threads from thread pools)
// NOTE: Test marked as singleThreaded to prevent interference with parallel tests
try (AutoCloseable interceptor = JsonParseInterceptorHelper.injectStreamConstraintsExceptionOnce(OperationType.ReadFeed, ResourceType.Document)) {

ChangeFeedProcessorOptions options = new ChangeFeedProcessorOptions();
options.setMaxItemCount(100);
options.setStartFromBeginning(true);

ChangeFeedProcessor changeFeedProcessor = new ChangeFeedProcessorBuilder()
.hostName(hostName)
.feedContainer(feedContainer)
.leaseContainer(leaseContainer)
.options(options)
.handleChanges(docs -> {
logger.info("Received {} documents", docs.size());
for (JsonNode doc : docs) {
receivedDocuments.put(doc.get("id").asText(), doc);
}
})
.buildChangeFeedProcessor();

try {
startChangeFeedProcessor(changeFeedProcessor);

// Wait for documents to be processed
Thread.sleep(10000);

safeStopChangeFeedProcessor(changeFeedProcessor);

// Verify all documents were processed despite the StreamConstraintsException
assertThat(receivedDocuments.size()).isEqualTo(FEED_COUNT);

logger.info("Successfully processed {} documents after handling StreamConstraintsException",
receivedDocuments.size());

} finally {
Thread.sleep(2000);
}
}

} finally {
safeDeleteCollection(feedContainer);
safeDeleteCollection(leaseContainer);
}
}

// Steps followed in this test
// 1. Ingest 10 documents into the feed container.
// 2. Start the LatestVersion / INCREMENTAL ChangeFeedProcessor with either startFromBeginning set to 'true' or 'false'.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.test.faultinjection;

import com.fasterxml.jackson.databind.JsonNode;

import java.io.IOException;
import java.util.Map;

/**
* Functional interface for intercepting JSON parsing in tests.
* This allows injecting faults during JSON deserialization for testing purposes.
*/
@FunctionalInterface
public interface JsonParseInterceptor {
/**
* Intercepts JSON parsing to allow fault injection.
*
* @param bytes the byte array containing JSON
* @param responseHeaders the response headers
* @param defaultParser the default parsing logic to delegate to
* @return the parsed JsonNode
* @throws IOException if parsing fails or fault is injected
*/
JsonNode intercept(
byte[] bytes,
Map<String, String> responseHeaders,
DefaultJsonParser defaultParser
) throws IOException;

/**
* Functional interface for the default JSON parsing logic.
*/
@FunctionalInterface
interface DefaultJsonParser {
JsonNode parse(byte[] bytes, Map<String, String> responseHeaders) throws IOException;
}
}
Loading