Multipart Upload with GCS .NET SDK #13021

Open
joaopaulopmedeiros opened this issue May 27, 2024 · 6 comments

joaopaulopmedeiros commented May 27, 2024

Hey guys, I recently developed a proof of concept for on-demand CSV file generation.
The goal is to retrieve data from a relational database, map it to CSV, and then upload it to a cloud bucket in chunks of a given size (e.g. 5 MB).
I've read the docs and tried to use the Resumable Upload feature, but my file gets overwritten.

A "complete" method would be very useful, but I didn't find anything about it. Could you help out here?

Working sample code with the AWS provider:

using System.Buffers;
using System.Globalization;
using System.IO;

using Amazon.Runtime;
using Amazon.S3;
using Amazon.S3.Model;

using CsvHelper;

using Report.Generator.Infra.Repositories;

namespace Report.Generator.Infra.Generators
{
    public class CsvAWSReportGenerator
    {
        private readonly IAmazonS3 _s3Client;

        public CsvAWSReportGenerator(IAmazonS3 s3Client)
        {
            _s3Client = s3Client;
        }

        public async Task GenerateAsync(string bucketName, string keyName, int blockSize)
        {
            byte[] buffer = ArrayPool<byte>.Shared.Rent(blockSize);
            int bufferPosition = 0;

            List<UploadPartResponse> uploadResponses = new();
            InitiateMultipartUploadRequest initiateRequest = new()
            {
                BucketName = bucketName,
                Key = keyName
            };

            InitiateMultipartUploadResponse initResponse =
                await _s3Client.InitiateMultipartUploadAsync(initiateRequest);

            try
            {
                using var memoryStream = new MemoryStream();
                using var writer = new StreamWriter(memoryStream);
                using var csvWriter = new CsvWriter(writer, CultureInfo.InvariantCulture);

                int partNumber = 1;

                await foreach (var product in ProductRepository.FetchProductsAsync())
                {
                    memoryStream.SetLength(0);
                    csvWriter.WriteRecord(product);
                    await csvWriter.NextRecordAsync();
                    await writer.FlushAsync();
                    memoryStream.Position = 0;

                    while (memoryStream.Position < memoryStream.Length)
                    {
                        int bytesToRead = Math.Min(blockSize - bufferPosition, (int)(memoryStream.Length - memoryStream.Position));
                        int bytesRead = await memoryStream.ReadAsync(buffer, bufferPosition, bytesToRead);
                        bufferPosition += bytesRead;

                        if (bufferPosition == blockSize)
                        {
                            await UploadPartAsync(buffer, bufferPosition, bucketName, keyName, initResponse.UploadId, partNumber++, uploadResponses);
                            bufferPosition = 0;
                        }
                    }
                }

                if (bufferPosition > 0)
                {
                    await UploadPartAsync(buffer, bufferPosition, bucketName, keyName, initResponse.UploadId, partNumber, uploadResponses);
                }

                ArrayPool<byte>.Shared.Return(buffer);

                CompleteMultipartUploadRequest completeRequest = new CompleteMultipartUploadRequest
                {
                    BucketName = bucketName,
                    Key = keyName,
                    UploadId = initResponse.UploadId
                };

                completeRequest.AddPartETags(uploadResponses);

                CompleteMultipartUploadResponse completeUploadResponse =
                    await _s3Client.CompleteMultipartUploadAsync(completeRequest);
            }
            catch (Exception exception)
            {
                Console.WriteLine("An AmazonS3Exception was thrown: {0}", exception.Message);

                AbortMultipartUploadRequest abortMPURequest = new AbortMultipartUploadRequest
                {
                    BucketName = bucketName,
                    Key = keyName,
                    UploadId = initResponse.UploadId
                };
                await _s3Client.AbortMultipartUploadAsync(abortMPURequest);
            }
        }

        private async Task UploadPartAsync(byte[] buffer, int bufferLength, string bucketName, string keyName, string uploadId, int partNumber, List<UploadPartResponse> uploadResponses)
        {
            using var partStream = new MemoryStream(buffer, 0, bufferLength);
            UploadPartRequest uploadRequest = new UploadPartRequest
            {
                BucketName = bucketName,
                Key = keyName,
                UploadId = uploadId,
                PartNumber = partNumber,
                PartSize = bufferLength,
                InputStream = partStream
            };

            uploadRequest.StreamTransferProgress += new EventHandler<StreamTransferProgressArgs>(UploadPartProgressEventCallback);
            uploadResponses.Add(await _s3Client.UploadPartAsync(uploadRequest));
        }

        public void UploadPartProgressEventCallback(object sender, StreamTransferProgressArgs e)
        {
            Console.WriteLine("{0}/{1}", e.TransferredBytes, e.TotalBytes);
        }
    }
}

PS: I've seen GCS does have a XML multipart upload API.
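
For context, here is a rough sketch of what calling that XML API directly from .NET might look like (endpoint shapes taken from the public XML multipart upload documentation; the credential handling and names here are illustrative only, and I haven't verified this end to end):

using System.Net.Http.Headers;
using System.Text;
using System.Xml.Linq;

using Google.Apis.Auth.OAuth2;

public static class XmlMultipartUploadSketch
{
    private const string Host = "https://storage.googleapis.com";

    public static async Task UploadAsync(string bucket, string objectName, IReadOnlyList<byte[]> parts)
    {
        // Application Default Credentials with a read/write storage scope (assumption).
        var credential = (await GoogleCredential.GetApplicationDefaultAsync())
            .CreateScoped("https://www.googleapis.com/auth/devstorage.read_write");
        string token = await credential.UnderlyingCredential.GetAccessTokenForRequestAsync();

        using var http = new HttpClient();
        http.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", token);

        // 1. Initiate: POST ?uploads returns an <UploadId>.
        var initiate = await http.PostAsync(
            $"{Host}/{bucket}/{objectName}?uploads",
            new ByteArrayContent(Array.Empty<byte>()));
        initiate.EnsureSuccessStatusCode();
        var initiateXml = XDocument.Parse(await initiate.Content.ReadAsStringAsync());
        string uploadId = initiateXml.Descendants().First(e => e.Name.LocalName == "UploadId").Value;

        // 2. Upload each part: PUT ?partNumber=N&uploadId=...; remember the returned ETags.
        var etags = new List<(int PartNumber, string ETag)>();
        for (int i = 0; i < parts.Count; i++)
        {
            int partNumber = i + 1;
            var part = await http.PutAsync(
                $"{Host}/{bucket}/{objectName}?partNumber={partNumber}&uploadId={uploadId}",
                new ByteArrayContent(parts[i]));
            part.EnsureSuccessStatusCode();
            etags.Add((partNumber, part.Headers.ETag!.Tag));
        }

        // 3. Complete: POST ?uploadId=... with the list of part numbers and ETags.
        var completeBody = new XElement("CompleteMultipartUpload",
            etags.Select(p => new XElement("Part",
                new XElement("PartNumber", p.PartNumber),
                new XElement("ETag", p.ETag))));
        var complete = await http.PostAsync(
            $"{Host}/{bucket}/{objectName}?uploadId={uploadId}",
            new StringContent(completeBody.ToString(), Encoding.UTF8, "application/xml"));
        complete.EnsureSuccessStatusCode();
    }
}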

jskeet (Collaborator) commented May 27, 2024

Showing the S3 code doesn't really help us know what your GCS code looks like. Please provide a minimal example using Google.Cloud.Storage.V1.

joaopaulopmedeiros (Author) commented May 27, 2024

@jskeet
Here's an example with the UploadObjectAsync method. I suppose my problem is that I can't specify a part number.

using System.Buffers;
using System.Globalization;

using CsvHelper;

using Google.Cloud.Storage.V1;

using Report.Generator.Infra.Repositories;

namespace Report.Generator.Infra.Generators
{
    public class CsvGCSReportGenerator
    {
        private readonly StorageClient _storageClient;

        public CsvGCSReportGenerator(StorageClient storageClient)
        {
            _storageClient = storageClient;
        }

        public async Task GenerateAsync(string bucketName, string fileName, int blockSize)
        {
            byte[] buffer = ArrayPool<byte>.Shared.Rent(blockSize);
            int bufferPosition = 0;

            using var memoryStream = new MemoryStream();
            using var writer = new StreamWriter(memoryStream);
            using var csvWriter = new CsvWriter(writer, CultureInfo.InvariantCulture);

            int partNumber = 1;

            await foreach (var product in ProductRepository.FetchProductsAsync())
            {
                memoryStream.SetLength(0);
                csvWriter.WriteRecord(product);
                await csvWriter.NextRecordAsync();
                await writer.FlushAsync();
                memoryStream.Position = 0;

                while (memoryStream.Position < memoryStream.Length)
                {
                    int bytesToRead = Math.Min(blockSize - bufferPosition, (int)(memoryStream.Length - memoryStream.Position));
                    int bytesRead = await memoryStream.ReadAsync(buffer, bufferPosition, bytesToRead);
                    bufferPosition += bytesRead;

                    if (bufferPosition == blockSize)
                    {
                        await UploadPartAsync(buffer, bufferPosition, bucketName, fileName, partNumber++);
                        bufferPosition = 0;
                    }
                }
            }

            if (bufferPosition > 0)
            {
                await UploadPartAsync(buffer, bufferPosition, bucketName, fileName, partNumber);
            }

            ArrayPool<byte>.Shared.Return(buffer);
        }

        private async Task UploadPartAsync(byte[] buffer, int bufferLength, string bucketName, string fileName, int partNumber)
        {
            using var partStream = new MemoryStream(buffer, 0, bufferLength);
            await _storageClient.UploadObjectAsync(bucketName, $"{fileName}.csv", "text/csv", partStream);
            Console.WriteLine($"Uploaded part {partNumber}");
        }
    }
}

Previous attempt with Resumable Upload:
PS: It only worked by putting all items into memory, which I'm not supposed to do.

using System.Globalization;
using System.IO;
using System.Net.Mime;

using CsvHelper;

using Google.Apis.Upload;
using Google.Cloud.Storage.V1;

using Report.Generator.Domain.Entities;
using Report.Generator.Infra.Repositories;

namespace Report.Generator;

public class Program
{
    public async static Task Main()
    {
        Console.WriteLine($"Started at {DateTime.Now}");

        using var memoryStream = new MemoryStream();
        using var writer = new StreamWriter(memoryStream);
        using var csvWriter = new CsvWriter(writer, CultureInfo.InvariantCulture);

        csvWriter.WriteHeader<Product>();
        await csvWriter.NextRecordAsync();

        var client = await StorageClient.CreateAsync();

        var options = new UploadObjectOptions
        {
            ChunkSize = UploadObjectOptions.MinimumChunkSize
        };

        var uploadUri = await client.InitiateUploadSessionAsync(Environment.GetEnvironmentVariable("BUCKET_NAME"), "report.csv", "text/csv", contentLength: null, options);

        int batchSize = 100_000;

        await foreach (var product in ProductRepository.FetchUnbufferedProductsAsync(batchSize))
        {
            csvWriter.WriteRecord(product);
            csvWriter.NextRecord();
            Console.WriteLine(product.Title);
        }

        await writer.FlushAsync();
        memoryStream.Position = 0;

        IProgress<IUploadProgress> progress = new Progress<IUploadProgress>(
          p => Console.WriteLine($"bytes: {p.BytesSent}, status: {p.Status}")
        );

        var actualUploader = ResumableUpload.CreateFromUploadUri(uploadUri, memoryStream);

        actualUploader.ChunkSize = UploadObjectOptions.MinimumChunkSize * 2;

        actualUploader.ProgressChanged += progress.Report;

        await actualUploader.UploadAsync();

        Console.WriteLine($"Finished at {DateTime.Now}");
    }
}

jskeet (Collaborator) commented May 27, 2024

No, you can't - at least not with our libraries. I'm afraid it's a use-case we just don't support at the moment. Assuming I've understood you correctly, this is basically equivalent to this issue.

I think it's unlikely that we'll support this any time soon. What you could do is upload each part to a separate object, and then use the Compose operation (from StorageService; it's not exposed directly in StorageClient) to create a single object after the fact.
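
To sketch what I mean (untested; this assumes you keep a list of the part object names you've already uploaded via UploadObjectAsync, and it uses the Objects.Compose call from the underlying Google.Apis.Storage.v1 service exposed through StorageClient.Service):

using Google.Apis.Storage.v1.Data;
using Google.Cloud.Storage.V1;

public class ComposeSketch
{
    private readonly StorageClient _storageClient;

    public ComposeSketch(StorageClient storageClient)
    {
        _storageClient = storageClient;
    }

    // Composes previously-uploaded part objects (e.g. "report.csv.part-1",
    // "report.csv.part-2", ...) into a single destination object, then deletes the parts.
    public async Task ComposePartsAsync(string bucketName, string destinationName, IReadOnlyList<string> partNames)
    {
        var composeRequest = new ComposeRequest
        {
            Destination = new Google.Apis.Storage.v1.Data.Object { ContentType = "text/csv" },
            SourceObjects = new List<ComposeRequest.SourceObjectsData>()
        };

        foreach (var name in partNames)
        {
            composeRequest.SourceObjects.Add(new ComposeRequest.SourceObjectsData { Name = name });
        }

        // Compose lives on the underlying StorageService rather than StorageClient.
        // A single compose call accepts at most 32 source objects, so very large
        // uploads would need to compose in stages.
        await _storageClient.Service.Objects
            .Compose(composeRequest, bucketName, destinationName)
            .ExecuteAsync();

        // Optionally remove the intermediate part objects afterwards.
        foreach (var name in partNames)
        {
            await _storageClient.DeleteObjectAsync(bucketName, name);
        }
    }
}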

I'll reassign this to a member of the Storage team in case you have any further questions.

jskeet assigned JesseLovelace and unassigned jskeet May 27, 2024

joaopaulopmedeiros (Author) commented:

Alright. Thanks a lot for the feedback.

joaopaulopmedeiros (Author) commented May 27, 2024

@JesseLovelace, could you help with this feature? Maybe we could work together on a solution.

amanda-tarafa added the type: feature request and api: storage labels Jun 7, 2024

JesseLovelace (Contributor) commented:

My team will take a look and evaluate when we can get this done.
