Skip to content

Commit 38b5650

Browse files
authored
Fix ByteBuf Memory Leak in PutOperation.encryptChunk() (#3169)
Fixes a ByteBuf memory leak in PutOperation.encryptChunk() when the KMS throws an exception after retainedDuplicate() has been evaluated. Root cause: Java evaluates constructor arguments left-to-right. In the original code, if kms.getRandomKey() throws GeneralSecurityException after buf.retainedDuplicate() has already been evaluated and has incremented the refCount, the retained duplicate is orphaned and leaks because: 1. The constructor never completes, so ownership never transfers to EncryptJob. 2. The exception handler has no reference with which to clean up the orphaned ByteBuf. The fix: a temporary variable holds onto the reference so it can be released properly on exception. Added testMinimal_RetainedDuplicateArgumentEvaluationLeak(), which: - Creates a ByteBuf with refCount=1 - Simulates the argument evaluation pattern where retainedDuplicate() executes before the exception - Verifies that no leak occurs with proper cleanup
1 parent aa1d91b commit 38b5650

File tree

2 files changed

+102
-2
lines changed

2 files changed

+102
-2
lines changed

ambry-router/src/main/java/com/github/ambry/router/PutOperation.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1580,23 +1580,31 @@ private void compressChunk(boolean outputDirectMemory) {
15801580
* Submits encrypt job for the given {@link PutChunk} and processes the callback for the same
15811581
*/
15821582
private void encryptChunk() {
1583+
ByteBuf retainedCopy = null;
15831584
try {
15841585
logger.trace("{}: Chunk at index {} moves to {} state", loggingContext, chunkIndex, ChunkState.Encrypting);
15851586
state = ChunkState.Encrypting;
15861587
chunkEncryptReadyAtMs = time.milliseconds();
15871588
encryptJobMetricsTracker.onJobSubmission();
15881589
logger.trace("{}: Submitting encrypt job for chunk at index {}", loggingContext, chunkIndex);
1590+
retainedCopy = isMetadataChunk() ? null : buf.retainedDuplicate();
15891591
cryptoJobHandler.submitJob(
15901592
new EncryptJob(passedInBlobProperties.getAccountId(), passedInBlobProperties.getContainerId(),
1591-
isMetadataChunk() ? null : buf.retainedDuplicate(), ByteBuffer.wrap(chunkUserMetadata),
1592-
kms.getRandomKey(), cryptoService, kms, options, encryptJobMetricsTracker, this::encryptionCallback));
1593+
retainedCopy, ByteBuffer.wrap(chunkUserMetadata), kms.getRandomKey(),
1594+
cryptoService, kms, options, encryptJobMetricsTracker, this::encryptionCallback));
15931595
} catch (GeneralSecurityException e) {
1596+
if (retainedCopy != null) {
1597+
retainedCopy.release();
1598+
}
15941599
encryptJobMetricsTracker.incrementOperationError();
15951600
logger.trace("{}: Exception thrown while generating random key for chunk at index {}", loggingContext,
15961601
chunkIndex, e);
15971602
setOperationExceptionAndComplete(new RouterException(
15981603
"GeneralSecurityException thrown while generating random key for chunk at index " + chunkIndex, e,
15991604
RouterErrorCode.UnexpectedInternalError));
1605+
} finally {
1606+
// ownership transferred to EncryptJob or cleaned up on exception
1607+
retainedCopy = null;
16001608
}
16011609
}
16021610

