Queued Ingestion Client now supports retries (#308)
Co-authored-by: ohad bitton <[email protected]>
AsafMah and ohadbitt authored Jul 24, 2023
1 parent 04f2945 commit e2b288d
Showing 20 changed files with 596 additions and 266 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -6,6 +6,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased
### Added
* Automatic retries for queued ingestion
* `executeQuery`, `executeMgmt` to call with a specific type.

## [5.0.0]
AzureStorageClient.java
@@ -27,7 +27,7 @@
import java.nio.file.Files;
import java.util.zip.GZIPOutputStream;

class AzureStorageClient {
public class AzureStorageClient {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final int GZIP_BUFFER_SIZE = 16384;
private static final int STREAM_BUFFER_SIZE = 16384;
IngestionResourceManager.java (new file)
@@ -0,0 +1,25 @@
package com.microsoft.azure.kusto.ingest;

import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException;
import com.microsoft.azure.kusto.ingest.resources.ContainerWithSas;
import com.microsoft.azure.kusto.ingest.resources.ResourceWithSas;

import java.util.List;

public interface IngestionResourceManager {

/**
* Returns a list of containers with SAS tokens, ranked by their ingestion success rate, and then shuffled.
* You should use this method for each ingestion operation.
* @return List of containers with SAS tokens, ranked by their ingestion success rate, and then shuffled.
*/
List<ContainerWithSas> getShuffledContainers() throws IngestionClientException, IngestionServiceException;

/**
* Report the result of an ingestion operation.
* @param resource The resource that was used to ingest. Can be a container or a queue.
* @param success Whether the ingestion operation was successful.
*/
void reportIngestionResult(ResourceWithSas<?> resource, boolean success);
}
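
Not part of the diff, but for illustration: a minimal sketch of how a caller might drive this interface, trying an action against each ranked container in turn and reporting each outcome so the ranking stays fresh. The package, class, and `Function`-based callback below are assumptions made for the example, not SDK API.

```java
package com.microsoft.azure.kusto.ingest.examples; // hypothetical example package

import com.microsoft.azure.kusto.ingest.IngestionResourceManager;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException;
import com.microsoft.azure.kusto.ingest.resources.ContainerWithSas;

import java.util.List;
import java.util.function.Function;

final class ShuffledContainerRetryExample {
    // Try the action on each ranked-and-shuffled container; report every outcome
    // back to the resource manager and give up only after all containers failed.
    static <T> T tryEachContainer(IngestionResourceManager resourceManager, Function<ContainerWithSas, T> action)
            throws IngestionClientException, IngestionServiceException {
        List<ContainerWithSas> containers = resourceManager.getShuffledContainers();
        Exception lastFailure = null;
        for (ContainerWithSas container : containers) {
            try {
                T result = action.apply(container);                      // e.g. upload a blob to this container
                resourceManager.reportIngestionResult(container, true);  // feed the success-rate ranking
                return result;
            } catch (RuntimeException e) {
                resourceManager.reportIngestionResult(container, false);
                lastFailure = e;
            }
        }
        throw new IngestionClientException("All " + containers.size() + " ingestion containers failed; last error: "
                + (lastFailure == null ? "none" : lastFailure.getMessage()));
    }
}
```

Returning on the first success and reporting failures back is what lets the manager de-prioritize unhealthy storage accounts over time.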
ManagedStreamingIngestClient.java
@@ -4,6 +4,7 @@
import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobClientBuilder;
import com.azure.storage.blob.models.BlobStorageException;
import com.azure.storage.common.policy.RequestRetryOptions;
import com.microsoft.azure.kusto.data.Ensure;
import com.microsoft.azure.kusto.data.HttpClientProperties;
import com.microsoft.azure.kusto.data.StreamingClient;
@@ -29,9 +30,7 @@
import java.io.SequenceInputStream;
import java.lang.invoke.MethodHandles;
import java.net.URISyntaxException;
import java.sql.Blob;
import java.util.UUID;
import java.util.stream.Stream;

/**
* <p>ManagedStreamingIngestClient</p>
@@ -43,12 +42,12 @@
* If the size of the stream is bigger than {@value MAX_STREAMING_SIZE_BYTES}, it will fall back to the queued streaming client.
* <p>
*/
public class ManagedStreamingIngestClient extends IngestClientBase implements IngestClient {
public class ManagedStreamingIngestClient extends IngestClientBase implements QueuedIngestClient {

private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static final int ATTEMPT_COUNT = 3;
public static final int MAX_STREAMING_SIZE_BYTES = 4 * 1024 * 1024;
public static final String MANAGED_STREAMING_INGEST_CLIENT = "ManagedStreamingIngestClient";
public static final String CLASS_NAME = ManagedStreamingIngestClient.class.getSimpleName();
final QueuedIngestClientImpl queuedIngestClient;
final StreamingIngestClient streamingIngestClient;
private final ExponentialRetry exponentialRetryTemplate;
@@ -367,12 +366,22 @@ protected IngestionResult ingestFromStreamImpl(StreamSourceInfo streamSourceInfo

@Override
protected String getClientType() {
return MANAGED_STREAMING_INGEST_CLIENT;
return CLASS_NAME;
}

@Override
public void close() throws IOException {
queuedIngestClient.close();
streamingIngestClient.close();
}

@Override
public void setQueueRequestOptions(RequestRetryOptions queueRequestOptions) {
queuedIngestClient.setQueueRequestOptions(queueRequestOptions);
}

@Override
public IngestionResourceManager getResourceManager() {
return queuedIngestClient.getResourceManager();
}
}
QueuedIngestClient.java
@@ -13,4 +13,6 @@ public interface QueuedIngestClient extends IngestClient {
* @param queueRequestOptions - Options to use when creating QueueClient
*/
void setQueueRequestOptions(RequestRetryOptions queueRequestOptions);

IngestionResourceManager getResourceManager();
}
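
For illustration (not part of the diff): a short sketch of the two retry-related hooks now exposed on `QueuedIngestClient`, assuming an already-constructed client. The helper class and the specific `RequestRetryOptions` values are arbitrary examples.

```java
package com.microsoft.azure.kusto.ingest.examples; // hypothetical example package

import com.azure.storage.common.policy.RequestRetryOptions;
import com.azure.storage.common.policy.RetryPolicyType;
import com.microsoft.azure.kusto.ingest.IngestionResourceManager;
import com.microsoft.azure.kusto.ingest.QueuedIngestClient;

final class QueueRetryConfigExample {
    static IngestionResourceManager configureQueueRetries(QueuedIngestClient client) {
        // Exponential retry policy for the underlying Azure Storage queue requests:
        // up to 4 tries, 60 s per try, 800 ms base delay, 8 s max delay, no secondary host.
        client.setQueueRequestOptions(new RequestRetryOptions(
                RetryPolicyType.EXPONENTIAL, 4, 60, 800L, 8000L, null));

        // New in this commit: the ranked-resource view (getShuffledContainers /
        // reportIngestionResult) is reachable directly from the interface.
        return client.getResourceManager();
    }
}
```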
QueuedIngestClientImpl.java
@@ -4,32 +4,16 @@
package com.microsoft.azure.kusto.ingest;

import com.azure.data.tables.models.TableEntity;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.data.tables.models.TableServiceException;
import com.azure.storage.blob.models.BlobStorageException;
import com.azure.storage.common.policy.RequestRetryOptions;
import com.azure.storage.queue.QueueClient;
import com.azure.data.tables.models.TableServiceException;
import com.azure.storage.queue.models.QueueStorageException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.microsoft.azure.kusto.data.*;
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.data.instrumentation.SupplierOneException;
import com.microsoft.azure.kusto.data.instrumentation.SupplierTwoExceptions;
import com.microsoft.azure.kusto.data.instrumentation.MonitoredActivity;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException;
import com.microsoft.azure.kusto.ingest.result.IngestionResult;
import com.microsoft.azure.kusto.ingest.result.IngestionStatus;
import com.microsoft.azure.kusto.ingest.result.IngestionStatusInTableDescription;
import com.microsoft.azure.kusto.ingest.result.IngestionStatusResult;
import com.microsoft.azure.kusto.ingest.result.OperationStatus;
import com.microsoft.azure.kusto.ingest.result.TableReportIngestionResult;
import com.microsoft.azure.kusto.ingest.source.BlobSourceInfo;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
import com.microsoft.azure.kusto.ingest.source.ResultSetSourceInfo;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
import com.microsoft.azure.kusto.ingest.utils.ContainerWithSas;
import com.microsoft.azure.kusto.ingest.result.*;
import com.microsoft.azure.kusto.ingest.source.*;
import com.microsoft.azure.kusto.ingest.utils.IngestionUtils;
import com.microsoft.azure.kusto.ingest.utils.SecurityUtils;
import com.microsoft.azure.kusto.ingest.utils.TableWithSas;
@@ -46,13 +30,15 @@
import java.lang.invoke.MethodHandles;
import java.net.URISyntaxException;
import java.time.Instant;
import java.util.*;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;

public class QueuedIngestClientImpl extends IngestClientBase implements QueuedIngestClient {

public static final String CLASS_NAME = QueuedIngestClientImpl.class.getSimpleName();
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final int COMPRESSED_FILE_MULTIPLIER = 11;
public static final String QUEUED_INGEST_CLIENT_IMPL = "QueuedIngestClientImpl";
private final ResourceManager resourceManager;
private final AzureStorageClient azureStorageClient;
String connectionDataSource;
@@ -81,6 +67,11 @@ public void setQueueRequestOptions(RequestRetryOptions queueRequestOptions) {
this.resourceManager.setQueueRequestOptions(queueRequestOptions);
}

@Override
public IngestionResourceManager getResourceManager() {
return resourceManager;
}

@Override
protected IngestionResult ingestFromBlobImpl(BlobSourceInfo blobSourceInfo, IngestionProperties ingestionProperties)
throws IngestionClientException, IngestionServiceException {
@@ -138,14 +129,8 @@ protected IngestionResult ingestFromBlobImpl(BlobSourceInfo blobSourceInfo, Inge
tableStatuses.add(ingestionBlobInfo.getIngestionStatusInTable());
}

ObjectMapper objectMapper = Utils.getObjectMapper();
String serializedIngestionBlobInfo = objectMapper.writeValueAsString(ingestionBlobInfo);
QueueClient queueClient = resourceManager.getQueue().getQueue();
// trace postMessageToQueue
Map<String, String> attributes = new HashMap<>();
attributes.put("queue", queueClient.getQueueName());
MonitoredActivity.invoke(() -> azureStorageClient.postMessageToQueue(queueClient, serializedIngestionBlobInfo),
getClientType().concat(".postMessageToQueue"), attributes);
ResourceAlgorithms.postToQueueWithRetries(resourceManager, azureStorageClient, ingestionBlobInfo);

return reportToTable
? new TableReportIngestionResult(tableStatuses)
: new IngestionStatusResult(status);
@@ -182,17 +167,9 @@ protected IngestionResult ingestFromFileImpl(FileSourceInfo fileSourceInfo, Inge
ingestionProperties.getTableName(),
dataFormat.getKustoValue(), // Used to use an empty string if the DataFormat was empty. Now it can't be empty, with a default of CSV.
shouldCompress ? CompressionType.gz : sourceCompressionType);
ContainerWithSas container = resourceManager.getTempStorage();
BlobContainerClient blobContainerClient = container.getContainer();
// trace uploadLocalFileToBlob
Map<String, String> attributes = new HashMap<>();
attributes.put("container", blobContainerClient.getBlobContainerName());
MonitoredActivity.invoke((SupplierOneException<Void, IOException>) () -> {
azureStorageClient.uploadLocalFileToBlob(file, blobName,
blobContainerClient, shouldCompress);
return null;
}, getClientType().concat(".uploadLocalFileToBlob"), attributes);
String blobPath = blobContainerClient.getBlobContainerUrl() + "/" + blobName + container.getSas();

String blobPath = ResourceAlgorithms.uploadLocalFileWithRetries(resourceManager, azureStorageClient, file, blobName, shouldCompress);

long rawDataSize = fileSourceInfo.getRawSizeInBytes() > 0L ? fileSourceInfo.getRawSizeInBytes()
: estimateFileRawSize(filePath, ingestionProperties.getDataFormat().isCompressible());

@@ -234,19 +211,15 @@ protected IngestionResult ingestFromStreamImpl(StreamSourceInfo streamSourceInfo
ingestionProperties.getTableName(),
dataFormat.getKustoValue(), // Used to use an empty string if the DataFormat was empty. Now it can't be empty, with a default of CSV.
shouldCompress ? CompressionType.gz : streamSourceInfo.getCompressionType());
ContainerWithSas container = resourceManager.getTempStorage();
BlobContainerClient blobContainerClient = container.getContainer();
// trace uploadStreamToBlob
Map<String, String> attributes = new HashMap<>();
attributes.put("container", blobContainerClient.getBlobContainerName());
MonitoredActivity.invoke((SupplierTwoExceptions<Void, IOException, URISyntaxException>) () -> {
azureStorageClient.uploadStreamToBlob(streamSourceInfo.getStream(), blobName,
blobContainerClient, shouldCompress);
return null;
}, getClientType().concat(".uploadStreamToBlob"), attributes);
String blobPath = blobContainerClient.getBlobContainerUrl() + "/" + blobName + container.getSas();
BlobSourceInfo blobSourceInfo = new BlobSourceInfo(
blobPath, 0, streamSourceInfo.getSourceId()); // TODO: check if we can get the rawDataSize locally - maybe add a countingStream

String blobPath = ResourceAlgorithms.uploadStreamToBlobWithRetries(resourceManager,
azureStorageClient,
streamSourceInfo.getStream(),
blobName,
shouldCompress);

BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobPath, 0, streamSourceInfo.getSourceId()); // TODO: check if we can get the rawDataSize
// locally - maybe add a countingStream

ingestionResult = ingestFromBlob(blobSourceInfo, ingestionProperties);
if (!streamSourceInfo.isLeaveOpen()) {
@@ -255,7 +228,7 @@ protected IngestionResult ingestFromStreamImpl(StreamSourceInfo streamSourceInfo
return ingestionResult;
} catch (BlobStorageException e) {
throw new IngestionServiceException("Failed to ingest from stream", e);
} catch (IOException | URISyntaxException e) {
} catch (IOException e) {
throw new IngestionClientException("Failed to ingest from stream", e);
} catch (IngestionServiceException e) {
throw e;
@@ -264,7 +237,7 @@ protected IngestionResult ingestFromStreamImpl(StreamSourceInfo streamSourceInfo

@Override
protected String getClientType() {
return QUEUED_INGEST_CLIENT_IMPL;
return CLASS_NAME;
}

private long estimateFileRawSize(String filePath, boolean isCompressible) {
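For context, a minimal end-to-end sketch of a queued file ingestion that now benefits from the automatic retries without any caller-side changes; the cluster URI, AAD application credentials, database, table, and file path are placeholders.

```java
package com.microsoft.azure.kusto.ingest.examples; // hypothetical example package

import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;

public final class QueuedIngestionWithRetriesExample {
    public static void main(String[] args) throws Exception {
        // Placeholder ingest-endpoint URI and AAD application credentials.
        ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(
                "https://ingest-<cluster>.kusto.windows.net", "<appId>", "<appKey>", "<tenantId>");
        try (IngestClient client = IngestClientFactory.createClient(csb)) {
            FileSourceInfo fileSourceInfo = new FileSourceInfo("data.csv.gz", 0); // 0 = let the SDK estimate raw size
            IngestionProperties properties = new IngestionProperties("<database>", "<table>");
            // Uploads the file to a temp blob container and posts a message to the ingestion
            // queue; with this change, both steps are retried across the ranked storage
            // resources on transient failures.
            client.ingestFromFile(fileSourceInfo, properties);
        }
    }
}
```

The retry behavior lives below `ingestFromFile`: the blob upload and the queue post each walk the ranked storage resources before failing.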
