Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@

import static java.lang.Thread.*;
import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.*;
import static org.powermock.api.mockito.PowerMockito.*;


/**
Expand Down Expand Up @@ -281,9 +283,9 @@ record = new NamedBlobRecord(account.getName(), container.getName(), blobName, b
namedBlobDb.get(account.getName(), container.getName(), blobName).get());
}

// fails
@Test
public void testDeleteWithMultipleVersions() throws Exception {
setupMockForMinStaleCount();
Account account = accountService.getAllAccounts().iterator().next();
Container container = account.getAllContainers().iterator().next();
String blobName = "testDeleteWithMultipleVersions-" + TestUtils.getRandomKey(10);
Expand Down Expand Up @@ -368,6 +370,7 @@ public void testDeleteWithMultipleVersions() throws Exception {
*/
@Test
public void testCleanupBlobsPipeline() throws Exception {
setupMockForMinStaleCount();
Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
calendar.add(Calendar.DATE, -config.staleDataRetentionDays);
long staleCutoffTime = calendar.getTimeInMillis();
Expand Down Expand Up @@ -449,7 +452,7 @@ public void testCleanupBlobsPipeline() throws Exception {
*/
@Test
public void testCleanupBlobStaleCase1() throws Exception {

setupMockForMinStaleCount();
Account account = accountService.getAllAccounts().iterator().next();
Container container = account.getAllContainers().iterator().next();
String blobId = getBlobId(account, container);
Expand All @@ -474,6 +477,7 @@ public void testCleanupBlobStaleCase1() throws Exception {
*/
@Test
public void testCleanupBlobStaleCase2() throws Exception {
setupMockForMinStaleCount();
Account account = accountService.getAllAccounts().iterator().next();
Container container = account.getAllContainers().iterator().next();
String blobId = getBlobId(account, container);
Expand Down Expand Up @@ -504,6 +508,7 @@ public void testCleanupBlobStaleCase2() throws Exception {
*/
@Test
public void testCleanupBlobGoodCase1() throws Exception {
setupMockForMinStaleCount();
Account account = accountService.getAllAccounts().iterator().next();
Container container = account.getAllContainers().iterator().next();
String blobId = getBlobId(account, container);
Expand All @@ -522,6 +527,7 @@ public void testCleanupBlobGoodCase1() throws Exception {
*/
@Test
public void testCleanupBlobGoodCase2() throws Exception {
setupMockForMinStaleCount();
Account account = accountService.getAllAccounts().iterator().next();
Container container = account.getAllContainers().iterator().next();
String blobId = getBlobId(account, container);
Expand All @@ -546,7 +552,7 @@ public void testCleanupBlobGoodCase2() throws Exception {
*/
@Test
public void testCleanupBlobGoodCase3() throws Exception {

setupMockForMinStaleCount();
Account account = accountService.getAllAccounts().iterator().next();
Container container = account.getAllContainers().iterator().next();
String blobId = getBlobId(account, container);
Expand All @@ -567,6 +573,7 @@ public void testCleanupBlobGoodCase3() throws Exception {
*/
@Test
public void testCleanupBlobGoodCase4() throws Exception {
setupMockForMinStaleCount();
Account account = accountService.getAllAccounts().iterator().next();
Container container = account.getAllContainers().iterator().next();
String blobId = getBlobId(account, container);
Expand Down Expand Up @@ -594,6 +601,7 @@ public void testCleanupBlobGoodCase4() throws Exception {
*/
@Test
public void testCleanupBlobGoodCase5() throws Exception {
setupMockForMinStaleCount();
Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
calendar.add(Calendar.DATE, -config.staleDataRetentionDays);
long staleCutoffTime = calendar.getTimeInMillis();
Expand Down Expand Up @@ -624,6 +632,7 @@ public void testCleanupBlobGoodCase5() throws Exception {
*/
@Test
public void testCleanupBlobGoodCase6() throws Exception {
setupMockForMinStaleCount();
Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
calendar.add(Calendar.DATE, -config.staleDataRetentionDays);
long staleCutoffTime = calendar.getTimeInMillis();
Expand Down Expand Up @@ -670,6 +679,7 @@ public void testCleanupBlobGoodCase6() throws Exception {

@Test
public void testRemovesOneOlderStaleInProgressBlob() throws Exception {
setupMockForMinStaleCount();
// Arrange: create an IN_PROGRESS blob and mark it stale by updating modified_ts
NamedBlobRecord record1 = createAndPutNamedBlob(getBlobIdFromService(), NamedBlobState.IN_PROGRESS, "new_cleaner");
updateModifiedTimestampByBlobName("new_cleaner", 20);
Expand All @@ -686,6 +696,7 @@ public void testRemovesOneOlderStaleInProgressBlob() throws Exception {

@Test
public void testRemovesTwoStaleInProgressBlobs() throws Exception {
setupMockForMinStaleCount();
// Arrange: create two IN_PROGRESS blob records with different blob IDs
NamedBlobRecord record1 = createAndPutNamedBlob("blob-id1", NamedBlobState.IN_PROGRESS, "new_cleaner");
time.sleep(5);
Expand All @@ -702,6 +713,7 @@ public void testRemovesTwoStaleInProgressBlobs() throws Exception {

@Test
public void testRemovesStaleInProgressBlobsAddedBeforeAndAfter() throws Exception {
setupMockForMinStaleCount();
// Arrange: put one IN_PROGRESS record, then update its timestamp to stale (20 days ago)
NamedBlobRecord record1 = createAndPutNamedBlob("blob-id1", NamedBlobState.IN_PROGRESS, "new_cleaner");
updateModifiedTimestampByBlobName("new_cleaner", 20);
Expand All @@ -720,6 +732,7 @@ public void testRemovesStaleInProgressBlobsAddedBeforeAndAfter() throws Exceptio

@Test
public void testIgnoresReadyBlobsWhenRemovingStale() throws Exception {
setupMockForMinStaleCount();
NamedBlobRecord record1 = createAndPutNamedBlob("blob-id1", NamedBlobState.IN_PROGRESS, "new_cleaner");
updateModifiedTimestampByBlobName("new_cleaner", 20);
time.sleep(5);
Expand All @@ -737,6 +750,7 @@ public void testIgnoresReadyBlobsWhenRemovingStale() throws Exception {

@Test
public void testRemovesOneStaleBlobWithSameBlobIdDifferentStates() throws Exception {
setupMockForMinStaleCount();
String blobId = getBlobIdFromService();
NamedBlobRecord record1 = createAndPutNamedBlob(blobId, NamedBlobState.IN_PROGRESS, "new_cleaner");
time.sleep(5);
Expand All @@ -752,6 +766,7 @@ public void testRemovesOneStaleBlobWithSameBlobIdDifferentStates() throws Except

@Test
public void testRemovesReadyAndInProgressStaleBlobs() throws Exception {
setupMockForMinStaleCount();
// Arrange: three blobs with mixed states (READY and IN_PROGRESS)
NamedBlobRecord record1 = createAndPutNamedBlob("blob-id1", NamedBlobState.READY, "new_cleaner");
time.sleep(5);
Expand All @@ -771,6 +786,7 @@ public void testRemovesReadyAndInProgressStaleBlobs() throws Exception {
*/
@Test
public void testMultiBlobNamesCase1NoPagination() throws Exception {
setupMockForMinStaleCount();
String[] trackedBlobIds = new String[3];
for (int i = 0; i < 6; i++) {
String blobId = "blob-id" + (i + 1);
Expand Down Expand Up @@ -798,6 +814,7 @@ public void testMultiBlobNamesCase1NoPagination() throws Exception {
*/
@Test
public void testMultiBlobNamesCase1PaginationEven() throws Exception {
setupMockForMinStaleCount();
for (int i = 0; i < 2486; i++) {
String blobId = "blob-id" + (i + 1);
NamedBlobState state = (i % 2 == 0) ? NamedBlobState.IN_PROGRESS : NamedBlobState.READY;
Expand All @@ -817,6 +834,7 @@ public void testMultiBlobNamesCase1PaginationEven() throws Exception {
*/
@Test
public void testMultiBlobNamesCase1PaginationOdd() throws Exception {
setupMockForMinStaleCount();
for (int i = 0; i < 2487; i++) {
String blobId = "blob-id" + (i + 1);
NamedBlobState state = (i % 2 == 0) ? NamedBlobState.IN_PROGRESS : NamedBlobState.READY;
Expand All @@ -836,6 +854,7 @@ public void testMultiBlobNamesCase1PaginationOdd() throws Exception {
*/
@Test
public void testMultiBlobNamesCase2NoPagination() throws Exception {
setupMockForMinStaleCount();
for (int i = 0; i < 6; i++) {
String blobId = "blob-id" + (i + 1);
String cleaner = "new_cleaner" + i;
Expand All @@ -854,6 +873,7 @@ public void testMultiBlobNamesCase2NoPagination() throws Exception {
*/
@Test
public void testMultiBlobNamesCase2Pagination() throws Exception {
setupMockForMinStaleCount();
for (int i = 0; i < 2344; i++) {
String blobId = "blob-id" + (i + 1);
String cleaner = "new_cleaner" + i;
Expand All @@ -873,6 +893,7 @@ public void testMultiBlobNamesCase2Pagination() throws Exception {
*/
@Test
public void testMultiBlobNamesCase3() throws Exception {
setupMockForMinStaleCount();
for (int i = 0; i < 1004; i++) {
String blobId = "blob-id" + (i + 1);
String cleaner = "new_cleaner" + i;
Expand Down Expand Up @@ -999,6 +1020,7 @@ public static String base64BlobIdToHex(String base64BlobId) {
* Helper method to run the for loop across containers to retrieve stale blobs
*/
public List<StaleNamedBlob> getStaleBlobList() throws Exception {
setupMockForMinStaleCount();
List<StaleNamedBlob> staleNamedBlobsList = new ArrayList<>();

Set<Container> containers = accountService.getContainersByStatus(Container.ContainerStatus.ACTIVE);
Expand All @@ -1019,4 +1041,10 @@ public List<StaleNamedBlob> getStaleBlobList() throws Exception {

return staleNamedBlobsList;
}

private void setupMockForMinStaleCount() throws SQLException {
MySqlNamedBlobDb mockDb = mock(MySqlNamedBlobDb.class);
when(mockDb.checkIfValidContainerForCleaning(any(Connection.class), any(Container.class)))
.thenReturn(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public class MySqlNamedBlobDb implements NamedBlobDb {
private static final Logger logger = LoggerFactory.getLogger(MySqlNamedBlobDb.class);
private static final int MAX_NUMBER_OF_VERSIONS_IN_DELETE = 1000;
private static final int VERSION_BASE = 100000;
private static final int MIN_STALE_BLOBS_TO_ACTIVATE_CLEANER = 10000;

private final Time time;
private static final String MULTI_VERSION_PLACE_HOLDER = "MULTI_VERSION_PLACE_HOLDER";
Expand Down Expand Up @@ -178,6 +179,20 @@ public class MySqlNamedBlobDb implements NamedBlobDb {
+ "ORDER BY %s ASC, %s DESC " + "LIMIT ?", ACCOUNT_ID, CONTAINER_ID, BLOB_NAME, BLOB_ID, VERSION, BLOB_STATE,
MODIFIED_TS, DELETED_TS, NAMED_BLOBS_V2, BLOB_NAME, VERSION);

private static final String GET_MINIMUM_STALE_BLOB_COUNT = String.format(
"SELECT COUNT(*) AS single_occurrence_ready_count " +
"FROM ( " +
" SELECT %s " +
" FROM %s " +
" WHERE container_id = ? " +
" AND account_id = ? " +
" AND blob_state = ? " +
" GROUP BY %s, %s, %s " +
" HAVING COUNT(*) = 1 " +
") AS sub",
BLOB_NAME, NAMED_BLOBS_V2, BLOB_NAME, CONTAINER_ID, ACCOUNT_ID
);

private final AccountService accountService;
private final String localDatacenter;
private final List<String> remoteDatacenters;
Expand Down Expand Up @@ -384,6 +399,10 @@ public CompletableFuture<StaleBlobsWithLatestBlobName> pullStaleBlobs(Container
return executeGenericTransactionAsync(true, (connection) -> {
long startTime = this.time.milliseconds();
StaleBlobsWithLatestBlobName staleBlobsWithLatestBlobName = null;
Boolean res = checkIfValidContainerForCleaning(connection, container);
if (!(res)) {
return new StaleBlobsWithLatestBlobName(new ArrayList<>(), null);
}
List<StaleNamedBlob> potentialStaleNamedBlobResults = getAllBlobsForContainer(connection, container, blobName);
int resultSize = potentialStaleNamedBlobResults.size();
if (resultSize == 0) {
Expand Down Expand Up @@ -957,6 +976,28 @@ private List<StaleNamedBlob> getAllBlobsForContainer(Connection connection, Cont
return resultList;
}

public boolean checkIfValidContainerForCleaning(Connection connection, Container container) throws SQLException {
int minStaleCount = 0;
try (PreparedStatement statement = connection.prepareStatement(GET_MINIMUM_STALE_BLOB_COUNT)) {
statement.setInt(1, container.getId());
statement.setInt(2, container.getParentAccountId());
statement.setInt(3, NamedBlobState.READY.ordinal());

logger.info("Determining the minimum number of stale blobs: Query {}", statement.toString());

try (ResultSet resultSet = statement.executeQuery()) {
if (resultSet.next()) {
minStaleCount = resultSet.getInt(1); // directly read the number returned by the query
}
}
} catch (SQLException e) {
logger.error("Error executing query: {}", e.getMessage());
throw e;
}

return minStaleCount >= MIN_STALE_BLOBS_TO_ACTIVATE_CLEANER;
}

/**
* Performs a batch soft delete on the provided list of stale blobs.
*
Expand Down
Loading