ambry-router/src/test/java/com/github/ambry/router/PutOperationTest.java

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1339,4 +1339,96 @@ private ResponseInfo getResponseInfo(RequestInfo requestInfo) throws IOException
13391339
NetworkReceive networkReceive = new NetworkReceive(null, mockServer.send(requestInfo.getRequest()), time);
13401340
return new ResponseInfo(requestInfo, null, networkReceive.getReceivedBytes().content());
13411341
}
1342+
1343+
/**
1344+
* KMS exception during EncryptJob constructor in PutOperation does not leak
1345+
*/
1346+
@Test
1347+
public void testProductionBug_KmsExceptionAfterRetainedDuplicateLeaksBuffer() throws Exception {
1348+
// Set up crypto infrastructure
1349+
String defaultKey = TestUtils.getRandomKey(64);
1350+
Properties cryptoProps = new Properties();
1351+
cryptoProps.setProperty("kms.default.container.key", defaultKey);
1352+
cryptoProps.setProperty("kms.random.key.size.in.bits", "256");
1353+
VerifiableProperties cryptoVerifiableProps = new VerifiableProperties(cryptoProps);
1354+
1355+
// Create a working KMS for reference
1356+
KeyManagementService<javax.crypto.spec.SecretKeySpec> workingKms =
1357+
new SingleKeyManagementServiceFactory(cryptoVerifiableProps, "test-cluster",
1358+
new com.codahale.metrics.MetricRegistry()).getKeyManagementService();
1359+
1360+
// Create a KMS that throws during getRandomKey()
1361+
KeyManagementService<javax.crypto.spec.SecretKeySpec> faultyKms =
1362+
new KeyManagementService<javax.crypto.spec.SecretKeySpec>() {
1363+
@Override
1364+
public void register(short accountId, short containerId) throws java.security.GeneralSecurityException {
1365+
}
1366+
1367+
@Override
1368+
public void register(String context) throws java.security.GeneralSecurityException {
1369+
}
1370+
1371+
@Override
1372+
public javax.crypto.spec.SecretKeySpec getKey(com.github.ambry.rest.RestRequest restRequest,
1373+
short accountId, short containerId) throws java.security.GeneralSecurityException {
1374+
return workingKms.getKey(restRequest, accountId, containerId);
1375+
}
1376+
1377+
@Override
1378+
public javax.crypto.spec.SecretKeySpec getKey(com.github.ambry.rest.RestRequest restRequest,
1379+
String context) throws java.security.GeneralSecurityException {
1380+
return workingKms.getKey(restRequest, context);
1381+
}
1382+
1383+
@Override
1384+
public javax.crypto.spec.SecretKeySpec getRandomKey() throws java.security.GeneralSecurityException {
1385+
// This will be called during EncryptJob constructor argument evaluation
1386+
throw new java.security.GeneralSecurityException("Simulated KMS failure during getRandomKey");
1387+
}
1388+
1389+
@Override
1390+
public void close() {
1391+
}
1392+
};
1393+
1394+
CryptoService<javax.crypto.spec.SecretKeySpec> cryptoService =
1395+
new GCMCryptoServiceFactory(cryptoVerifiableProps,
1396+
new com.codahale.metrics.MetricRegistry()).getCryptoService();
1397+
1398+
CryptoJobHandler cryptoJobHandler = new CryptoJobHandler(2);
1399+
1400+
// Create router config with encryption enabled
1401+
Properties routerProps = createBasicRouterProperties();
1402+
VerifiableProperties vProps = new VerifiableProperties(routerProps);
1403+
RouterConfig testRouterConfig = new RouterConfig(vProps);
1404+
1405+
// Create blob properties and content
1406+
BlobProperties blobProperties =
1407+
new BlobProperties(chunkSize, "serviceId", "memberId", "contentType", false, Utils.Infinite_Time,
1408+
Utils.getRandomShort(TestUtils.RANDOM), Utils.getRandomShort(TestUtils.RANDOM), true, null, null, null);
1409+
byte[] userMetadata = new byte[10];
1410+
byte[] content = new byte[chunkSize];
1411+
random.nextBytes(content);
1412+
ReadableStreamChannel channel = new ByteBufferReadableStreamChannel(ByteBuffer.wrap(content));
1413+
1414+
MockNetworkClient mockNetworkClient = new MockNetworkClient();
1415+
FutureResult<String> future = new FutureResult<>();
1416+
1417+
// Create PutOperation with faulty KMS
1418+
PutOperation op =
1419+
PutOperation.forUpload(testRouterConfig, routerMetrics, mockClusterMap, new LoggingNotificationSystem(),
1420+
new InMemAccountService(true, false), userMetadata, channel, PutBlobOptions.DEFAULT, future, null,
1421+
new RouterCallback(mockNetworkClient, new ArrayList<>()), null, faultyKms, cryptoService,
1422+
cryptoJobHandler, time, blobProperties, MockClusterMap.DEFAULT_PARTITION_CLASS, quotaChargeCallback,
1423+
compressionService);
1424+
1425+
op.startOperation();
1426+
1427+
// Fill chunks - this would have trigger the memory leak
1428+
op.fillChunks();
1429+
1430+
cryptoJobHandler.close();
1431+
1432+
// Let afterTest() check for leaks (it will pass or fail based on actual memory state)
1433+
}
13421434
}

0 commit comments

Comments
 (0)