Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Azure Device Behavior for Non-Existent Files #1066

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -440,29 +440,56 @@ public override unsafe void ReadAsync(int segmentId, ulong sourceAddress, IntPtr
// Lazily cache the blob entry for the segment being read
if (!blobs.TryGetValue(segmentId, out BlobEntry blobEntry))
{
var blobClients = pageBlobDirectory.GetPageBlobClient(GetSegmentBlobName(segmentId));
var entry = new BlobEntry(blobClients, blobClients.Default.GetProperties().Value.ETag, this);
blobs.TryAdd(segmentId, entry);
BlobEntry entry = new(this);
if (blobs.TryAdd(segmentId, entry))
{
var pageBlob = pageBlobDirectory.GetPageBlobClient(GetSegmentBlobName(segmentId));

// If segment size is -1 we use a default
var size = segmentSize == -1 ? MAX_PAGEBLOB_SIZE : segmentSize;

// If no blob exists for the segment, we must first create the segment asynchronouly. (Create call takes ~70 ms by measurement)
// After creation is done, we can call read.
_ = entry.CreateAsync(size, pageBlob);
}
// Otherwise, some other thread beat us to it. Okay to use their blobs.
blobEntry = blobs[segmentId];
}

ReadFromBlobUnsafeAsync(blobEntry.PageBlob, (long)sourceAddress, (long)destinationAddress, readLength, id)
.ContinueWith((Task t) =>
{
if (pendingReadWriteOperations.TryRemove(id, out ReadWriteRequestInfo request))
{
if (t.IsFaulted)
{
BlobManager?.StorageTracer?.TsavoriteStorageProgress($"StorageOpReturned AzureStorageDevice.ReadAsync id={id} (Failure)");
request.Callback(uint.MaxValue, request.NumBytes, request.Context);
}
else
{
BlobManager?.StorageTracer?.TsavoriteStorageProgress($"StorageOpReturned AzureStorageDevice.ReadAsync id={id}");
request.Callback(0, request.NumBytes, request.Context);
}
}
}, TaskContinuationOptions.ExecuteSynchronously);
TryReadAsync(blobEntry, (long)sourceAddress, (long)destinationAddress, readLength, id);
}

void TryReadAsync(BlobEntry blobEntry, long sourceAddress, long destinationAddress, uint readLength, long id)
{
// If pageBlob is null, it means the blob has not been created yet. We should wait for it to be created.
if (blobEntry.PageBlob.Default == null &&
blobEntry.TryQueueAction(() => ReadFromBlobAsync(blobEntry, sourceAddress, destinationAddress, readLength, id)))
{
return;
}
// Otherwise, we can proceed with the read.
ReadFromBlobAsync(blobEntry, sourceAddress, destinationAddress, readLength, id);
}

unsafe void ReadFromBlobAsync(BlobEntry blobEntry, long sourceAddress, long destinationAddress, uint readLength, long id)
{
ReadFromBlobUnsafeAsync(blobEntry, sourceAddress, destinationAddress, readLength, id)
.ContinueWith((Task t) =>
{
if (pendingReadWriteOperations.TryRemove(id, out ReadWriteRequestInfo request))
{
if (t.IsFaulted)
{
BlobManager?.StorageTracer?.TsavoriteStorageProgress($"StorageOpReturned AzureStorageDevice.ReadAsync id={id} (Failure)");
request.Callback(uint.MaxValue, request.NumBytes, request.Context);
}
else
{
BlobManager?.StorageTracer?.TsavoriteStorageProgress($"StorageOpReturned AzureStorageDevice.ReadAsync id={id} for blob={blobEntry.ETag}");
request.Callback(0, request.NumBytes, request.Context);
}
}
}, TaskContinuationOptions.ExecuteSynchronously);
}

/// <summary>
Expand Down Expand Up @@ -557,12 +584,12 @@ await BlobManager.PerformWithRetriesAsync(
}
}

unsafe Task ReadFromBlobUnsafeAsync(BlobUtilsV12.PageBlobClients blob, long sourceAddress, long destinationAddress, uint readLength, long id)
unsafe Task ReadFromBlobUnsafeAsync(BlobEntry blob, long sourceAddress, long destinationAddress, uint readLength, long id)
{
return ReadFromBlobAsync(new UnmanagedMemoryStream((byte*)destinationAddress, readLength, readLength, FileAccess.Write), blob, sourceAddress, readLength, id);
}

async Task ReadFromBlobAsync(UnmanagedMemoryStream stream, BlobUtilsV12.PageBlobClients blob, long sourceAddress, uint readLength, long id)
async Task ReadFromBlobAsync(UnmanagedMemoryStream stream, BlobEntry blob, long sourceAddress, uint readLength, long id)
{
using (stream)
{
Expand All @@ -577,7 +604,7 @@ await BlobManager.PerformWithRetriesAsync(
"PageBlobClient.DownloadStreamingAsync",
"ReadFromDevice",
$"id={id} readLength={length} sourceAddress={sourceAddress + offset}",
blob.Default.Name,
blob.PageBlob.Default.Name,
1000 + (int)length / 1000,
true,
async (numAttempts) =>
Expand All @@ -589,7 +616,7 @@ await BlobManager.PerformWithRetriesAsync(

if (length > 0)
{
var client = (numAttempts > 1 || length == MAX_DOWNLOAD_SIZE) ? blob.Default : blob.Aggressive;
var client = (numAttempts > 1 || length == MAX_DOWNLOAD_SIZE) ? blob.PageBlob.Default : blob.PageBlob.Aggressive;

var response = await client.DownloadStreamingAsync(
range: new Azure.HttpRange(sourceAddress + offset, length),
Expand Down