Source from IAsyncEnumerable is not disposed on shutdown, while interrupted because of back-pressure #6903

Closed
@kovalikp

Version Information
1.5.12
Akka, Akka.Streams

Describe the bug
Assume a stream composed of the following:

  1. The source is constructed from an IAsyncEnumerable<>.
  2. The stream has a kill switch.
  3. The sink is an actor that signals back-pressure by sending (or withholding) an acknowledgement message.

When:

  1. The source is paused because the sink has not sent the acknowledgement message.
  2. The stream is shut down via the kill switch.

Then:

  1. The enumerator provided by the IAsyncEnumerable<> never completes enumeration, so any resource allocated within the enumerator is left undisposed. In my case this is an EF Core DbContext constructed via IDbContextFactory<>.

To Reproduce

using Akka.Actor;
using Akka.Actor.Dsl;
using Akka.Streams;
using Akka.Streams.Dsl;
using System.Runtime.CompilerServices;

// Set to true to reproduce the bug: the sink stops acknowledging once a value exceeds 50.
const bool interruptSinkActor = false;
using var system = ActorSystem.Create("Streams");

var sinkActorRef = system.ActorOf(act =>
{
    act.ReceiveAny((message, context) =>
    {
        switch (message)
        {
            case int value:
                Console.WriteLine(value);
                if (interruptSinkActor && value > 50)
                    Console.WriteLine("Interrupted. Press [Enter] to shutdown.");
                else
                    context.Sender.Tell(new Ack());
                break;

            case OnInit:
                context.Sender.Tell(new Ack());
                break;

            case Complete:
                break;
        }
    });
});

var stream = Source.From(() => Enumerate(default))
    .ViaMaterialized(KillSwitches.Single<int>(), Keep.Right)
    .ToMaterialized(
        Sink.ActorRefWithAck<int>(
            actorRef: sinkActorRef,
            onInitMessage: new OnInit(),
            ackMessage: new Ack(),
            onCompleteMessage: new Complete()),
        Keep.Left);

var killSwitch = stream.Run(system);

Console.ReadLine(); 
killSwitch.Shutdown();
await system.Terminate();
Console.WriteLine("System terminated.");

async static IAsyncEnumerable<int> Enumerate([EnumeratorCancellation] CancellationToken cancellationToken)
{
    await using var resource = new Resource(); 
    foreach (var i in Enumerable.Range(0, 100))
    {
        await Task.Delay(1, cancellationToken).ConfigureAwait(false);
        yield return i;
    }
}

public class Resource : IAsyncDisposable
{
    public ValueTask DisposeAsync()
    {
        Console.WriteLine("Enumerator completed and resource disposed");
        return ValueTask.CompletedTask;
    }
}

public record class OnInit();

public record class Ack();

public record class Complete();

Expected behavior
The async enumerator should either be disposed, or have MoveNextAsync awaited once more with cancellation triggered, so that the enumerator completes correctly. Either way, resources allocated within the enumerator would be disposed.
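
The two options above can be sketched without Akka at all. This is only an illustration of the enumerator behavior, not of how Akka.Streams is or should be implemented; `EnumeratorCleanupSketch`, `TrackedResource`, and the `log` list are hypothetical names introduced here. It assumes the enumerator is suspended at a `yield return` (no MoveNextAsync call in flight) when the stream is shut down.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class EnumeratorCleanupSketch
{
    // Hypothetical stand-in for the Resource class in the reproduction; records its disposal.
    private sealed class TrackedResource : IAsyncDisposable
    {
        private readonly List<string> _log;
        public TrackedResource(List<string> log) => _log = log;

        public ValueTask DisposeAsync()
        {
            _log.Add("resource disposed");
            return ValueTask.CompletedTask;
        }
    }

    private static async IAsyncEnumerable<int> Enumerate(
        List<string> log,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        await using var resource = new TrackedResource(log);
        for (var i = 0; i < 100; i++)
        {
            await Task.Delay(1, cancellationToken).ConfigureAwait(false);
            yield return i;
        }
    }

    // Option 1: dispose the enumerator while it is suspended at a yield return.
    public static async Task<List<string>> DisposeWhileSuspendedAsync()
    {
        var log = new List<string>();
        var enumerator = Enumerate(log).GetAsyncEnumerator();

        // Pull three elements and then stop pulling, as a back-pressured source would.
        for (var i = 0; i < 3 && await enumerator.MoveNextAsync(); i++)
            log.Add($"element {enumerator.Current}");

        // Runs the pending cleanup of the iterator, including the await using disposal.
        await enumerator.DisposeAsync();
        return log;
    }

    // Option 2: trigger cancellation, then await MoveNextAsync once more.
    public static async Task<List<string>> CancelThenMoveNextAsync()
    {
        var log = new List<string>();
        using var cts = new CancellationTokenSource();
        var enumerator = Enumerate(log).GetAsyncEnumerator(cts.Token);

        for (var i = 0; i < 3 && await enumerator.MoveNextAsync(); i++)
            log.Add($"element {enumerator.Current}");

        cts.Cancel();
        try
        {
            // The iterator resumes, Task.Delay observes the cancelled token and throws.
            await enumerator.MoveNextAsync();
        }
        catch (OperationCanceledException)
        {
            log.Add("cancelled");
        }
        finally
        {
            await enumerator.DisposeAsync();
        }
        return log;
    }
}
```

In both methods the log ends up containing "resource disposed" even though only three of the hundred elements were consumed, which is the outcome expected from the stream on shutdown.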

Actual behavior
The async enumerator is left uncompleted. Resources allocated within the enumerator are not disposed.

Screenshots
[Image: interrupted enumerator]
[Image: completed enumerator]

Environment
All environments are impacted.

Additional context
Disposal of the stream was removed previously by #6290, related to #6280.
