Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,18 @@ public async Task<OneOf<GenerateDailySummaryReportResponse, Error>> Process(
{
logger.LogInformation("Starting daily summary report generation with Altinn2Included={altinn2Included}", request.Altinn2Included);

// Get file transfers for statistics with service owner IDs (optimized to avoid N+1 queries)
// Note: Broker only supports Altinn3, so Altinn2Included is ignored but kept for API compatibility
var (fileTransfers, serviceOwnerIds) = await fileTransferRepository.GetFileTransfersForReportWithServiceOwnerIds(cancellationToken);
logger.LogInformation("Retrieved {count} file transfers for daily summary report", fileTransfers.Count);
var aggregatedData = await fileTransferRepository.GetAggregatedDailySummaryData(cancellationToken);
logger.LogInformation("Retrieved {count} aggregated records for daily summary report", aggregatedData.Count);

if (fileTransfers.Count == 0)
if (aggregatedData.Count == 0)
{
logger.LogWarning("No file transfers found for daily summary report generation");
return StatisticsErrors.NoFileTransfersFound;
}
// Aggregate daily data (using pre-fetched service owner IDs for performance)
var summaryData = await AggregateDailyDataAsync(fileTransfers, serviceOwnerIds, cancellationToken);

var summaryData = await EnrichAggregatedDataAsync(aggregatedData, cancellationToken);
logger.LogInformation("Aggregated data into {count} daily summary records", summaryData.Count);

// Generate parquet file and upload to blob storage
var (blobUrl, fileHash, fileSize) = await GenerateAndUploadParquetFile(summaryData, request.Altinn2Included, cancellationToken);

var response = new GenerateDailySummaryReportResponse
Expand All @@ -57,7 +54,7 @@ public async Task<OneOf<GenerateDailySummaryReportResponse, Error>> Process(
Environment = hostEnvironment.EnvironmentName ?? "Unknown",
FileSizeBytes = fileSize,
FileHash = fileHash,
Altinn2Included = false // Broker only supports Altinn3
Altinn2Included = false
};

logger.LogInformation("Successfully generated and uploaded daily summary report to blob storage: {blobUrl}", blobUrl);
Expand All @@ -70,61 +67,16 @@ public async Task<OneOf<GenerateDailySummaryReportResponse, Error>> Process(
}
}

private async Task<List<DailySummaryData>> AggregateDailyDataAsync(
List<FileTransferEntity> fileTransfers,
Dictionary<Guid, string> preFetchedServiceOwnerIds,
private async Task<List<DailySummaryData>> EnrichAggregatedDataAsync(
List<Core.Repositories.AggregatedDailySummaryData> aggregatedData,
CancellationToken cancellationToken)
{
// Flatten file transfers with recipients - each recipient gets its own row
var flattenedData = new List<(FileTransferEntity ft, string recipientId)>();

foreach (var ft in fileTransfers)
{
if (ft.RecipientCurrentStatuses.Any())
{
foreach (var recipient in ft.RecipientCurrentStatuses)
{
flattenedData.Add((ft, recipient.Actor.ActorExternalId));
}
}
else
{
// If no recipients, still include the file transfer with unknown recipient
flattenedData.Add((ft, "unknown"));
}
}

// Use pre-fetched service owner IDs from SQL query (avoids N database queries)
var serviceOwnerIds = preFetchedServiceOwnerIds;
var uniqueServiceOwnerIds = aggregatedData
.Select(d => d.ServiceOwnerId)
.Distinct()
.Where(id => !string.IsNullOrEmpty(id) && id != "unknown")
.ToList();

// For file transfers without service owner ID in query, fall back to lookup
var missingServiceOwnerIds = new List<FileTransferEntity>();
foreach (var ft in fileTransfers)
{
if (!serviceOwnerIds.ContainsKey(ft.FileTransferId))
{
missingServiceOwnerIds.Add(ft);
}
}

// Fetch missing service owner IDs in parallel
if (missingServiceOwnerIds.Any())
{
var missingIdsTasks = missingServiceOwnerIds.Select(async ft =>
{
var serviceOwnerId = await GetServiceOwnerIdAsync(ft, cancellationToken);
return (ft.FileTransferId, serviceOwnerId);
});

var missingIdsResults = await Task.WhenAll(missingIdsTasks);
foreach (var (fileTransferId, serviceOwnerId) in missingIdsResults)
{
serviceOwnerIds[fileTransferId] = serviceOwnerId;
}
}

// Fetch service owner names in parallel (batch processing)
var uniqueServiceOwnerIds = serviceOwnerIds.Values.Distinct().Where(id => !string.IsNullOrEmpty(id) && id != "unknown").ToList();
var serviceOwnerNames = new Dictionary<string, string>();

var serviceOwnerNameTasks = uniqueServiceOwnerIds.Select(async serviceOwnerId =>
Expand All @@ -139,11 +91,10 @@ private async Task<List<DailySummaryData>> AggregateDailyDataAsync(
serviceOwnerNames[serviceOwnerId] = name;
}

// Fetch resource titles in parallel (batch processing for HTTP calls)
var uniqueResourceIds = flattenedData
.Select(x => x.ft.ResourceId ?? "unknown")
var uniqueResourceIds = aggregatedData
.Select(d => d.ResourceId)
.Distinct()
.Where(id => id != "unknown")
.Where(id => !string.IsNullOrEmpty(id) && id != "unknown")
.ToList();

var resourceTitles = new Dictionary<string, string>();
Expand All @@ -160,46 +111,26 @@ private async Task<List<DailySummaryData>> AggregateDailyDataAsync(
resourceTitles[resourceId] = title;
}

var groupedData = flattenedData
.GroupBy(item => new
{
item.ft.Created.Date,
ServiceOwnerId = serviceOwnerIds.GetValueOrDefault(item.ft.FileTransferId, "unknown"),
ResourceId = item.ft.ResourceId ?? "unknown",
RecipientId = item.recipientId,
RecipientType = GetRecipientType(item.recipientId),
AltinnVersion = AltinnVersion.Altinn3 // Broker only supports Altinn3
})
.Select(g => new DailySummaryData
{
Date = g.Key.Date,
Year = g.Key.Date.Year,
Month = g.Key.Date.Month,
Day = g.Key.Date.Day,
ServiceOwnerId = g.Key.ServiceOwnerId,
ServiceOwnerName = serviceOwnerNames.GetValueOrDefault(g.Key.ServiceOwnerId, "Unknown"),
ResourceId = g.Key.ResourceId,
ResourceTitle = resourceTitles.GetValueOrDefault(g.Key.ResourceId, "Unknown"),
RecipientType = g.Key.RecipientType,
AltinnVersion = g.Key.AltinnVersion,
MessageCount = g.Count(),
DatabaseStorageBytes = CalculateDatabaseStorage(g.Select(x => x.ft).ToList()),
AttachmentStorageBytes = CalculateAttachmentStorage(g.Select(x => x.ft).ToList())
})
.OrderBy(d => d.Date)
.ThenBy(d => d.ServiceOwnerId)
.ThenBy(d => d.ResourceId)
.ThenBy(d => d.RecipientType)
.ThenBy(d => d.AltinnVersion)
.ToList();

return groupedData;
return aggregatedData.Select(d => new DailySummaryData
{
Date = d.Date,
Year = d.Year,
Month = d.Month,
Day = d.Day,
ServiceOwnerId = d.ServiceOwnerId,
ServiceOwnerName = serviceOwnerNames.GetValueOrDefault(d.ServiceOwnerId, "Unknown"),
ResourceId = d.ResourceId,
ResourceTitle = resourceTitles.GetValueOrDefault(d.ResourceId, "Unknown"),
RecipientType = (RecipientType)d.RecipientType,
AltinnVersion = (AltinnVersion)d.AltinnVersion,
MessageCount = d.MessageCount,
DatabaseStorageBytes = d.DatabaseStorageBytes,
AttachmentStorageBytes = d.AttachmentStorageBytes
}).ToList();
}

private async Task<string> GetServiceOwnerIdAsync(FileTransferEntity fileTransfer, CancellationToken cancellationToken)
{
// Get service owner from database resource table (service_owner_id_fk -> service_owner.service_owner_id_pk)
// This matches the mapping: serviceownerorgnr -> service_owner.service_owner_id_pk
try
{
var resource = await resourceRepository.GetResource(fileTransfer.ResourceId, cancellationToken);
Expand Down Expand Up @@ -265,8 +196,6 @@ private async Task<string> GetResourceTitleAsync(string? resourceId, Cancellatio

try
{
// Get service owner name from Resource Registry (like correspondence does)
// This returns the name from HasCompetentAuthority.Name (e.g., "Digitaliseringsdirektoratet", "NAV", etc.)
var serviceOwnerName = await altinnResourceRepository.GetServiceOwnerNameOfResource(resourceId, cancellationToken);
return serviceOwnerName ?? $"Unknown ({resourceId})";
}
Expand All @@ -279,29 +208,23 @@ private async Task<string> GetResourceTitleAsync(string? resourceId, Cancellatio

private long CalculateDatabaseStorage(List<FileTransferEntity> fileTransfers)
{
    // TODO: Calculate database storage based on file transfer metadata
    // once a measurement strategy is decided.
    // For now, return 0 as placeholder so the DatabaseStorageBytes column is
    // always populated and the report schema stays stable; the fileTransfers
    // argument is intentionally unused until a real calculation exists.
    return 0;
}

private long CalculateAttachmentStorage(List<FileTransferEntity> fileTransfers)
{
    // Total attachment footprint in bytes: the declared size of every transfer,
    // summed. Enumerable.Sum keeps the checked-overflow semantics of the
    // original single-call LINQ form.
    var transferSizes = fileTransfers.Select(transfer => transfer.FileTransferSize);
    return transferSizes.Sum();
}

private async Task<(string blobUrl, string fileHash, long fileSize)> GenerateAndUploadParquetFile(List<DailySummaryData> summaryData, bool altinn2Included, CancellationToken cancellationToken)
{
// Generate filename with timestamp as prefix and Altinn version indicator
var altinnVersionIndicator = "A3"; // Broker only supports Altinn3
var altinnVersionIndicator = "A3";
var fileName = $"{DateTimeOffset.UtcNow:yyyyMMdd_HHmmss}_daily_summary_report_{altinnVersionIndicator}_{hostEnvironment.EnvironmentName}.parquet";

logger.LogInformation("Generating daily summary parquet file with {count} records for blob storage", summaryData.Count);

// Generate the parquet file as a stream
var (parquetStream, fileHash, fileSize) = await GenerateParquetFileStream(summaryData, altinn2Included, cancellationToken);

// Upload to blob storage - need to add this method to storage service
var blobUrl = await UploadReportFileToStorage(fileName, parquetStream, cancellationToken);

logger.LogInformation("Successfully generated and uploaded daily summary parquet file to blob storage: {blobUrl}", blobUrl);
Expand All @@ -311,8 +234,6 @@ private long CalculateAttachmentStorage(List<FileTransferEntity> fileTransfers)

private async Task<string> UploadReportFileToStorage(string fileName, Stream stream, CancellationToken cancellationToken)
{
// Use Azure Storage directly for reports (similar to correspondence implementation)
// Reports are stored in a "reports" container in the default storage account
try
{
var connectionString = reportStorageOptions.Value.ConnectionString;
Expand All @@ -324,12 +245,10 @@ private async Task<string> UploadReportFileToStorage(string fileName, Stream str
var blobServiceClient = new Azure.Storage.Blobs.BlobServiceClient(connectionString);
var blobContainerClient = blobServiceClient.GetBlobContainerClient("reports");

// Ensure the reports container exists
await blobContainerClient.CreateIfNotExistsAsync(cancellationToken: cancellationToken);

var blobClient = blobContainerClient.GetBlobClient(fileName);

// Upload the file
await blobClient.UploadAsync(stream, overwrite: true, cancellationToken);

logger.LogInformation("Successfully uploaded report file to blob storage: {fileName}", fileName);
Expand All @@ -346,7 +265,6 @@ private async Task<string> UploadReportFileToStorage(string fileName, Stream str
{
logger.LogInformation("Generating daily summary parquet file with {count} records", summaryData.Count);

// Convert to parquet-friendly model
var parquetData = summaryData.Select(d => new ParquetDailySummaryData
{
Date = d.Date.ToString("yyyy-MM-dd"),
Expand All @@ -364,17 +282,14 @@ private async Task<string> UploadReportFileToStorage(string fileName, Stream str
AttachmentStorageBytes = d.AttachmentStorageBytes
}).ToList();

// Create a memory stream for the parquet data
var memoryStream = new MemoryStream();

// Write parquet data to memory stream
await ParquetSerializer.SerializeAsync(parquetData, memoryStream, cancellationToken: cancellationToken);
memoryStream.Position = 0; // Reset position for reading
memoryStream.Position = 0;

// Calculate MD5 hash
using var md5 = MD5.Create();
var hash = Convert.ToBase64String(md5.ComputeHash(memoryStream.ToArray()));
memoryStream.Position = 0; // Reset position for reading
memoryStream.Position = 0;

logger.LogInformation("Successfully generated daily summary parquet file stream");

Expand All @@ -389,25 +304,21 @@ public async Task<OneOf<GenerateAndDownloadDailySummaryReportResponse, Error>> P

try
{
// Get file transfers data with service owner IDs (optimized to avoid N+1 queries)
var (fileTransfers, serviceOwnerIds) = await fileTransferRepository.GetFileTransfersForReportWithServiceOwnerIds(cancellationToken);
var aggregatedData = await fileTransferRepository.GetAggregatedDailySummaryData(cancellationToken);

if (!fileTransfers.Any())
if (!aggregatedData.Any())
{
logger.LogWarning("No file transfers found for report generation");
return StatisticsErrors.NoFileTransfersFound;
}

logger.LogInformation("Found {count} file transfers for report generation", fileTransfers.Count);
logger.LogInformation("Found {count} aggregated records for report generation", aggregatedData.Count);

// Aggregate data by day and service owner (using pre-fetched service owner IDs for performance)
var summaryData = await AggregateDailyDataAsync(fileTransfers, serviceOwnerIds, cancellationToken);
var summaryData = await EnrichAggregatedDataAsync(aggregatedData, cancellationToken);

// Generate the parquet file as a stream
var (parquetStream, fileHash, fileSize) = await GenerateParquetFileStream(summaryData, request.Altinn2Included, cancellationToken);

// Generate filename
var altinnVersionIndicator = "A3"; // Broker only supports Altinn3
var altinnVersionIndicator = "A3";
var fileName = $"{DateTimeOffset.UtcNow:yyyyMMdd_HHmmss}_daily_summary_report_{altinnVersionIndicator}_{hostEnvironment.EnvironmentName}.parquet";

var response = new GenerateAndDownloadDailySummaryReportResponse
Expand All @@ -420,7 +331,7 @@ public async Task<OneOf<GenerateAndDownloadDailySummaryReportResponse, Error>> P
TotalFileTransferCount = summaryData.Sum(d => d.MessageCount),
GeneratedAt = DateTimeOffset.UtcNow,
Environment = hostEnvironment.EnvironmentName ?? "Unknown",
Altinn2Included = false // Broker only supports Altinn3
Altinn2Included = false
};

logger.LogInformation("Successfully generated daily summary report for download with {serviceOwnerCount} service owners and {totalCount} file transfers",
Expand Down
17 changes: 17 additions & 0 deletions src/Altinn.Broker.Core/Repositories/IFileTransferRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,21 @@ CancellationToken cancellationToken
);
Task SetFileTransferHangfireJobId(Guid fileTransferId, string hangfireJobId, CancellationToken cancellationToken);
Task<(List<FileTransferEntity> FileTransfers, Dictionary<Guid, string> ServiceOwnerIds)> GetFileTransfersForReportWithServiceOwnerIds(CancellationToken cancellationToken);
Task<List<AggregatedDailySummaryData>> GetAggregatedDailySummaryData(CancellationToken cancellationToken);
}

/// <summary>
/// One pre-aggregated row of the daily summary report, produced by
/// <c>IFileTransferRepository.GetAggregatedDailySummaryData</c>. Grouping,
/// counting and storage summation happen in the repository query; consumers
/// only enrich rows with display names and titles.
/// </summary>
public class AggregatedDailySummaryData
{
    /// <summary>Calendar date the row summarizes.</summary>
    public DateTime Date { get; set; }
    /// <summary>Year component of <see cref="Date"/>, pre-split for the report schema.</summary>
    public int Year { get; set; }
    /// <summary>Month component of <see cref="Date"/>.</summary>
    public int Month { get; set; }
    /// <summary>Day-of-month component of <see cref="Date"/>.</summary>
    public int Day { get; set; }
    /// <summary>Service owner identifier; empty or "unknown" when unresolved.</summary>
    public string ServiceOwnerId { get; set; } = string.Empty;
    /// <summary>Resource identifier; empty or "unknown" when unresolved.</summary>
    public string ResourceId { get; set; } = string.Empty;
    /// <summary>External identifier of the recipient actor for this group.</summary>
    public string RecipientId { get; set; } = string.Empty;
    /// <summary>Raw value of the <c>RecipientType</c> enum; consumers cast it back.</summary>
    public int RecipientType { get; set; }
    /// <summary>Raw value of the <c>AltinnVersion</c> enum; consumers cast it back.</summary>
    public int AltinnVersion { get; set; }
    /// <summary>Number of file transfers aggregated into this row.</summary>
    public int MessageCount { get; set; }
    /// <summary>Database storage attributed to this row, in bytes.</summary>
    public long DatabaseStorageBytes { get; set; }
    /// <summary>Attachment (blob) storage attributed to this row, in bytes.</summary>
    public long AttachmentStorageBytes { get; set; }
}
Loading
Loading