diff --git a/samples/batch/transcription-enabled-storage/Connector/BatchClient.cs b/samples/batch/transcription-enabled-storage/Connector/BatchClient.cs index f76a2fef5..2f3bbb963 100644 --- a/samples/batch/transcription-enabled-storage/Connector/BatchClient.cs +++ b/samples/batch/transcription-enabled-storage/Connector/BatchClient.cs @@ -7,6 +7,7 @@ namespace Connector { using System; using System.Collections.Generic; + using System.IO; using System.Net; using System.Net.Http; using System.Text; @@ -105,7 +106,14 @@ private static async Task PostAsync(string path, string subscriptionKey, st if (!responseMessage.IsSuccessStatusCode) { - var failureMessage = $"Failure: Status Code {responseMessage.StatusCode}, {responseMessage.Content.Headers}"; + var failureMessage = $"Failure - Status Code: {responseMessage.StatusCode}"; + + if (responseMessage.Content != null) + { + var body = await responseMessage.Content.ReadAsStringAsync().ConfigureAwait(false); + failureMessage += $", Body: {body}"; + } + log.LogInformation(failureMessage); throw new WebException(failureMessage); } @@ -141,7 +149,14 @@ private static async Task DeleteAsync(string path, string subscriptionKey, TimeS if (!responseMessage.IsSuccessStatusCode) { - var failureMessage = $"Failure: Status Code {responseMessage.StatusCode}, {responseMessage.Content.Headers}"; + var failureMessage = $"Failure - Status Code: {responseMessage.StatusCode}"; + + if (responseMessage.Content != null) + { + var body = await responseMessage.Content.ReadAsStringAsync().ConfigureAwait(false); + failureMessage += $", Body: {body}"; + } + log.LogInformation(failureMessage); throw new WebException(failureMessage); } @@ -181,6 +196,13 @@ private static async Task GetAsync(string path, string sub var failureMessage = $"Failure: Status Code {responseMessage.StatusCode}"; log.LogInformation(failureMessage); + + if (responseMessage.Content != null) + { + var body = await responseMessage.Content.ReadAsStringAsync().ConfigureAwait(false); + failureMessage += $", Body: {body}"; + } + throw new WebException(failureMessage); } catch (OperationCanceledException) diff --git a/samples/batch/transcription-enabled-storage/Connector/Connector.csproj b/samples/batch/transcription-enabled-storage/Connector/Connector.csproj index c4914c68e..9ff6b1f36 100644 --- a/samples/batch/transcription-enabled-storage/Connector/Connector.csproj +++ b/samples/batch/transcription-enabled-storage/Connector/Connector.csproj @@ -18,21 +18,19 @@ - + all runtime; build; native; contentfiles; analyzers; buildtransitive - - + all runtime; build; native; contentfiles; analyzers; buildtransitive - diff --git a/samples/batch/transcription-enabled-storage/Connector/Constants.cs b/samples/batch/transcription-enabled-storage/Connector/Constants.cs new file mode 100644 index 000000000..c40546f85 --- /dev/null +++ b/samples/batch/transcription-enabled-storage/Connector/Constants.cs @@ -0,0 +1,30 @@ +// +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. See LICENSE.md file in the project root for full license information. +// + +namespace Connector.Constants +{ + public static class Constants + { + public const int MaxRetryLimit = 16; + + public const int DefaultRetryLimit = 4; + + public const int MaxInitialPollingDelayInMinutes = 120; + + public const int DefaultInitialPollingDelayInMinutes = 2; + + public const int DefaultMaxPollingDelayInMinutes = 120; + + public const char Delimiter = ';'; + + public const int MaxMessagesPerFunctionExecution = 10000; + + public const int DefaultMessagesPerFunctionExecution = 2000; + + public const int MaxFilesPerTranscriptionJob = 1000; + + public const int DefaultFilesPerTranscriptionJob = 100; + } +} \ No newline at end of file diff --git a/samples/batch/transcription-enabled-storage/Connector/StorageConnector.cs b/samples/batch/transcription-enabled-storage/Connector/StorageConnector.cs index 9bbf49c2d..e7bea1781 100644 --- a/samples/batch/transcription-enabled-storage/Connector/StorageConnector.cs +++ b/samples/batch/transcription-enabled-storage/Connector/StorageConnector.cs @@ -8,17 +8,17 @@ namespace Connector using System; using System.IO; using System.Linq; + using System.Text; using System.Threading.Tasks; using System.Web; using Azure.Storage; + using Azure.Storage.Blobs; using Azure.Storage.Sas; using Microsoft.Extensions.Logging; - using Microsoft.WindowsAzure.Storage; - using Microsoft.WindowsAzure.Storage.Blob; public class StorageConnector { - private CloudBlobClient CloudBlobClient; + private BlobServiceClient BlobServiceClient; private string AccountName; @@ -31,11 +31,10 @@ public StorageConnector(string storageConnectionString) throw new ArgumentNullException(nameof(storageConnectionString)); } - var storageAccount = CloudStorageAccount.Parse(storageConnectionString); - AccountName = GetValueFromConnectionString("AccountName", storageConnectionString); AccountKey = GetValueFromConnectionString("AccountKey", storageConnectionString); - CloudBlobClient = storageAccount.CreateCloudBlobClient(); + + BlobServiceClient = new BlobServiceClient(storageConnectionString); } public static string GetFileNameFromUri(Uri fileUri) @@ -80,13 +79,13 @@ public static string GetContainerNameFromUri(Uri fileUri) public static async Task DownloadFileFromSAS(string blobSas) { - var blob = new CloudBlockBlob(new Uri(blobSas)); + var blob = new BlobClient(new Uri(blobSas)); byte[] data; using (var memoryStream = new MemoryStream()) { - await blob.DownloadToStreamAsync(memoryStream).ConfigureAwait(false); + await blob.DownloadToAsync(memoryStream).ConfigureAwait(false); data = new byte[memoryStream.Length]; memoryStream.Position = 0; memoryStream.Read(data, 0, data.Length); @@ -99,49 +98,52 @@ public string CreateSas(Uri fileUri) { var containerName = GetContainerNameFromUri(fileUri); var fileName = GetFileNameFromUri(fileUri); - var container = CloudBlobClient.GetContainerReference(containerName); + var blobContainerClient = BlobServiceClient.GetBlobContainerClient(containerName); var sasBuilder = new BlobSasBuilder() { - BlobContainerName = container.Name, + BlobContainerName = blobContainerClient.Name, BlobName = fileName, Resource = "b", }; - sasBuilder.StartsOn = DateTime.UtcNow; + sasBuilder.StartsOn = DateTime.UtcNow.Subtract(TimeSpan.FromMinutes(10)); sasBuilder.ExpiresOn = DateTime.UtcNow.AddDays(1); sasBuilder.SetPermissions(BlobContainerSasPermissions.Read); var sasToken = sasBuilder.ToSasQueryParameters(new StorageSharedKeyCredential(AccountName, AccountKey)).ToString(); - return $"{container.GetBlockBlobReference(fileName).Uri}?{sasToken}"; + return $"{blobContainerClient.GetBlobClient(fileName).Uri}?{sasToken}"; } public async Task WriteTextFileToBlobAsync(string content, string containerName, string fileName, ILogger log) { log.LogInformation($"Writing file {fileName} to container {containerName}."); - var container = CloudBlobClient.GetContainerReference(containerName); - var blockBlob = container.GetBlockBlobReference(fileName); + var container = BlobServiceClient.GetBlobContainerClient(containerName); + var blockBlob = container.GetBlobClient(fileName); - await blockBlob.UploadTextAsync(content).ConfigureAwait(false); + using (var stream = new MemoryStream(Encoding.UTF8.GetBytes(content))) + { + await blockBlob.UploadAsync(stream).ConfigureAwait(false); + } } public async Task MoveFileAsync(string inputContainerName, string inputFileName, string outputContainerName, string outputFileName, ILogger log) { log.LogInformation($"Moving file {inputFileName} from container {inputContainerName} to {outputFileName} in container {outputContainerName}."); - var inputContainer = CloudBlobClient.GetContainerReference(inputContainerName); - var inputBlockBlob = inputContainer.GetBlockBlobReference(inputFileName); + var inputContainerClient = BlobServiceClient.GetBlobContainerClient(inputContainerName); + var inputBlockBlobClient = inputContainerClient.GetBlobClient(inputFileName); - if (!await inputBlockBlob.ExistsAsync().ConfigureAwait(false)) + if (!await inputBlockBlobClient.ExistsAsync().ConfigureAwait(false)) { log.LogError($"File {inputFileName} does not exist in container {inputContainerName}. Returning."); return; } - var outputContainer = CloudBlobClient.GetContainerReference(outputContainerName); - var outputBlockBlob = outputContainer.GetBlockBlobReference(outputFileName); + var outputContainerClient = BlobServiceClient.GetBlobContainerClient(outputContainerName); + var outputBlockBlobClient = outputContainerClient.GetBlobClient(outputFileName); - await outputBlockBlob.StartCopyAsync(inputBlockBlob).ConfigureAwait(false); - await inputBlockBlob.DeleteAsync().ConfigureAwait(false); + await outputBlockBlobClient.StartCopyFromUriAsync(inputBlockBlobClient.Uri).ConfigureAwait(false); + await inputBlockBlobClient.DeleteAsync().ConfigureAwait(false); } private static string GetValueFromConnectionString(string key, string connectionString) diff --git a/samples/batch/transcription-enabled-storage/FetchTranscription/FetchTranscription.csproj b/samples/batch/transcription-enabled-storage/FetchTranscription/FetchTranscription.csproj index aa24e9fa5..27a15b2c7 100644 --- a/samples/batch/transcription-enabled-storage/FetchTranscription/FetchTranscription.csproj +++ b/samples/batch/transcription-enabled-storage/FetchTranscription/FetchTranscription.csproj @@ -16,18 +16,18 @@ bin\Release\ - - + + + + + all runtime; build; native; contentfiles; analyzers; buildtransitive - - all runtime; build; native; contentfiles; analyzers; buildtransitive - diff --git a/samples/batch/transcription-enabled-storage/FetchTranscription/FetchTranscriptionEnvironmentVariables.cs b/samples/batch/transcription-enabled-storage/FetchTranscription/FetchTranscriptionEnvironmentVariables.cs index 2a2b56802..c40cf9ebc 100644 --- a/samples/batch/transcription-enabled-storage/FetchTranscription/FetchTranscriptionEnvironmentVariables.cs +++ b/samples/batch/transcription-enabled-storage/FetchTranscription/FetchTranscriptionEnvironmentVariables.cs @@ -6,6 +6,7 @@ namespace FetchTranscriptionFunction { using System; + using Connector.Constants; using Connector.Enums; using Connector.Extensions; @@ -19,6 +20,12 @@ public static class FetchTranscriptionEnvironmentVariables public static readonly bool UseSqlDatabase = bool.TryParse(Environment.GetEnvironmentVariable(nameof(UseSqlDatabase), EnvironmentVariableTarget.Process), out UseSqlDatabase) && UseSqlDatabase; + public static readonly int InitialPollingDelayInMinutes = int.TryParse(Environment.GetEnvironmentVariable(nameof(InitialPollingDelayInMinutes), EnvironmentVariableTarget.Process), out InitialPollingDelayInMinutes) ? InitialPollingDelayInMinutes.ClampInt(2, Constants.MaxInitialPollingDelayInMinutes) : Constants.DefaultInitialPollingDelayInMinutes; + + public static readonly int MaxPollingDelayInMinutes = int.TryParse(Environment.GetEnvironmentVariable(nameof(MaxPollingDelayInMinutes), EnvironmentVariableTarget.Process), out MaxPollingDelayInMinutes) ? MaxPollingDelayInMinutes : Constants.DefaultMaxPollingDelayInMinutes; + + public static readonly int RetryLimit = int.TryParse(Environment.GetEnvironmentVariable(nameof(RetryLimit), EnvironmentVariableTarget.Process), out RetryLimit) ? RetryLimit.ClampInt(1, Constants.MaxRetryLimit) : Constants.DefaultRetryLimit; + public static readonly string AudioInputContainer = Environment.GetEnvironmentVariable(nameof(AudioInputContainer), EnvironmentVariableTarget.Process); public static readonly string AzureSpeechServicesKey = Environment.GetEnvironmentVariable(nameof(AzureSpeechServicesKey), EnvironmentVariableTarget.Process); @@ -37,16 +44,10 @@ public static class FetchTranscriptionEnvironmentVariables public static readonly string JsonResultOutputContainer = Environment.GetEnvironmentVariable(nameof(JsonResultOutputContainer), EnvironmentVariableTarget.Process); - public static readonly int RetryLimit = int.TryParse(Environment.GetEnvironmentVariable(nameof(RetryLimit), EnvironmentVariableTarget.Process), out RetryLimit) ? RetryLimit.ClampInt(1, MaxRetryLimit) : DefaultRetryLimit; - public static readonly string StartTranscriptionServiceBusConnectionString = Environment.GetEnvironmentVariable(nameof(StartTranscriptionServiceBusConnectionString), EnvironmentVariableTarget.Process); public static readonly string TextAnalyticsKey = Environment.GetEnvironmentVariable(nameof(TextAnalyticsKey), EnvironmentVariableTarget.Process); public static readonly string TextAnalyticsRegion = Environment.GetEnvironmentVariable(nameof(TextAnalyticsRegion), EnvironmentVariableTarget.Process); - - private const int MaxRetryLimit = 16; - - private const int DefaultRetryLimit = 4; } } diff --git a/samples/batch/transcription-enabled-storage/FetchTranscription/TextAnalytics.cs b/samples/batch/transcription-enabled-storage/FetchTranscription/TextAnalytics.cs index 2870b36ba..d85e5049a 100644 --- a/samples/batch/transcription-enabled-storage/FetchTranscription/TextAnalytics.cs +++ b/samples/batch/transcription-enabled-storage/FetchTranscription/TextAnalytics.cs @@ -202,8 +202,8 @@ private static List RemoveOverlappingEntities(List o.Offset); - List resultEntities = textAnalyticsEntities; + var orderedEntities = textAnalyticsEntities.OrderBy(o => o.Offset); + var resultEntities = orderedEntities.ToList(); bool foundOverlap; do diff --git a/samples/batch/transcription-enabled-storage/FetchTranscription/TranscriptionProcessor.cs b/samples/batch/transcription-enabled-storage/FetchTranscription/TranscriptionProcessor.cs index 3060e39b3..2c8b4867c 100644 --- a/samples/batch/transcription-enabled-storage/FetchTranscription/TranscriptionProcessor.cs +++ b/samples/batch/transcription-enabled-storage/FetchTranscription/TranscriptionProcessor.cs @@ -16,6 +16,7 @@ namespace FetchTranscriptionFunction using Connector.Enums; using Microsoft.Azure.ServiceBus; using Microsoft.Extensions.Logging; + using Microsoft.WindowsAzure.Storage; using Newtonsoft.Json; public static class TranscriptionProcessor @@ -39,8 +40,8 @@ public static async Task ProcessTranscriptionJobAsync(TranscriptionStartedMessag var transcriptionLocation = serviceBusMessage.TranscriptionLocation; log.LogInformation($"Received transcription at {transcriptionLocation} with name {jobName} from service bus message."); - serviceBusMessage.PollingCounter += 1; var messageDelayTime = GetMessageDelayTime(serviceBusMessage.PollingCounter); + serviceBusMessage.PollingCounter += 1; try { @@ -77,28 +78,35 @@ public static async Task ProcessTranscriptionJobAsync(TranscriptionStartedMessag log.LogError($"{errorMessage}"); await RetryOrFailJobAsync(serviceBusMessage, errorMessage, jobName, transcriptionLocation, subscriptionKey, log).ConfigureAwait(false); } - - throw; } catch (TimeoutException e) { log.LogInformation($"Timeout - re-enqueueing fetch transcription message. Exception message: {e.Message}"); await ServiceBusUtilities.SendServiceBusMessageAsync(FetchQueueClientInstance, serviceBusMessage.CreateMessageString(), log, messageDelayTime).ConfigureAwait(false); - throw; } catch (Exception e) { var errorMessage = $"Fetch Transcription in job with name {jobName} failed with Exception {e} and message {e.Message}."; log.LogError($"{errorMessage}"); await RetryOrFailJobAsync(serviceBusMessage, errorMessage, jobName, transcriptionLocation, subscriptionKey, log).ConfigureAwait(false); - throw; } } private static TimeSpan GetMessageDelayTime(int pollingCounter) { - var delayInMinutes = Math.Pow(2, Math.Min(pollingCounter, 7)); - return TimeSpan.FromMinutes(delayInMinutes); + if (pollingCounter == 0) + { + return TimeSpan.FromMinutes(FetchTranscriptionEnvironmentVariables.InitialPollingDelayInMinutes); + } + + var updatedDelay = Math.Pow(2, Math.Min(pollingCounter, 8)) * FetchTranscriptionEnvironmentVariables.InitialPollingDelayInMinutes; + + if ((int)updatedDelay > FetchTranscriptionEnvironmentVariables.MaxPollingDelayInMinutes) + { + return TimeSpan.FromMinutes(FetchTranscriptionEnvironmentVariables.MaxPollingDelayInMinutes); + } + + return TimeSpan.FromMinutes(updatedDelay); } private static async Task ProcessFailedTranscriptionAsync(string transcriptionLocation, string subscriptionKey, TranscriptionStartedMessage serviceBusMessage, Transcription transcription, string jobName, ILogger log) @@ -302,7 +310,7 @@ private static async Task ProcessReportFileAsync(TranscriptionReportFile transcr var failedTranscriptions = transcriptionReportFile.Details. Where(detail => !string.IsNullOrEmpty(detail.Status) && detail.Status.Equals("Failed", StringComparison.OrdinalIgnoreCase) && - !string.IsNullOrEmpty(detail.Source)); + !string.IsNullOrEmpty(detail.Source)); foreach (var failedTranscription in failedTranscriptions) { @@ -362,13 +370,20 @@ private static async Task WriteFailedJobLogToStorageAsync(TranscriptionStartedMe { var fileName = StorageConnector.GetFileNameFromUri(new Uri(audioFileInfo.FileUrl)); var errorFileName = fileName + ".txt"; - await StorageConnectorInstance.WriteTextFileToBlobAsync(errorMessage, errorOutputContainer, errorFileName, log).ConfigureAwait(false); - await StorageConnectorInstance.MoveFileAsync( - FetchTranscriptionEnvironmentVariables.AudioInputContainer, - fileName, - FetchTranscriptionEnvironmentVariables.ErrorFilesOutputContainer, - fileName, - log).ConfigureAwait(false); + try + { + await StorageConnectorInstance.WriteTextFileToBlobAsync(errorMessage, errorOutputContainer, errorFileName, log).ConfigureAwait(false); + await StorageConnectorInstance.MoveFileAsync( + FetchTranscriptionEnvironmentVariables.AudioInputContainer, + fileName, + FetchTranscriptionEnvironmentVariables.ErrorFilesOutputContainer, + fileName, + log).ConfigureAwait(false); + } + catch (StorageException e) + { + log.LogError($"Storage Exception {e} while writing error log to file and moving result"); + } } } } diff --git a/samples/batch/transcription-enabled-storage/FetchTranscription/host.json b/samples/batch/transcription-enabled-storage/FetchTranscription/host.json index a1a82f322..ccaf32f55 100644 --- a/samples/batch/transcription-enabled-storage/FetchTranscription/host.json +++ b/samples/batch/transcription-enabled-storage/FetchTranscription/host.json @@ -1,12 +1,4 @@ { "version": "2.0", - "functionTimeout": "00:30:00", - "extensions": { - "serviceBus": { - "messageHandlerOptions": { - "autoComplete": true, - "maxAutoRenewDuration": "00:30:00" - } - } - } + "functionTimeout": "00:15:00" } \ No newline at end of file diff --git a/samples/batch/transcription-enabled-storage/Setup/ArmTemplate.json b/samples/batch/transcription-enabled-storage/Setup/ArmTemplate.json index 04d98f669..66edf7dc0 100644 --- a/samples/batch/transcription-enabled-storage/Setup/ArmTemplate.json +++ b/samples/batch/transcription-enabled-storage/Setup/ArmTemplate.json @@ -279,9 +279,11 @@ "ErrorFilesOutputContainer": "error-files", "CreateHtmlResultFile": false, "TimerBasedExecution": true, - "MessagesPerFunctionExecution": 500, - "FilesPerTranscriptionJob": 16, + "MessagesPerFunctionExecution": 1000, + "FilesPerTranscriptionJob": 100, "RetryLimit": 4, + "InitialPollingDelayInMinutes": 2, + "MaxPollingDelayInMinutes": 180, "InstanceId": "[parameters('DeploymentId')]", "StorageAccountName": "[parameters('StorageAccount')]", "UseSqlDatabase": "[and(not(equals(parameters('SqlAdministratorLogin'),'')), not(equals(parameters('SqlAdministratorLoginPassword'),'')))]", @@ -695,8 +697,25 @@ "includedEventTypes": [ "Microsoft.Storage.BlobCreated" ], - "subjectBeginsWith": "[concat('/blobServices/default/containers/', variables('AudioInputContainer'))]", - "advancedFilters": [] + "advancedFilters": [ + { + "operatorType": "StringBeginsWith", + "key": "Subject", + "values": [ + "[concat('/blobServices/default/containers/', variables('AudioInputContainer'), '/blobs')]" + ] + }, + { + "operatorType": "StringContains", + "key": "data.api", + "values": [ + "FlushWithClose", + "PutBlob", + "PutBlockList", + "CopyBlob" + ] + } + ] }, "labels": [], "eventDeliverySchema": "EventGridSchema" @@ -747,13 +766,13 @@ "properties": { "AddDiarization": "[parameters('AddDiarization')]", "AddWordLevelTimestamps": "[parameters('AddWordLevelTimestamps')]", - "APPINSIGHTS_INSTRUMENTATIONKEY": "[reference(resourceId('Microsoft.Insights/components', variables('AppInsightsName')), '2015-05-01').InstrumentationKey]", + "APPLICATIONINSIGHTS_CONNECTION_STRING": "[reference(resourceId('Microsoft.Insights/components', variables('AppInsightsName')), '2015-05-01').ConnectionString]", "AudioInputContainer": "[variables('AudioInputContainer')]", "AzureServiceBus": "[listKeys(variables('AuthRuleRMK'),'2015-08-01').primaryConnectionString]", "AzureSpeechServicesKey": "[parameters('AzureSpeechServicesKey')]", "AzureSpeechServicesRegion": "[parameters('AzureSpeechServicesRegion')]", - "AzureWebJobsStorage": "[concat('DefaultEndpointsProtocol=https;AccountName=', variables('StorageAccountName'), ';AccountKey=', listKeys(resourceId('Microsoft.Storage/storageAccounts', variables('StorageAccountName')), providers('Microsoft.Storage', 'storageAccounts').apiVersions[0]).keys[0].value)]", - "AzureWebJobsDashboard": "[concat('DefaultEndpointsProtocol=https;AccountName=', variables('StorageAccountName'), ';AccountKey=', listKeys(resourceId('Microsoft.Storage/storageAccounts', variables('StorageAccountName')), providers('Microsoft.Storage', 'storageAccounts').apiVersions[0]).keys[0].value)]", + "AzureWebJobsStorage": "[concat('DefaultEndpointsProtocol=https;AccountName=', variables('StorageAccountName'), ';AccountKey=', listKeys(resourceId('Microsoft.Storage/storageAccounts', variables('StorageAccountName')), providers('Microsoft.Storage', 'storageAccounts').apiVersions[0]).keys[0].value, ';EndpointSuffix=core.windows.net')]", + "AzureWebJobsDashboard": "[concat('DefaultEndpointsProtocol=https;AccountName=', variables('StorageAccountName'), ';AccountKey=', listKeys(resourceId('Microsoft.Storage/storageAccounts', variables('StorageAccountName')), providers('Microsoft.Storage', 'storageAccounts').apiVersions[0]).keys[0].value, ';EndpointSuffix=core.windows.net')]", "CustomModelId": "[parameters('CustomModelId')]", "ErrorFilesOutputContainer": "[variables('ErrorFilesOutputContainer')]", "ErrorReportOutputContainer": "[variables('ErrorReportOutputContainer')]", @@ -761,6 +780,8 @@ "FilesPerTranscriptionJob": "[variables('FilesPerTranscriptionJob')]", "FUNCTIONS_EXTENSION_VERSION": "~3", "FUNCTIONS_WORKER_RUNTIME": "dotnet", + "InitialPollingDelayInMinutes": "[variables('InitialPollingDelayInMinutes')]", + "MaxPollingDelayInMinutes": "[variables('MaxPollingDelayInMinutes')]", "Locale": "[parameters('Locale')]", "MessagesPerFunctionExecution": "[variables('MessagesPerFunctionExecution')]", "ProfanityFilterMode": "[parameters('ProfanityFilterMode')]", @@ -803,14 +824,14 @@ "displayName": "WebAppSettings" }, "properties": { - "APPINSIGHTS_INSTRUMENTATIONKEY": "[reference(resourceId('Microsoft.Insights/components', variables('AppInsightsName')), '2015-05-01').InstrumentationKey]", + "APPLICATIONINSIGHTS_CONNECTION_STRING": "[reference(resourceId('Microsoft.Insights/components', variables('AppInsightsName')), '2015-05-01').ConnectionString]", "EntityRedactionSetting": "[parameters('EntityRedaction')]", "SentimentAnalysisSetting": "[parameters('SentimentAnalysis')]", "AudioInputContainer": "[variables('AudioInputContainer')]", "AzureServiceBus": "[listKeys(variables('AuthRuleRMK'),'2015-08-01').primaryConnectionString]", "AzureSpeechServicesKey": "[parameters('AzureSpeechServicesKey')]", - "AzureWebJobsStorage": "[concat('DefaultEndpointsProtocol=https;AccountName=', variables('StorageAccountName'), ';AccountKey=', listKeys(resourceId('Microsoft.Storage/storageAccounts', variables('StorageAccountName')), providers('Microsoft.Storage', 'storageAccounts').apiVersions[0]).keys[0].value)]", - "AzureWebJobsDashboard": "[concat('DefaultEndpointsProtocol=https;AccountName=', variables('StorageAccountName'), ';AccountKey=', listKeys(resourceId('Microsoft.Storage/storageAccounts', variables('StorageAccountName')), providers('Microsoft.Storage', 'storageAccounts').apiVersions[0]).keys[0].value)]", + "AzureWebJobsStorage": "[concat('DefaultEndpointsProtocol=https;AccountName=', variables('StorageAccountName'), ';AccountKey=', listKeys(resourceId('Microsoft.Storage/storageAccounts', variables('StorageAccountName')), providers('Microsoft.Storage', 'storageAccounts').apiVersions[0]).keys[0].value, ';EndpointSuffix=core.windows.net')]", + "AzureWebJobsDashboard": "[concat('DefaultEndpointsProtocol=https;AccountName=', variables('StorageAccountName'), ';AccountKey=', listKeys(resourceId('Microsoft.Storage/storageAccounts', variables('StorageAccountName')), providers('Microsoft.Storage', 'storageAccounts').apiVersions[0]).keys[0].value, ';EndpointSuffix=core.windows.net')]", "CreateHtmlResultFile": "[variables('CreateHtmlResultFile')]", "DatabaseConnectionString": "[if(variables('UseSqlDatabase'), concat('Server=tcp:',reference(variables('SqlServerName'), '2014-04-01-preview').fullyQualifiedDomainName,',1433;Initial Catalog=',variables('DatabaseName'),';Persist Security Info=False;User ID=',parameters('SqlAdministratorLogin'),';Password=',parameters('SqlAdministratorLoginPassword'),';MultipleActiveResultSets=False;Encrypt=True;TrustServerCertificate=False;Connection Timeout=30;'), '')]", "ErrorFilesOutputContainer": "[variables('ErrorFilesOutputContainer')]", @@ -819,6 +840,8 @@ "FUNCTIONS_EXTENSION_VERSION": "~3", "FUNCTIONS_WORKER_RUNTIME": "dotnet", "HtmlResultOutputContainer": "[variables('HtmlResultOutputContainer')]", + "InitialPollingDelayInMinutes": "[variables('InitialPollingDelayInMinutes')]", + "MaxPollingDelayInMinutes": "[variables('MaxPollingDelayInMinutes')]", "JsonResultOutputContainer": "[variables('JsonResultOutputContainer')]", "RetryLimit": "[variables('RetryLimit')]", "StartTranscriptionServiceBusConnectionString": "[listKeys(variables('AuthRuleCT'),'2015-08-01').primaryConnectionString]", diff --git a/samples/batch/transcription-enabled-storage/StartTranscriptionByServiceBus/StartTranscriptionByServiceBus.csproj b/samples/batch/transcription-enabled-storage/StartTranscriptionByServiceBus/StartTranscriptionByServiceBus.csproj index 2f2cbbd28..d8e9ab404 100644 --- a/samples/batch/transcription-enabled-storage/StartTranscriptionByServiceBus/StartTranscriptionByServiceBus.csproj +++ b/samples/batch/transcription-enabled-storage/StartTranscriptionByServiceBus/StartTranscriptionByServiceBus.csproj @@ -22,7 +22,7 @@ runtime; build; native; contentfiles; analyzers; buildtransitive - + all diff --git a/samples/batch/transcription-enabled-storage/StartTranscriptionByServiceBus/host.json b/samples/batch/transcription-enabled-storage/StartTranscriptionByServiceBus/host.json index b9f92c0de..81e35b7b9 100644 --- a/samples/batch/transcription-enabled-storage/StartTranscriptionByServiceBus/host.json +++ b/samples/batch/transcription-enabled-storage/StartTranscriptionByServiceBus/host.json @@ -1,3 +1,3 @@ { - "version": "2.0" + "version": "2.0" } \ No newline at end of file diff --git a/samples/batch/transcription-enabled-storage/StartTranscriptionByTimer/StartTranscriptionByTimer.cs b/samples/batch/transcription-enabled-storage/StartTranscriptionByTimer/StartTranscriptionByTimer.cs index 38d3237f0..85e793c35 100644 --- a/samples/batch/transcription-enabled-storage/StartTranscriptionByTimer/StartTranscriptionByTimer.cs +++ b/samples/batch/transcription-enabled-storage/StartTranscriptionByTimer/StartTranscriptionByTimer.cs @@ -45,7 +45,6 @@ public static async Task Run([TimerTrigger("0 */2 * * * *")] TimerInfo myTimer, var transcriptionHelper = new StartTranscriptionHelper(log); log.LogInformation("Pulling messages from queue..."); - var messages = await MessageReceiverInstance.ReceiveAsync(StartTranscriptionEnvironmentVariables.MessagesPerFunctionExecution, TimeSpan.FromSeconds(MessageReceiveTimeoutInSeconds)).ConfigureAwait(false); if (messages == null || !messages.Any()) diff --git a/samples/batch/transcription-enabled-storage/StartTranscriptionByTimer/StartTranscriptionByTimer.csproj b/samples/batch/transcription-enabled-storage/StartTranscriptionByTimer/StartTranscriptionByTimer.csproj index 186366bb7..5e779668b 100644 --- a/samples/batch/transcription-enabled-storage/StartTranscriptionByTimer/StartTranscriptionByTimer.csproj +++ b/samples/batch/transcription-enabled-storage/StartTranscriptionByTimer/StartTranscriptionByTimer.csproj @@ -16,8 +16,8 @@ bin\Release\ - - + + all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/samples/batch/transcription-enabled-storage/StartTranscriptionByTimer/StartTranscriptionEnvironmentVariables.cs b/samples/batch/transcription-enabled-storage/StartTranscriptionByTimer/StartTranscriptionEnvironmentVariables.cs index 19007adad..a8d351078 100644 --- a/samples/batch/transcription-enabled-storage/StartTranscriptionByTimer/StartTranscriptionEnvironmentVariables.cs +++ b/samples/batch/transcription-enabled-storage/StartTranscriptionByTimer/StartTranscriptionEnvironmentVariables.cs @@ -6,6 +6,7 @@ namespace StartTranscriptionByTimer { using System; + using Connector.Constants; using Connector.Extensions; public static class StartTranscriptionEnvironmentVariables @@ -14,11 +15,17 @@ public static class StartTranscriptionEnvironmentVariables public static readonly bool AddWordLevelTimestamps = bool.TryParse(Environment.GetEnvironmentVariable(nameof(AddWordLevelTimestamps), EnvironmentVariableTarget.Process), out AddWordLevelTimestamps) && AddWordLevelTimestamps; - public static readonly int MessagesPerFunctionExecution = int.TryParse(Environment.GetEnvironmentVariable(nameof(MessagesPerFunctionExecution), EnvironmentVariableTarget.Process), out MessagesPerFunctionExecution) ? MessagesPerFunctionExecution.ClampInt(1, MaxMessagesPerFunctionExecution) : DefaultMessagesPerFunctionExecution; + public static readonly bool IsAzureGovDeployment = bool.TryParse(Environment.GetEnvironmentVariable(nameof(IsAzureGovDeployment), EnvironmentVariableTarget.Process), out IsAzureGovDeployment) && IsAzureGovDeployment; - public static readonly int FilesPerTranscriptionJob = int.TryParse(Environment.GetEnvironmentVariable(nameof(FilesPerTranscriptionJob), EnvironmentVariableTarget.Process), out FilesPerTranscriptionJob) ? FilesPerTranscriptionJob.ClampInt(1, MaxFilesPerTranscriptionJob) : DefaultFilesPerTranscriptionJob; + public static readonly int MessagesPerFunctionExecution = int.TryParse(Environment.GetEnvironmentVariable(nameof(MessagesPerFunctionExecution), EnvironmentVariableTarget.Process), out MessagesPerFunctionExecution) ? MessagesPerFunctionExecution.ClampInt(1, Constants.MaxMessagesPerFunctionExecution) : Constants.DefaultMessagesPerFunctionExecution; - public static readonly int RetryLimit = int.TryParse(Environment.GetEnvironmentVariable(nameof(RetryLimit), EnvironmentVariableTarget.Process), out RetryLimit) ? RetryLimit.ClampInt(1, MaxRetryLimit) : DefaultRetryLimit; + public static readonly int FilesPerTranscriptionJob = int.TryParse(Environment.GetEnvironmentVariable(nameof(FilesPerTranscriptionJob), EnvironmentVariableTarget.Process), out FilesPerTranscriptionJob) ? FilesPerTranscriptionJob.ClampInt(1, Constants.MaxFilesPerTranscriptionJob) : Constants.DefaultFilesPerTranscriptionJob; + + public static readonly int RetryLimit = int.TryParse(Environment.GetEnvironmentVariable(nameof(RetryLimit), EnvironmentVariableTarget.Process), out RetryLimit) ? RetryLimit.ClampInt(1, Constants.MaxRetryLimit) : Constants.DefaultRetryLimit; + + public static readonly int InitialPollingDelayInMinutes = int.TryParse(Environment.GetEnvironmentVariable(nameof(InitialPollingDelayInMinutes), EnvironmentVariableTarget.Process), out InitialPollingDelayInMinutes) ? InitialPollingDelayInMinutes.ClampInt(2, Constants.MaxInitialPollingDelayInMinutes) : Constants.DefaultInitialPollingDelayInMinutes; + + public static readonly int MaxPollingDelayInMinutes = int.TryParse(Environment.GetEnvironmentVariable(nameof(MaxPollingDelayInMinutes), EnvironmentVariableTarget.Process), out MaxPollingDelayInMinutes) ? MaxPollingDelayInMinutes : Constants.DefaultMaxPollingDelayInMinutes; public static readonly string AudioInputContainer = Environment.GetEnvironmentVariable(nameof(AudioInputContainer), EnvironmentVariableTarget.Process); @@ -49,17 +56,5 @@ public static class StartTranscriptionEnvironmentVariables public static readonly string SecondaryLocale = Environment.GetEnvironmentVariable(nameof(SecondaryLocale), EnvironmentVariableTarget.Process); public static readonly string StartTranscriptionServiceBusConnectionString = Environment.GetEnvironmentVariable(nameof(StartTranscriptionServiceBusConnectionString), EnvironmentVariableTarget.Process); - - private const int MaxMessagesPerFunctionExecution = 5000; - - private const int DefaultMessagesPerFunctionExecution = 500; - - private const int MaxFilesPerTranscriptionJob = 1000; - - private const int DefaultFilesPerTranscriptionJob = 16; - - private const int MaxRetryLimit = 16; - - private const int DefaultRetryLimit = 4; } } diff --git a/samples/batch/transcription-enabled-storage/StartTranscriptionByTimer/StartTranscriptionHelper.cs b/samples/batch/transcription-enabled-storage/StartTranscriptionByTimer/StartTranscriptionHelper.cs index a1d8836d8..fcdff490b 100644 --- a/samples/batch/transcription-enabled-storage/StartTranscriptionByTimer/StartTranscriptionHelper.cs +++ b/samples/batch/transcription-enabled-storage/StartTranscriptionByTimer/StartTranscriptionHelper.cs @@ -7,6 +7,7 @@ namespace StartTranscriptionByTimer { using System; using System.Collections.Generic; + using System.Diagnostics; using System.Globalization; using System.IO; using System.Linq; @@ -18,6 +19,7 @@ namespace StartTranscriptionByTimer using Microsoft.Azure.ServiceBus; using Microsoft.Azure.ServiceBus.Core; using Microsoft.Extensions.Logging; + using Microsoft.WindowsAzure.Storage; using Newtonsoft.Json; public class StartTranscriptionHelper @@ -30,15 +32,15 @@ public class StartTranscriptionHelper private readonly string SubscriptionKey = StartTranscriptionEnvironmentVariables.AzureSpeechServicesKey; - private readonly string SubscriptionRegion = StartTranscriptionEnvironmentVariables.AzureSpeechServicesRegion; - private readonly string ErrorReportContaineName = StartTranscriptionEnvironmentVariables.ErrorReportOutputContainer; private readonly string AudioInputContainerName = StartTranscriptionEnvironmentVariables.AudioInputContainer; private readonly int FilesPerTranscriptionJob = StartTranscriptionEnvironmentVariables.FilesPerTranscriptionJob; - private readonly string HostName = $"https://{StartTranscriptionEnvironmentVariables.AzureSpeechServicesRegion}.api.cognitive.microsoft.com/"; + private readonly string HostName = StartTranscriptionEnvironmentVariables.IsAzureGovDeployment ? + $"https://{StartTranscriptionEnvironmentVariables.AzureSpeechServicesRegion}.api.cognitive.microsoft.us/" : + $"https://{StartTranscriptionEnvironmentVariables.AzureSpeechServicesRegion}.api.cognitive.microsoft.com/"; private ILogger Logger; @@ -71,6 +73,9 @@ public async Task StartTranscriptionsAsync(IEnumerable messages, Messag chunkedMessages.Add(chunk); } + var stopwatch = new Stopwatch(); + stopwatch.Start(); + for (var i = 0; i < chunkedMessages.Count; i++) { var jobName = $"{startDateTime.ToString("yyyy-MM-ddTHH:mm:ss", CultureInfo.InvariantCulture)}_{i}"; @@ -78,17 +83,22 @@ public async Task StartTranscriptionsAsync(IEnumerable messages, Messag await StartBatchTranscriptionJobAsync(chunk, jobName).ConfigureAwait(false); await messageReceiver.CompleteAsync(chunk.Select(m => m.SystemProperties.LockToken)).ConfigureAwait(false); - // Renew lock on remaining messages - foreach (var remainingChunk in chunkedMessages.Skip(i + 1)) + // only renew lock after 2 minutes + if (stopwatch.Elapsed.TotalSeconds > 120) { - foreach (var message in remainingChunk) + foreach (var remainingChunk in chunkedMessages.Skip(i + 1)) { - await messageReceiver.RenewLockAsync(message).ConfigureAwait(false); + foreach (var message in remainingChunk) + { + await messageReceiver.RenewLockAsync(message).ConfigureAwait(false); + } } + + stopwatch.Restart(); } // Delay here to avoid throttling - await Task.Delay(2000).ConfigureAwait(false); + await Task.Delay(500).ConfigureAwait(false); } } @@ -113,7 +123,7 @@ public async Task StartTranscriptionAsync(Message message) Logger.LogInformation($"Primary locale: {Locale}"); Logger.LogInformation($"Secondary locale: {secondaryLocale}"); - var languageID = new LanguageIdentification(SubscriptionKey, SubscriptionRegion); + var languageID = new LanguageIdentification(SubscriptionKey, StartTranscriptionEnvironmentVariables.AzureSpeechServicesRegion); var fileExtension = Path.GetExtension(audioFileName); var sasUrl = StorageConnectorInstance.CreateSas(busMessage.Data.Url); var byteArray = await StorageConnector.DownloadFileFromSAS(sasUrl).ConfigureAwait(false); @@ -155,16 +165,21 @@ public bool IsValidServiceBusMessage(Message message) return false; } - private static TimeSpan GetInitialFetchingDelay(int transcriptionCount) - { - var minutes = Math.Max(2, transcriptionCount / 100); - return TimeSpan.FromMinutes(minutes); - } - private static TimeSpan GetMessageDelayTime(int pollingCounter) { - var delayInMinutes = Math.Pow(2, Math.Min(pollingCounter, 7)); - return TimeSpan.FromMinutes(delayInMinutes); + if (pollingCounter == 0) + { + return TimeSpan.FromMinutes(StartTranscriptionEnvironmentVariables.InitialPollingDelayInMinutes); + } + + var updatedDelay = Math.Pow(2, Math.Min(pollingCounter, 8)) * StartTranscriptionEnvironmentVariables.InitialPollingDelayInMinutes; + + if ((int)updatedDelay > StartTranscriptionEnvironmentVariables.MaxPollingDelayInMinutes) + { + return TimeSpan.FromMinutes(StartTranscriptionEnvironmentVariables.MaxPollingDelayInMinutes); + } + + return TimeSpan.FromMinutes(updatedDelay); } [System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1031:Do not catch general exception types", Justification = "Allow general exception catching to retry transcriptions in that case.")] @@ -176,7 +191,6 @@ private async Task StartBatchTranscriptionJobAsync(IEnumerable messages return; } - var fetchingDelay = GetInitialFetchingDelay(messages.Count()); var locationString = string.Empty; var serviceBusMessages = messages.Select(message => JsonConvert.DeserializeObject(Encoding.UTF8.GetString(message.Body))); @@ -220,11 +234,12 @@ private async Task StartBatchTranscriptionJobAsync(IEnumerable messages 0, 0); + var fetchingDelay = TimeSpan.FromMinutes(StartTranscriptionEnvironmentVariables.InitialPollingDelayInMinutes); await ServiceBusUtilities.SendServiceBusMessageAsync(FetchQueueClientInstance, transcriptionMessage.CreateMessageString(), Logger, fetchingDelay).ConfigureAwait(false); } catch (WebException e) { - if (BatchClient.IsThrottledOrTimeoutStatusCode(((HttpWebResponse)e.Response).StatusCode)) + if (e.Response != null && BatchClient.IsThrottledOrTimeoutStatusCode(((HttpWebResponse)e.Response).StatusCode)) { var errorMessage = $"Throttled or timeout while creating post. Error Message: {e.Message}"; Logger.LogError(errorMessage); @@ -234,31 +249,20 @@ private async Task StartBatchTranscriptionJobAsync(IEnumerable messages { var errorMessage = $"Start Transcription in job with name {jobName} failed with WebException {e} and message {e.Message}"; Logger.LogError(errorMessage); - - using (var reader = new StreamReader(e.Response.GetResponseStream())) - { - var responseMessage = await reader.ReadToEndAsync().ConfigureAwait(false); - errorMessage += "\nResponse message:" + responseMessage; - } - await WriteFailedJobLogToStorageAsync(serviceBusMessages, errorMessage, jobName).ConfigureAwait(false); } - - throw; } catch (TimeoutException e) { var errorMessage = $"Timeout while creating post, re-enqueueing transcription start. Message: {e.Message}"; Logger.LogError(errorMessage); await RetryOrFailMessagesAsync(messages, errorMessage).ConfigureAwait(false); - throw; } catch (Exception e) { var errorMessage = $"Start Transcription in job with name {jobName} failed with exception {e} and message {e.Message}"; Logger.LogError(errorMessage); await WriteFailedJobLogToStorageAsync(serviceBusMessages, errorMessage, jobName).ConfigureAwait(false); - throw; } Logger.LogInformation($"Fetch transcription queue successfully informed about job at: {jobName}"); @@ -275,15 +279,8 @@ private async Task RetryOrFailMessagesAsync(IEnumerable messages, strin var fileName = StorageConnector.GetFileNameFromUri(sbMessage.Data.Url); var errorFileName = fileName + ".txt"; var retryExceededErrorMessage = $"Exceeded retry count for transcription {fileName} with error message {errorMessage}."; - Logger.LogInformation(retryExceededErrorMessage); - await StorageConnectorInstance.WriteTextFileToBlobAsync(retryExceededErrorMessage, ErrorReportContaineName, errorFileName, Logger).ConfigureAwait(false); - - await StorageConnectorInstance.MoveFileAsync( - AudioInputContainerName, - fileName, - StartTranscriptionEnvironmentVariables.ErrorFilesOutputContainer, - fileName, - Logger).ConfigureAwait(false); + Logger.LogError(retryExceededErrorMessage); + await ProcessFailedFileAsync(fileName, errorMessage, errorFileName).ConfigureAwait(false); } else { @@ -304,13 +301,7 @@ private async Task WriteFailedJobLogToStorageAsync(IEnumerable GetTranscriptionPropertyBag() return properties; } + + private async Task ProcessFailedFileAsync(string fileName, string errorMessage, string logFileName) + { + try + { + await StorageConnectorInstance.WriteTextFileToBlobAsync(errorMessage, ErrorReportContaineName, logFileName, Logger).ConfigureAwait(false); + await StorageConnectorInstance.MoveFileAsync( + AudioInputContainerName, + fileName, + StartTranscriptionEnvironmentVariables.ErrorFilesOutputContainer, + fileName, + Logger).ConfigureAwait(false); + } + catch (StorageException e) + { + Logger.LogError($"Storage Exception {e} while writing error log to file and moving result"); + } + } } }