From 4cab5b442f40baf8011b9b7c2ec064097a52b314 Mon Sep 17 00:00:00 2001 From: Andrii Chebukin Date: Sun, 24 Mar 2024 02:36:58 +0400 Subject: [PATCH] Updated WebSocket reading logic to eliminate large arrays allocation (#464) --- Packages.props | 1 + ...harp.Data.GraphQL.Server.AspNetCore.fsproj | 1 + .../GraphQLOptions.fs | 2 + .../GraphQLWebsocketMiddleware.fs | 63 +++++++++++-------- .../StartupExtensions.fs | 3 +- 5 files changed, 42 insertions(+), 28 deletions(-) diff --git a/Packages.props b/Packages.props index 1932e5cd7..68cb08a79 100644 --- a/Packages.props +++ b/Packages.props @@ -9,6 +9,7 @@ 6.* + contentFiles diff --git a/src/FSharp.Data.GraphQL.Server.AspNetCore/FSharp.Data.GraphQL.Server.AspNetCore.fsproj b/src/FSharp.Data.GraphQL.Server.AspNetCore/FSharp.Data.GraphQL.Server.AspNetCore.fsproj index 330736346..0639958b9 100644 --- a/src/FSharp.Data.GraphQL.Server.AspNetCore/FSharp.Data.GraphQL.Server.AspNetCore.fsproj +++ b/src/FSharp.Data.GraphQL.Server.AspNetCore/FSharp.Data.GraphQL.Server.AspNetCore.fsproj @@ -36,6 +36,7 @@ + diff --git a/src/FSharp.Data.GraphQL.Server.AspNetCore/GraphQLOptions.fs b/src/FSharp.Data.GraphQL.Server.AspNetCore/GraphQLOptions.fs index 537db5fdc..0690a2a24 100644 --- a/src/FSharp.Data.GraphQL.Server.AspNetCore/GraphQLOptions.fs +++ b/src/FSharp.Data.GraphQL.Server.AspNetCore/GraphQLOptions.fs @@ -21,6 +21,8 @@ type IGraphQLOptions = type GraphQLOptions<'Root> = { SchemaExecutor : Executor<'Root> RootFactory : HttpContext -> 'Root + /// The minimum rented array size to read a message from WebSocket + ReadBufferSize : int SerializerOptions : JsonSerializerOptions WebsocketOptions : GraphQLTransportWSOptions } with diff --git a/src/FSharp.Data.GraphQL.Server.AspNetCore/GraphQLWebsocketMiddleware.fs b/src/FSharp.Data.GraphQL.Server.AspNetCore/GraphQLWebsocketMiddleware.fs index 6713725a7..7e225cf6a 100644 --- a/src/FSharp.Data.GraphQL.Server.AspNetCore/GraphQLWebsocketMiddleware.fs +++ b/src/FSharp.Data.GraphQL.Server.AspNetCore/GraphQLWebsocketMiddleware.fs @@ -1,7 +1,10 @@ namespace FSharp.Data.GraphQL.Server.AspNetCore open System +open System.Buffers open System.Collections.Generic +open System.Diagnostics +open System.Linq open System.Net.WebSockets open System.Text.Json open System.Text.Json.Serialization @@ -11,6 +14,8 @@ open Microsoft.AspNetCore.Http open Microsoft.Extensions.Hosting open Microsoft.Extensions.Logging open Microsoft.Extensions.Options + +open Collections.Pooled open FsToolkit.ErrorHandling open FSharp.Data.GraphQL @@ -47,9 +52,9 @@ type GraphQLWebSocketMiddleware<'Root> static let invalidJsonInClientMessageError = Result.Error <| InvalidMessage (4400, "Invalid json in client message") - let deserializeClientMessage (serializerOptions : JsonSerializerOptions) (msg : string) = taskResult { + let deserializeClientMessage (serializerOptions : JsonSerializerOptions) (msg : IReadOnlyPooledList) = taskResult { try - return JsonSerializer.Deserialize (msg, serializerOptions) + return JsonSerializer.Deserialize (msg.Span, serializerOptions) with | :? InvalidWebsocketMessageException as ex -> logger.LogError(ex, "Invalid websocket message:\n{payload}", msg) @@ -75,31 +80,35 @@ type GraphQLWebSocketMiddleware<'Root> && not (theSocket.State = WebSocketState.Closed) let receiveMessageViaSocket (cancellationToken : CancellationToken) (serializerOptions : JsonSerializerOptions) (socket : WebSocket) = taskResult { - let buffer = Array.zeroCreate 4096 - let completeMessage = new List () - let mutable segmentResponse : WebSocketReceiveResult = null - while (not cancellationToken.IsCancellationRequested) - && socket |> isSocketOpen - && ((segmentResponse = null) - || (not segmentResponse.EndOfMessage)) do - try - let! r = socket.ReceiveAsync (new ArraySegment (buffer), cancellationToken) - segmentResponse <- r - completeMessage.AddRange (new ArraySegment (buffer, 0, r.Count)) - with :? OperationCanceledException -> - () - - // TODO: Allocate string only if a debugger is attached - let message = - completeMessage - |> Seq.filter (fun x -> x > 0uy) - |> Array.ofSeq - |> System.Text.Encoding.UTF8.GetString - if String.IsNullOrWhiteSpace message then - return ValueNone - else - let! result = message |> deserializeClientMessage serializerOptions - return ValueSome result + let buffer = ArrayPool.Shared.Rent options.ReadBufferSize + try + let completeMessage = new PooledList () + let mutable segmentResponse : WebSocketReceiveResult = null + while (not cancellationToken.IsCancellationRequested) + && socket |> isSocketOpen + && ((segmentResponse = null) + || (not segmentResponse.EndOfMessage)) do + try + let! r = socket.ReceiveAsync (new ArraySegment (buffer), cancellationToken) + segmentResponse <- r + completeMessage.AddRange (new ArraySegment (buffer, 0, r.Count)) + with :? OperationCanceledException -> + () + + if Debugger.IsAttached then + let message = + completeMessage + |> Seq.filter (fun x -> x > 0uy) + |> Array.ofSeq + |> System.Text.Encoding.UTF8.GetString + logger.LogInformation ("-> Request: {request}", message) + if completeMessage.All(fun b -> b = 0uy) then + return ValueNone + else + let! result = deserializeClientMessage serializerOptions completeMessage + return ValueSome result + finally + ArrayPool.Shared.Return buffer } let sendMessageViaSocket (jsonSerializerOptions) (socket : WebSocket) (message : ServerMessage) : Task = task { diff --git a/src/FSharp.Data.GraphQL.Server.AspNetCore/StartupExtensions.fs b/src/FSharp.Data.GraphQL.Server.AspNetCore/StartupExtensions.fs index f539b73b7..8ed2a9d3e 100644 --- a/src/FSharp.Data.GraphQL.Server.AspNetCore/StartupExtensions.fs +++ b/src/FSharp.Data.GraphQL.Server.AspNetCore/StartupExtensions.fs @@ -6,8 +6,8 @@ open System.Runtime.CompilerServices open Microsoft.AspNetCore.Builder open Microsoft.AspNetCore.Http open Microsoft.Extensions.DependencyInjection -open FSharp.Data.GraphQL open Microsoft.Extensions.Options +open FSharp.Data.GraphQL [] module ServiceCollectionExtensions = @@ -15,6 +15,7 @@ module ServiceCollectionExtensions = let createStandardOptions executor rootFactory endpointUrl = { SchemaExecutor = executor RootFactory = rootFactory + ReadBufferSize = 4096 SerializerOptions = Json.serializerOptions WebsocketOptions = { EndpointUrl = endpointUrl