Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 78b8386

Browse files
sleipnirpolvalente
andauthoredMay 14, 2025··
[Feat]: Add support for streams pipelines (#418)
* feat: added stream pipeline support * feat: add streams app example * chore: added reflection to hello streams example for easy testing * feat: added new factory function * feat: added support for header propagation * feat: added support for header propagation * chore: adjusts * feat: added new replace operator * new line * refactor: propagate context option * refactor: rename functions * refactor: removing the destructive version * Update lib/grpc/stream.ex Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com> * adjusts * refactor: separating unary from non-unary * Update lib/grpc/stream.ex Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com> * Update lib/grpc/stream.ex Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com> * Update lib/grpc/stream.ex Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com> * Update lib/grpc/stream.ex Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com> * chore: remove reject function * Update lib/grpc/stream.ex Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com> * chore: format * Update lib/grpc/stream.ex Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com> * refactor: rename operator reject to via * chore: pointing flow docs * fix: correct api examples * chore: change ask and join_with apis * refactor: rename single to unary * refactor: rename single to unary in tests * chore: remove unascessary function * chore: removes a not so necessary function * Update lib/grpc/stream.ex Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com> * Update lib/grpc/stream.ex Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com> * Update lib/grpc/stream.ex Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com> * Update lib/grpc/stream.ex Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com> * Update lib/grpc/stream.ex Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com> * Update lib/grpc/stream.ex Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com> * Update lib/grpc/stream.ex Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com> * Update lib/grpc/stream.ex Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com> * Update lib/grpc/stream.ex Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com> * fix: format --------- Co-authored-by: Adriano Santos <solid.sistemas@gmail.com> Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com>
1 parent 917f3ff commit 78b8386

File tree

20 files changed

+1127
-4
lines changed

20 files changed

+1127
-4
lines changed
 
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
# Used by "mix format"
2+
[
3+
inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"]
4+
]
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# The directory Mix will write compiled artifacts to.
2+
/_build/
3+
4+
# If you run "mix test --cover", coverage assets end up here.
5+
/cover/
6+
7+
# The directory Mix downloads your dependencies sources to.
8+
/deps/
9+
10+
# Where third-party dependencies like ExDoc output generated docs.
11+
/doc/
12+
13+
# Ignore .fetch files in case you like to edit your project deps locally.
14+
/.fetch
15+
16+
# If the VM crashes, it generates a dump, let's ignore it too.
17+
erl_crash.dump
18+
19+
# Also ignore archive artifacts (built via "mix archive.build").
20+
*.ez
21+
22+
# Ignore package tarball (built via "mix hex.build").
23+
helloworld_streams-*.tar
24+
25+
# Temporary files, for example, from tests.
26+
/tmp/

‎examples/helloworld_streams/README.md

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
# HelloworldStreams
2+
3+
**TODO: Add description**
4+
5+
## Installation
6+
7+
If [available in Hex](https://hex.pm/docs/publish), the package can be installed
8+
by adding `helloworld_streams` to your list of dependencies in `mix.exs`:
9+
10+
```elixir
11+
def deps do
12+
[
13+
{:helloworld_streams, "~> 0.1.0"}
14+
]
15+
end
16+
```
17+
18+
Documentation can be generated with [ExDoc](https://github.com/elixir-lang/ex_doc)
19+
and published on [HexDocs](https://hexdocs.pm). Once published, the docs can
20+
be found at <https://hexdocs.pm/helloworld_streams>.
21+
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
defmodule HelloworldStreams do
2+
@moduledoc """
3+
Documentation for `HelloworldStreams`.
4+
"""
5+
end
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
defmodule HelloworldStreams.Application do
2+
@moduledoc false
3+
use Application
4+
5+
@impl true
6+
def start(_type, _args) do
7+
children = [
8+
HelloworldStreams.Utils.Transformer,
9+
GrpcReflection,
10+
{
11+
GRPC.Server.Supervisor,
12+
endpoint: HelloworldStreams.Endpoint, port: 50053, start_server: true
13+
}
14+
]
15+
16+
opts = [strategy: :one_for_one, name: HelloworldStreams.Supervisor]
17+
Supervisor.start_link(children, opts)
18+
end
19+
end
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
defmodule HelloworldStreams.Endpoint do
2+
@moduledoc false
3+
use GRPC.Endpoint
4+
5+
intercept(GRPC.Server.Interceptors.Logger)
6+
run(HelloworldStreams.Utils.Reflection)
7+
run(HelloworldStreams.Server)
8+
end
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
defmodule HelloworldStreams.Server do
2+
@moduledoc """
3+
gRPC service for streaming data.
4+
"""
5+
use GRPC.Server, service: Stream.EchoServer.Service
6+
7+
alias HelloworldStreams.Utils.Transformer
8+
alias GRPC.Stream, as: GRPCStream
9+
10+
alias Stream.HelloRequest
11+
alias Stream.HelloReply
12+
13+
@spec say_unary_hello(HelloRequest.t(), GRPC.Server.Stream.t()) :: any()
14+
def say_unary_hello(request, _materializer) do
15+
GRPCStream.unary(request)
16+
|> GRPCStream.ask(Transformer)
17+
|> GRPCStream.map(fn %HelloReply{} = reply ->
18+
%HelloReply{message: "[Reply] #{reply.message}"}
19+
end)
20+
|> GRPCStream.run()
21+
end
22+
23+
@spec say_server_hello(HelloRequest.t(), GRPC.Server.Stream.t()) :: any()
24+
def say_server_hello(request, materializer) do
25+
create_output_stream(request)
26+
|> GRPCStream.from()
27+
|> GRPCStream.run_with(materializer)
28+
end
29+
30+
defp create_output_stream(msg) do
31+
Stream.repeatedly(fn ->
32+
index = :rand.uniform(10)
33+
%HelloReply{message: "[#{index}] I'm the Server for #{msg.name}"}
34+
end)
35+
|> Stream.take(10)
36+
|> Enum.to_list()
37+
end
38+
39+
@spec say_bid_stream_hello(Enumerable.t(), GRPC.Server.Stream.t()) :: any()
40+
def say_bid_stream_hello(request, materializer) do
41+
# simulate a infinite stream of data
42+
# this is a simple example, in a real world application
43+
# you would probably use a GenStage or similar
44+
# to handle the stream of data
45+
output_stream =
46+
Stream.repeatedly(fn ->
47+
index = :rand.uniform(10)
48+
%HelloReply{message: "[#{index}] I'm the Server ;)"}
49+
end)
50+
51+
GRPCStream.from(request, join_with: output_stream)
52+
|> GRPCStream.map(fn
53+
%HelloRequest{} = hello ->
54+
%HelloReply{message: "Welcome #{hello.name}"}
55+
56+
output_item ->
57+
output_item
58+
end)
59+
|> GRPCStream.run_with(materializer)
60+
end
61+
end
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
defmodule Stream.HelloRequest do
2+
@moduledoc false
3+
4+
use Protobuf, protoc_gen_elixir_version: "0.14.0", syntax: :proto3
5+
6+
def descriptor do
7+
# credo:disable-for-next-line
8+
%Google.Protobuf.DescriptorProto{
9+
name: "HelloRequest",
10+
field: [
11+
%Google.Protobuf.FieldDescriptorProto{
12+
name: "name",
13+
extendee: nil,
14+
number: 1,
15+
label: :LABEL_OPTIONAL,
16+
type: :TYPE_STRING,
17+
type_name: nil,
18+
default_value: nil,
19+
options: nil,
20+
oneof_index: nil,
21+
json_name: "name",
22+
proto3_optional: nil,
23+
__unknown_fields__: []
24+
}
25+
],
26+
nested_type: [],
27+
enum_type: [],
28+
extension_range: [],
29+
extension: [],
30+
options: nil,
31+
oneof_decl: [],
32+
reserved_range: [],
33+
reserved_name: [],
34+
__unknown_fields__: []
35+
}
36+
end
37+
38+
field(:name, 1, type: :string)
39+
end
40+
41+
defmodule Stream.HelloReply do
42+
@moduledoc false
43+
44+
use Protobuf, protoc_gen_elixir_version: "0.14.0", syntax: :proto3
45+
46+
def descriptor do
47+
# credo:disable-for-next-line
48+
%Google.Protobuf.DescriptorProto{
49+
name: "HelloReply",
50+
field: [
51+
%Google.Protobuf.FieldDescriptorProto{
52+
name: "message",
53+
extendee: nil,
54+
number: 1,
55+
label: :LABEL_OPTIONAL,
56+
type: :TYPE_STRING,
57+
type_name: nil,
58+
default_value: nil,
59+
options: nil,
60+
oneof_index: nil,
61+
json_name: "message",
62+
proto3_optional: nil,
63+
__unknown_fields__: []
64+
}
65+
],
66+
nested_type: [],
67+
enum_type: [],
68+
extension_range: [],
69+
extension: [],
70+
options: nil,
71+
oneof_decl: [],
72+
reserved_range: [],
73+
reserved_name: [],
74+
__unknown_fields__: []
75+
}
76+
end
77+
78+
field(:message, 1, type: :string)
79+
end
80+
81+
defmodule Stream.EchoServer.Service do
82+
@moduledoc false
83+
84+
use GRPC.Service, name: "stream.EchoServer", protoc_gen_elixir_version: "0.14.0"
85+
86+
def descriptor do
87+
# credo:disable-for-next-line
88+
%Google.Protobuf.ServiceDescriptorProto{
89+
name: "EchoServer",
90+
method: [
91+
%Google.Protobuf.MethodDescriptorProto{
92+
name: "SayUnaryHello",
93+
input_type: ".stream.HelloRequest",
94+
output_type: ".stream.HelloReply",
95+
options: %Google.Protobuf.MethodOptions{
96+
deprecated: false,
97+
idempotency_level: :IDEMPOTENCY_UNKNOWN,
98+
features: nil,
99+
uninterpreted_option: [],
100+
__pb_extensions__: %{},
101+
__unknown_fields__: []
102+
},
103+
client_streaming: false,
104+
server_streaming: false,
105+
__unknown_fields__: []
106+
},
107+
%Google.Protobuf.MethodDescriptorProto{
108+
name: "SayServerHello",
109+
input_type: ".stream.HelloRequest",
110+
output_type: ".stream.HelloReply",
111+
options: %Google.Protobuf.MethodOptions{
112+
deprecated: false,
113+
idempotency_level: :IDEMPOTENCY_UNKNOWN,
114+
features: nil,
115+
uninterpreted_option: [],
116+
__pb_extensions__: %{},
117+
__unknown_fields__: []
118+
},
119+
client_streaming: false,
120+
server_streaming: true,
121+
__unknown_fields__: []
122+
},
123+
%Google.Protobuf.MethodDescriptorProto{
124+
name: "SayBidStreamHello",
125+
input_type: ".stream.HelloRequest",
126+
output_type: ".stream.HelloReply",
127+
options: %Google.Protobuf.MethodOptions{
128+
deprecated: false,
129+
idempotency_level: :IDEMPOTENCY_UNKNOWN,
130+
features: nil,
131+
uninterpreted_option: [],
132+
__pb_extensions__: %{},
133+
__unknown_fields__: []
134+
},
135+
client_streaming: true,
136+
server_streaming: true,
137+
__unknown_fields__: []
138+
}
139+
],
140+
options: nil,
141+
__unknown_fields__: []
142+
}
143+
end
144+
145+
rpc(:SayUnaryHello, Stream.HelloRequest, Stream.HelloReply)
146+
147+
rpc(:SayServerHello, Stream.HelloRequest, stream(Stream.HelloReply))
148+
149+
rpc(:SayBidStreamHello, stream(Stream.HelloRequest), stream(Stream.HelloReply))
150+
end
151+
152+
defmodule Stream.EchoServer.Stub do
153+
@moduledoc false
154+
155+
use GRPC.Stub, service: Stream.EchoServer.Service
156+
end
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
defmodule HelloworldStreams.Utils.Reflection do
2+
@moduledoc """
3+
gRPC reflection server.
4+
"""
5+
use GrpcReflection.Server,
6+
version: :v1,
7+
services: [Stream.EchoServer.Service]
8+
end
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
defmodule HelloworldStreams.Utils.Transformer do
2+
@moduledoc """
3+
`Transformer` GenServer for example purposes.
4+
"""
5+
use GenServer
6+
7+
alias Stream.HelloRequest
8+
alias Stream.HelloReply
9+
10+
def start_link(_) do
11+
GenServer.start_link(__MODULE__, nil, name: __MODULE__)
12+
end
13+
14+
def init(_), do: {:ok, %{}}
15+
16+
def handle_info({:request, %HelloRequest{} = value, from}, state) do
17+
Process.send(from, {:response, %HelloReply{message: "Hello #{value.name}"}}, [])
18+
{:noreply, state}
19+
end
20+
end

‎examples/helloworld_streams/mix.exs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
defmodule HelloworldStreams.MixProject do
2+
use Mix.Project
3+
4+
def project do
5+
[
6+
app: :helloworld_streams,
7+
version: "0.1.0",
8+
elixir: "~> 1.15",
9+
start_permanent: Mix.env() == :prod,
10+
deps: deps()
11+
]
12+
end
13+
14+
def application do
15+
[
16+
extra_applications: [:logger],
17+
mod: {HelloworldStreams.Application, []}
18+
]
19+
end
20+
21+
defp deps do
22+
[
23+
{:grpc, path: "../../", override: true},
24+
{:protobuf, "~> 0.14"},
25+
{:grpc_reflection, "~> 0.1"}
26+
]
27+
end
28+
end

‎examples/helloworld_streams/mix.lock

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
%{
2+
"cowboy": {:hex, :cowboy, "2.13.0", "09d770dd5f6a22cc60c071f432cd7cb87776164527f205c5a6b0f24ff6b38990", [:make, :rebar3], [{:cowlib, ">= 2.14.0 and < 3.0.0", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, ">= 1.8.0 and < 3.0.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "e724d3a70995025d654c1992c7b11dbfea95205c047d86ff9bf1cda92ddc5614"},
3+
"cowlib": {:hex, :cowlib, "2.15.0", "3c97a318a933962d1c12b96ab7c1d728267d2c523c25a5b57b0f93392b6e9e25", [:make, :rebar3], [], "hexpm", "4f00c879a64b4fe7c8fcb42a4281925e9ffdb928820b03c3ad325a617e857532"},
4+
"flow": {:hex, :flow, "1.2.4", "1dd58918287eb286656008777cb32714b5123d3855956f29aa141ebae456922d", [:mix], [{:gen_stage, "~> 1.0", [hex: :gen_stage, repo: "hexpm", optional: false]}], "hexpm", "874adde96368e71870f3510b91e35bc31652291858c86c0e75359cbdd35eb211"},
5+
"gen_stage": {:hex, :gen_stage, "1.2.1", "19d8b5e9a5996d813b8245338a28246307fd8b9c99d1237de199d21efc4c76a1", [:mix], [], "hexpm", "83e8be657fa05b992ffa6ac1e3af6d57aa50aace8f691fcf696ff02f8335b001"},
6+
"google_protos": {:hex, :google_protos, "0.3.0", "15faf44dce678ac028c289668ff56548806e313e4959a3aaf4f6e1ebe8db83f4", [:mix], [{:protobuf, "~> 0.10", [hex: :protobuf, repo: "hexpm", optional: false]}], "hexpm", "1f6b7fb20371f72f418b98e5e48dae3e022a9a6de1858d4b254ac5a5d0b4035f"},
7+
"grpc_reflection": {:hex, :grpc_reflection, "0.1.5", "d00cdf8ef2638edb9578248eedc742e1b34eda9100e61be764c552c10f4b46cb", [:mix], [{:grpc, "~> 0.9", [hex: :grpc, repo: "hexpm", optional: false]}, {:protobuf, "~> 0.14", [hex: :protobuf, repo: "hexpm", optional: false]}], "hexpm", "848334d16029aee33728603be6171fc8bfcdfa3508cd6885ec1729e2e6ac60a5"},
8+
"gun": {:hex, :gun, "2.2.0", "b8f6b7d417e277d4c2b0dc3c07dfdf892447b087f1cc1caff9c0f556b884e33d", [:make, :rebar3], [{:cowlib, ">= 2.15.0 and < 3.0.0", [hex: :cowlib, repo: "hexpm", optional: false]}], "hexpm", "76022700c64287feb4df93a1795cff6741b83fb37415c40c34c38d2a4645261a"},
9+
"hpax": {:hex, :hpax, "1.0.3", "ed67ef51ad4df91e75cc6a1494f851850c0bd98ebc0be6e81b026e765ee535aa", [:mix], [], "hexpm", "8eab6e1cfa8d5918c2ce4ba43588e894af35dbd8e91e6e55c817bca5847df34a"},
10+
"mint": {:hex, :mint, "1.7.1", "113fdb2b2f3b59e47c7955971854641c61f378549d73e829e1768de90fc1abf1", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0 or ~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "fceba0a4d0f24301ddee3024ae116df1c3f4bb7a563a731f45fdfeb9d39a231b"},
11+
"protobuf": {:hex, :protobuf, "0.14.1", "9ac0582170df27669ccb2ef6cb0a3d55020d58896edbba330f20d0748881530a", [:mix], [{:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "39a9d49d346e3ed597e5ae3168a43d9603870fc159419617f584cdf6071f0e25"},
12+
"ranch": {:hex, :ranch, "2.2.0", "25528f82bc8d7c6152c57666ca99ec716510fe0925cb188172f41ce93117b1b0", [:make, :rebar3], [], "hexpm", "fa0b99a1780c80218a4197a59ea8d3bdae32fbff7e88527d7d8a4787eff4f8e7"},
13+
"telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"},
14+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
syntax = "proto3";
2+
3+
package stream;
4+
5+
message HelloRequest {
6+
string name = 1;
7+
}
8+
9+
message HelloReply {
10+
string message = 1;
11+
}
12+
13+
service EchoServer {
14+
rpc SayUnaryHello (HelloRequest) returns (HelloReply) {}
15+
rpc SayServerHello (HelloRequest) returns (stream HelloReply) {}
16+
rpc SayBidStreamHello (stream HelloRequest) returns (stream HelloReply) {}
17+
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
defmodule HelloworldStreamsTest do
2+
use ExUnit.Case
3+
doctest HelloworldStreams
4+
end
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
ExUnit.start()

‎lib/grpc/stream.ex

Lines changed: 362 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,375 @@
11
defmodule GRPC.Stream do
22
@moduledoc """
3-
Some useful operations for streams.
3+
Provides a `Flow`-based abstraction layer for building gRPC streaming pipelines in Elixir.
4+
5+
This module allows you to consume gRPC request streams as `Flow` pipelines with support for
6+
backpressure via GenStage. You can also produce gRPC responses by materializing a `Flow`
7+
back into the gRPC stream.
8+
9+
## Capabilities
10+
11+
- Transforms an incoming gRPC request stream into a `Flow` with backpressure.
12+
- Emits messages back into the gRPC response stream using `run_with/3`.
13+
- Supports joining with external producers (e.g., RabbitMQ, Kafka) for unbounded or fan-in stream sources.
14+
- Offers composable functional operators (`map/2`, `filter/2`, `flat_map/2`, etc.) on the stream.
15+
16+
## Example: Bidirectional Streaming
17+
18+
defmodule MyGRPCService do
19+
use GRPC.Server, service: MyService.Service
20+
21+
def route_chat(input, materializer) do
22+
GRPC.Stream.from(input, max_demand: 10)
23+
|> GRPC.Stream.map(fn note -> process_note(note) end)
24+
|> GRPC.Stream.run_with(materializer)
25+
end
26+
27+
defp process_note(note), do: %Response{message: "Received"}
28+
end
29+
30+
## Example: Joining with an External Producer
31+
32+
When integrating with external unbounded sources (e.g., message queues),
33+
you can pass a running `GenStage` producer using the `:join_with` option:
34+
35+
defmodule MyGRPCService do
36+
use GRPC.Server, service: MyService.Service
37+
38+
def stream_events(input, materializer) do
39+
{:ok, pid} = MyApp.RabbitMQ.Producer.start_link([])
40+
41+
GRPC.Stream.from(input, join_with: pid, max_demand: 10)
42+
|> GRPC.Stream.map(&handle_event/1)
43+
|> GRPC.Stream.run_with(materializer)
44+
end
45+
46+
defp handle_event({_, msg}), do: msg
47+
defp handle_event(event), do: %MyGRPC.Event{data: inspect(event)}
48+
end
49+
"""
50+
alias GRPC.Stream.Operators
51+
alias GRPC.Server.Stream, as: Materializer
52+
53+
defstruct flow: nil, options: [], metadata: %{}
54+
55+
@type t :: %__MODULE__{flow: Flow.t(), options: Keyword.t(), metadata: map()}
56+
57+
@type item :: any()
58+
59+
@type reason :: any()
60+
61+
@doc """
62+
Converts a gRPC input into a `Flow` pipeline with backpressure support.
63+
64+
## Parameters
65+
66+
- `input`: A gRPC request stream (struct, enumerable, or Elixir `Stream`).
67+
68+
## Options
69+
70+
- `:join_with` — An optional external `GenStage` producer to merge with the gRPC input.
71+
- `:dispatcher` — Specifies the `Flow` dispatcher (defaults to `GenStage.DemandDispatcher`).
72+
- `:propagate_context` - If `true`, the context from the `materializer` is propagated to the `Flow`.
73+
- `:materializer` - The `%GRPC.Server.Stream{}` struct representing the current gRPC stream context.
74+
75+
And any other options supported by `Flow`.
76+
77+
## Returns
78+
79+
A `GRPC.Stream` struct that represents the transformed stream.
80+
81+
## Example
82+
83+
flow = GRPC.Stream.from(request, max_demand: 50)
84+
"""
85+
@spec from(any(), Keyword.t()) :: t()
86+
def from(input, opts \\ [])
87+
88+
def from(%Elixir.Stream{} = input, opts), do: build_grpc_stream(input, opts)
89+
90+
def from(input, opts) when is_list(input), do: build_grpc_stream(input, opts)
91+
92+
def from(input, opts) when not is_nil(input), do: from([input], opts)
93+
94+
@doc """
95+
Converts a single gRPC request into a `Flow` pipeline with support for backpressure.
96+
This is useful for unary gRPC requests where you want to use the Flow API.
97+
98+
## Parameters
99+
100+
- `input`: The single gRPC message to convert into a Flow.
101+
102+
## Options
103+
104+
- `:join_with` - An optional additional producer stage PID to include in the Flow.
105+
- `:dispatcher` - An optional `GenStage` dispatcher to use in the underlying `Flow`. Defaults to `GenStage.DemandDispatcher`.
106+
- `:propagate_context` - If `true`, the context from the `materializer` is propagated to the `Flow`.
107+
- `:materializer` - The `%GRPC.Server.Stream{}` struct representing the current gRPC stream context.
108+
109+
And any other options supported by `Flow`.
110+
111+
## Returns
112+
- A `GRPCStream` that emits the single gRPC message under demand.
113+
114+
## Example
115+
116+
flow = GRPCStream.single(request, max_demand: 5)
117+
"""
118+
@spec unary(any(), Keyword.t()) :: t()
119+
def unary(input, opts \\ []) when is_struct(input),
120+
do: build_grpc_stream([input], Keyword.merge(opts, unary: true))
121+
122+
@doc """
123+
Extracts the underlying `Flow` pipeline from a `GRPC.Stream`.
124+
125+
Raises an `ArgumentError` if the `Flow` has not been initialized.
126+
127+
## Returns
128+
129+
A `Flow` pipeline.
130+
"""
131+
@spec to_flow(t()) :: Flow.t()
132+
def to_flow(%__MODULE__{flow: flow}) when is_nil(flow), do: Flow.from_enumerable([])
133+
134+
def to_flow(%__MODULE__{flow: flow}), do: flow
135+
136+
@doc """
137+
Executes the underlying `Flow` for unary streams and emits responses into the provided gRPC server stream.
138+
139+
## Parameters
140+
141+
- `flow`: A `GRPC.Stream` struct containing the flow to be executed.
142+
- `stream`: A `GRPC.Server.Stream` to which responses are sent.
143+
- `:dry_run` — If `true`, responses are not sent (used for testing or inspection).
144+
145+
## Example
146+
147+
GRPC.Stream.run(request)
148+
"""
149+
@spec run(t()) :: any()
150+
def run(%__MODULE__{flow: flow, options: opts}) do
151+
unless Keyword.get(opts, :unary, false) do
152+
raise ArgumentError, "run/2 is not supported for non-unary streams"
153+
end
154+
155+
# We have to call `Enum.to_list` because we want to actually run and materialize the full stream.
156+
# List.flatten and List.first are used so that we can obtain the first result of the materialized list.
157+
flow
158+
|> Enum.to_list()
159+
|> List.flatten()
160+
|> List.first()
161+
end
162+
163+
@doc """
164+
Executes the flow and emits responses into the provided gRPC server stream.
165+
166+
## Parameters
167+
168+
- `flow`: A `GRPC.Stream` struct containing the flow to be executed.
169+
- `stream`: A `GRPC.Server.Stream` to which responses are sent.
170+
171+
## Options
172+
173+
- `:dry_run` — If `true`, responses are not sent (used for testing or inspection).
174+
175+
## Returns
176+
177+
- `:ok` if the stream was processed successfully.
178+
179+
## Example
180+
181+
GRPC.Stream.run_with(request, mat)
4182
"""
183+
@spec run_with(t(), Stream.t(), Keyword.t()) :: :ok
184+
def run_with(
185+
%__MODULE__{flow: flow, options: flow_opts} = _stream,
186+
%Materializer{} = from,
187+
opts \\ []
188+
) do
189+
if not Keyword.get(flow_opts, :unary, true) do
190+
raise ArgumentError, "run_with/3 is not supported for unary streams"
191+
end
192+
193+
dry_run? = Keyword.get(opts, :dry_run, false)
194+
195+
flow
196+
|> Flow.map(fn msg ->
197+
if not dry_run? do
198+
send_response(from, msg)
199+
end
200+
201+
flow
202+
end)
203+
|> Flow.run()
204+
end
205+
206+
@doc """
207+
Sends a request to an external process and awaits a response.
208+
209+
If `target` is a PID, a message in the format `{:request, item, from}` is sent, and a reply
210+
in the format `{:response, msg}` is expected.
211+
212+
If `target` is an `atom` we will try to locate the process through `Process.whereis/1`.
213+
214+
## Parameters
215+
216+
- `stream`: The `GRPC.Stream` pipeline.
217+
- `target`: Target process PID or atom name.
218+
- `timeout`: Timeout in milliseconds (defaults to `5000`).
219+
220+
## Returns
221+
222+
- Updated stream if successful.
223+
- `{:error, item, reason}` if the request fails or times out.
224+
"""
225+
@spec ask(t(), pid | atom, non_neg_integer) :: t() | {:error, item(), reason()}
226+
defdelegate ask(stream, target, timeout \\ 5000), to: Operators
227+
228+
@doc """
229+
Same as `ask/3`, but raises an exception on failure.
230+
231+
## Caution
232+
233+
This version propagates errors via raised exceptions, which can crash the Flow worker and halt the pipeline.
234+
Prefer `ask/3` for production usage unless failure should abort the stream.
235+
"""
236+
@spec ask!(t(), pid | atom, non_neg_integer) :: t()
237+
defdelegate ask!(stream, target, timeout \\ 5000), to: Operators
5238

6239
@doc """
7-
Get headers from server stream.
240+
Filters the stream using the given predicate function.
241+
242+
The filter function is applied concurrently to the stream entries, so it shouldn't rely on execution order.
243+
"""
244+
@spec filter(t(), (term -> term)) :: t
245+
defdelegate filter(stream, filter), to: Operators
246+
247+
@doc """
248+
Applies a function to each entry and concatenates the resulting lists.
249+
250+
Useful for emitting multiple messages for each input.
251+
"""
252+
@spec flat_map(t, (term -> Enumerable.t())) :: t()
253+
defdelegate flat_map(stream, flat_mapper), to: Operators
254+
255+
@doc """
256+
Applies a function to each stream item.
257+
"""
258+
@spec map(t(), (term -> term)) :: t()
259+
defdelegate map(stream, mapper), to: Operators
260+
261+
@doc """
262+
Applies a transformation function to each stream item, passing the context as an additional argument.
263+
This is useful for operations that require access to the stream's headers.
264+
"""
265+
@spec map_with_context(t(), (map(), term -> term)) :: t()
266+
defdelegate map_with_context(stream, mapper), to: Operators
267+
268+
@doc """
269+
Partitions the stream to allow grouping of items by key or condition.
270+
271+
Use this before stateful operations such as `reduce/3`.
272+
273+
## Note
274+
275+
Excessive use of partitioning can impact performance and memory usage.
276+
Only partition when required for correctness or performance.
277+
See https://hexdocs.pm/flow/Flow.html#module-partitioning for more details.
8278
9-
For the client side, you should use `:return_headers` option to get headers,
10-
see `GRPC.Stub` for details.
279+
"""
280+
@spec partition(t(), keyword()) :: t()
281+
defdelegate partition(stream, options \\ []), to: Operators
282+
283+
@doc """
284+
Reduces items in the stream using an accumulator.
285+
286+
## Parameters
287+
288+
- `acc_fun` initializes the accumulator,
289+
- `reducer_fun` updates it for each input.
290+
291+
## Note
292+
See https://hexdocs.pm/flow/Flow.html#reduce/3 for more details.
293+
294+
"""
295+
@spec reduce(t, (-> acc), (term(), acc -> acc)) :: t when acc: term()
296+
defdelegate reduce(stream, acc_fun, reducer_fun), to: Operators
297+
298+
@doc """
299+
Emits only distinct items from the stream. See `uniq_by/2` for more information.
300+
301+
"""
302+
@spec uniq(t) :: t
303+
defdelegate uniq(stream), to: Operators
304+
305+
@doc """
306+
Emits only unique items as determined by the result of the given function.
307+
308+
## Note
309+
This function requires care when used for unbounded flows. For more information see https://hexdocs.pm/flow/Flow.html#uniq_by/2
310+
311+
"""
312+
@spec uniq_by(t, (term -> term)) :: t
313+
defdelegate uniq_by(stream, fun), to: Operators
314+
315+
@doc """
316+
Retrieves HTTP/2 headers from a `GRPC.Server.Stream`.
317+
318+
## Client Note
319+
320+
To receive headers on the client side, use the `:return_headers` option. See `GRPC.Stub`.
11321
"""
12322
@spec get_headers(GRPC.Server.Stream.t()) :: map
13323
def get_headers(%GRPC.Server.Stream{adapter: adapter} = stream) do
14324
headers = adapter.get_headers(stream.payload)
15325
GRPC.Transport.HTTP2.decode_headers(headers)
16326
end
327+
328+
defp build_grpc_stream(input, opts) do
329+
metadata =
330+
if Keyword.has_key?(opts, :propagate_context) do
331+
%GRPC.Server.Stream{} = mat = Keyword.fetch!(opts, :materializer)
332+
get_headers(mat) || %{}
333+
end
334+
335+
opts = Keyword.merge(opts, metadata: metadata)
336+
dispatcher = Keyword.get(opts, :default_dispatcher, GenStage.DemandDispatcher)
337+
338+
flow =
339+
case Keyword.get(opts, :join_with) do
340+
pid when is_pid(pid) ->
341+
opts = Keyword.drop(opts, [:join_with, :default_dispatcher])
342+
343+
input_flow = Flow.from_enumerable(input, opts)
344+
other_flow = Flow.from_stages([pid], opts)
345+
Flow.merge([input_flow, other_flow], dispatcher, opts)
346+
347+
name when not is_nil(name) and is_atom(name) ->
348+
pid = Process.whereis(name)
349+
350+
if not is_nil(pid) do
351+
opts = Keyword.drop(opts, [:join_with, :default_dispatcher])
352+
353+
input_flow = Flow.from_enumerable(input, opts)
354+
other_flow = Flow.from_stages([pid], opts)
355+
Flow.merge([input_flow, other_flow], dispatcher, opts)
356+
else
357+
raise ArgumentError, "No process found for the given name: #{inspect(name)}"
358+
end
359+
360+
# handle Elixir.Stream joining
361+
other when is_list(other) or is_function(other) ->
362+
Flow.from_enumerables([input, other], opts)
363+
364+
_ ->
365+
opts = Keyword.drop(opts, [:join_with, :default_dispatcher])
366+
Flow.from_enumerable(input, opts)
367+
end
368+
369+
%__MODULE__{flow: flow, options: opts}
370+
end
371+
372+
defp send_response(from, msg) do
373+
GRPC.Server.send_reply(from, msg)
374+
end
17375
end

‎lib/grpc/stream/operators.ex

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
defmodule GRPC.Stream.Operators do
2+
@moduledoc """
3+
Useful and internal functions for manipulating streams.
4+
"""
5+
alias GRPC.Stream, as: GRPCStream
6+
7+
@type item :: any()
8+
9+
@type reason :: any()
10+
11+
@spec ask(GRPCStream.t(), pid | atom, non_neg_integer) ::
12+
GRPCStream.t() | {:error, any(), :timeout | :not_alive}
13+
def ask(%GRPCStream{flow: flow} = stream, target, timeout \\ 5000) do
14+
mapper = fn item -> do_ask(item, target, timeout, raise_on_error: false) end
15+
%GRPCStream{stream | flow: Flow.map(flow, mapper)}
16+
end
17+
18+
@spec ask!(GRPCStream.t(), pid | atom, non_neg_integer) :: GRPCStream.t()
19+
def ask!(%GRPCStream{flow: flow} = stream, target, timeout \\ 5000) do
20+
mapper = fn item -> do_ask(item, target, timeout, raise_on_error: true) end
21+
%GRPCStream{stream | flow: Flow.map(flow, mapper)}
22+
end
23+
24+
defp do_ask(item, target, timeout, raise_on_error: raise?) do
25+
resolved_target =
26+
case target do
27+
pid when is_pid(pid) -> if Process.alive?(pid), do: pid, else: nil
28+
atom when is_atom(atom) -> Process.whereis(atom)
29+
end
30+
31+
cond do
32+
is_nil(resolved_target) and raise? ->
33+
raise "Target #{inspect(target)} is not alive. Cannot send request to it."
34+
35+
is_nil(resolved_target) ->
36+
{:error, item, :not_alive}
37+
38+
true ->
39+
send(resolved_target, {:request, item, self()})
40+
41+
receive do
42+
{:response, res} -> res
43+
after
44+
timeout ->
45+
if raise? do
46+
raise "Timeout waiting for response from #{inspect(target)}"
47+
else
48+
{:error, item, :timeout}
49+
end
50+
end
51+
end
52+
end
53+
54+
@spec filter(GRPCStream.t(), (term -> term)) :: GRPCStream.t()
55+
def filter(%GRPCStream{flow: flow} = stream, filter) do
56+
%GRPCStream{stream | flow: Flow.filter(flow, filter)}
57+
end
58+
59+
@spec flat_map(GRPCStream.t(), (term -> Enumerable.GRPCStream.t())) :: GRPCStream.t()
60+
def flat_map(%GRPCStream{flow: flow} = stream, flat_mapper) do
61+
%GRPCStream{stream | flow: Flow.flat_map(flow, flat_mapper)}
62+
end
63+
64+
@spec map(GRPCStream.t(), (term -> term)) :: GRPCStream.t()
65+
def map(%GRPCStream{flow: flow} = stream, mapper) do
66+
%GRPCStream{stream | flow: Flow.map(flow, mapper)}
67+
end
68+
69+
@spec map_with_context(GRPCStream.t(), (map(), term -> term)) :: GRPCStream.t()
70+
def map_with_context(%GRPCStream{flow: flow, metadata: meta} = stream, mapper)
71+
when is_function(mapper, 2) do
72+
wrapper = fn item ->
73+
mapper.(meta, item)
74+
end
75+
76+
%GRPCStream{stream | flow: Flow.map(flow, wrapper)}
77+
end
78+
79+
@spec partition(GRPCStream.t(), keyword()) :: GRPCStream.t()
80+
def partition(%GRPCStream{flow: flow} = stream, options \\ []) do
81+
%GRPCStream{stream | flow: Flow.partition(flow, options)}
82+
end
83+
84+
@spec reduce(GRPCStream.t(), (-> acc), (term, acc -> acc)) :: GRPCStream.t() when acc: term()
85+
def reduce(%GRPCStream{flow: flow} = stream, acc_fun, reducer_fun) do
86+
%GRPCStream{stream | flow: Flow.reduce(flow, acc_fun, reducer_fun)}
87+
end
88+
89+
@spec reject(GRPCStream.t(), (term -> term)) :: GRPCStream.t()
90+
def reject(%GRPCStream{flow: flow} = stream, filter) do
91+
%GRPCStream{stream | flow: Flow.reject(flow, filter)}
92+
end
93+
94+
@spec uniq(GRPCStream.t()) :: GRPCStream.t()
95+
def uniq(%GRPCStream{flow: flow} = stream) do
96+
%GRPCStream{stream | flow: Flow.uniq(flow)}
97+
end
98+
99+
@spec uniq_by(GRPCStream.t(), (term -> term)) :: GRPCStream.t()
100+
def uniq_by(%GRPCStream{flow: flow} = stream, fun) do
101+
%GRPCStream{stream | flow: Flow.uniq_by(flow, fun)}
102+
end
103+
end

‎mix.exs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ defmodule GRPC.Mixfile do
3636
defp deps do
3737
[
3838
{:cowboy, "~> 2.10"},
39+
{:flow, "~> 1.2"},
3940
{:gun, "~> 2.0"},
4041
{:jason, ">= 0.0.0", optional: true},
4142
{:cowlib, "~> 2.12"},

‎mix.lock

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
"earmark_parser": {:hex, :earmark_parser, "1.4.39", "424642f8335b05bb9eb611aa1564c148a8ee35c9c8a8bba6e129d51a3e3c6769", [:mix], [], "hexpm", "06553a88d1f1846da9ef066b87b57c6f605552cfbe40d20bd8d59cc6bde41944"},
66
"ex_doc": {:hex, :ex_doc, "0.31.1", "8a2355ac42b1cc7b2379da9e40243f2670143721dd50748bf6c3b1184dae2089", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.1", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "3178c3a407c557d8343479e1ff117a96fd31bafe52a039079593fb0524ef61b0"},
77
"ex_parameterized": {:hex, :ex_parameterized, "1.3.7", "801f85fc4651cb51f11b9835864c6ed8c5e5d79b1253506b5bb5421e8ab2f050", [:mix], [], "hexpm", "1fb0dc4aa9e8c12ae23806d03bcd64a5a0fc9cd3f4c5602ba72561c9b54a625c"},
8+
"flow": {:hex, :flow, "1.2.4", "1dd58918287eb286656008777cb32714b5123d3855956f29aa141ebae456922d", [:mix], [{:gen_stage, "~> 1.0", [hex: :gen_stage, repo: "hexpm", optional: false]}], "hexpm", "874adde96368e71870f3510b91e35bc31652291858c86c0e75359cbdd35eb211"},
9+
"gen_stage": {:hex, :gen_stage, "1.2.1", "19d8b5e9a5996d813b8245338a28246307fd8b9c99d1237de199d21efc4c76a1", [:mix], [], "hexpm", "83e8be657fa05b992ffa6ac1e3af6d57aa50aace8f691fcf696ff02f8335b001"},
810
"gun": {:hex, :gun, "2.0.1", "160a9a5394800fcba41bc7e6d421295cf9a7894c2252c0678244948e3336ad73", [:make, :rebar3], [{:cowlib, "2.12.1", [hex: :cowlib, repo: "hexpm", optional: false]}], "hexpm", "a10bc8d6096b9502205022334f719cc9a08d9adcfbfc0dbee9ef31b56274a20b"},
911
"hpax": {:hex, :hpax, "0.1.2", "09a75600d9d8bbd064cdd741f21fc06fc1f4cf3d0fcc335e5aa19be1a7235c84", [:mix], [], "hexpm", "2c87843d5a23f5f16748ebe77969880e29809580efdaccd615cd3bed628a8c13"},
1012
"jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"},

‎test/grpc/stream_test.exs

Lines changed: 267 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,267 @@
1+
defmodule GRPC.StreamTest do
2+
use ExUnit.Case
3+
doctest GRPC.Stream
4+
5+
describe "simple test" do
6+
defmodule TestInput do
7+
defstruct [:message]
8+
end
9+
10+
defmodule FakeAdapter do
11+
def get_headers(_), do: %{"content-type" => "application/grpc"}
12+
end
13+
14+
test "unary/2 creates a flow from a unary input" do
15+
input = %TestInput{message: 1}
16+
17+
result =
18+
GRPC.Stream.unary(input)
19+
|> GRPC.Stream.map(& &1)
20+
|> GRPC.Stream.run()
21+
22+
assert result == input
23+
end
24+
25+
test "unary/2 creates a flow with metadata" do
26+
input = %TestInput{message: 1}
27+
materializer = %GRPC.Server.Stream{adapter: FakeAdapter}
28+
29+
flow =
30+
GRPC.Stream.unary(input, materializer: materializer, propagate_context: true)
31+
|> GRPC.Stream.map_with_context(fn meta, item ->
32+
assert not is_nil(meta)
33+
assert is_map(meta)
34+
item
35+
end)
36+
37+
result = Enum.to_list(GRPC.Stream.to_flow(flow)) |> Enum.at(0)
38+
assert result == input
39+
end
40+
41+
test "from/2 creates a flow from enumerable input" do
42+
input = [%{message: "a"}, %{message: "b"}]
43+
44+
flow =
45+
GRPC.Stream.from(input, max_demand: 1)
46+
|> GRPC.Stream.map(& &1)
47+
48+
result = Enum.to_list(GRPC.Stream.to_flow(flow))
49+
assert result == input
50+
end
51+
52+
test "from_as_ctx/3 creates a flow from enumerable input" do
53+
input = [%{message: "a"}, %{message: "b"}]
54+
materializer = %GRPC.Server.Stream{adapter: FakeAdapter}
55+
56+
flow =
57+
GRPC.Stream.from(input, propagate_context: true, materializer: materializer)
58+
|> GRPC.Stream.map_with_context(fn meta, item ->
59+
assert not is_nil(meta)
60+
assert is_map(meta)
61+
item
62+
end)
63+
64+
result = Enum.to_list(GRPC.Stream.to_flow(flow))
65+
assert result == input
66+
end
67+
end
68+
69+
describe "from/2" do
70+
test "converts a list into a flow" do
71+
stream = GRPC.Stream.from([1, 2, 3])
72+
assert %GRPC.Stream{} = stream
73+
74+
result = stream |> GRPC.Stream.map(&(&1 * 2)) |> GRPC.Stream.to_flow() |> Enum.to_list()
75+
assert Enum.sort(result) == [2, 4, 6]
76+
end
77+
end
78+
79+
describe "ask/3 with pid" do
80+
test "calls a pid and returns the response" do
81+
pid =
82+
spawn(fn ->
83+
receive do
84+
{:request, :hello, test_pid} ->
85+
send(test_pid, {:response, :world})
86+
end
87+
end)
88+
89+
result =
90+
GRPC.Stream.from([:hello])
91+
|> GRPC.Stream.ask(pid)
92+
|> GRPC.Stream.to_flow()
93+
|> Enum.to_list()
94+
95+
assert result == [:world]
96+
end
97+
98+
test "returns error if pid not alive" do
99+
pid = spawn(fn -> :ok end)
100+
# wait for the process to exit
101+
ref = Process.monitor(pid)
102+
assert_receive {:DOWN, ^ref, _, _, _}
103+
104+
result =
105+
GRPC.Stream.from(["msg"])
106+
|> GRPC.Stream.ask(pid)
107+
|> GRPC.Stream.to_flow()
108+
|> Enum.to_list()
109+
110+
assert result == [{:error, "msg", :not_alive}]
111+
end
112+
end
113+
114+
describe "ask/3 with GenServer" do
115+
defmodule TestServer do
116+
use GenServer
117+
118+
def start_link(_) do
119+
GenServer.start_link(__MODULE__, nil, name: __MODULE__)
120+
end
121+
122+
def init(_), do: {:ok, %{}}
123+
124+
def handle_info({:request, value, from}, state) do
125+
Process.send(from, {:response, value}, [])
126+
{:noreply, state}
127+
end
128+
end
129+
130+
setup do
131+
{:ok, _pid} = TestServer.start_link([])
132+
:ok
133+
end
134+
135+
test "asks GenServer and receives correct response" do
136+
stream = GRPC.Stream.from(["abc"])
137+
138+
result =
139+
stream
140+
|> GRPC.Stream.ask(TestServer)
141+
|> GRPC.Stream.to_flow()
142+
|> Enum.to_list()
143+
144+
assert result == ["abc"]
145+
end
146+
end
147+
148+
describe "map/2, flat_map/2, filter/2" do
149+
test "maps values correctly" do
150+
result =
151+
GRPC.Stream.from([1, 2, 3])
152+
|> GRPC.Stream.map(&(&1 * 10))
153+
|> GRPC.Stream.to_flow()
154+
|> Enum.to_list()
155+
156+
assert Enum.sort(result) == [10, 20, 30]
157+
end
158+
159+
test "flat_maps values correctly" do
160+
result =
161+
GRPC.Stream.from([1, 2])
162+
|> GRPC.Stream.flat_map(&[&1, &1])
163+
|> GRPC.Stream.to_flow()
164+
|> Enum.to_list()
165+
166+
assert Enum.sort(result) == [1, 1, 2, 2]
167+
end
168+
169+
test "filters values correctly" do
170+
result =
171+
GRPC.Stream.from([1, 2, 3, 4])
172+
|> GRPC.Stream.filter(&(rem(&1, 2) == 0))
173+
|> GRPC.Stream.to_flow()
174+
|> Enum.to_list()
175+
176+
assert result == [2, 4]
177+
end
178+
end
179+
180+
describe "test complex operations" do
181+
test "pipeline with all GRPC.Stream operators" do
182+
target =
183+
spawn(fn ->
184+
receive_loop()
185+
end)
186+
187+
input = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
188+
189+
result =
190+
input
191+
|> GRPC.Stream.from()
192+
# 2..11
193+
|> GRPC.Stream.map(&(&1 + 1))
194+
# [2,4,3,6,4,8,...]
195+
|> GRPC.Stream.flat_map(&[&1, &1 * 2])
196+
# keep evens
197+
|> GRPC.Stream.filter(&(rem(&1, 2) == 0))
198+
199+
# remove duplicates
200+
|> GRPC.Stream.uniq()
201+
# multiply by 10 via process
202+
|> GRPC.Stream.ask(target)
203+
|> GRPC.Stream.partition()
204+
|> GRPC.Stream.reduce(fn -> [] end, fn i, acc -> [i | acc] end)
205+
|> GRPC.Stream.to_flow()
206+
|> Enum.to_list()
207+
|> List.flatten()
208+
|> Enum.sort()
209+
210+
assert result == [20, 40, 60, 80, 100, 120, 140, 160, 180, 200, 220]
211+
end
212+
end
213+
214+
describe "join_with/merge streams" do
215+
test "merges input stream with joined GenStage producer" do
216+
defmodule TestProducer do
217+
use GenStage
218+
219+
def start_link(items) do
220+
GenStage.start_link(__MODULE__, items)
221+
end
222+
223+
def init(items) do
224+
{:producer, items}
225+
end
226+
227+
def handle_demand(demand, state) when demand > 0 do
228+
{events, remaining} = Enum.split(state, demand)
229+
230+
{:noreply, events, remaining}
231+
end
232+
end
233+
234+
elements = Enum.to_list(4..1000)
235+
{:ok, producer_pid} = TestProducer.start_link(elements)
236+
237+
input = [1, 2, 3]
238+
239+
task =
240+
Task.async(fn ->
241+
GRPC.Stream.from(input, join_with: producer_pid, max_demand: 500)
242+
|> GRPC.Stream.map(fn it -> it end)
243+
|> GRPC.Stream.run_with(%GRPC.Server.Stream{}, dry_run: true)
244+
end)
245+
246+
result =
247+
case Task.yield(task, 1000) || Task.shutdown(task) do
248+
{:ok, _} -> :ok
249+
_ -> :ok
250+
end
251+
252+
if Process.alive?(producer_pid) do
253+
Process.exit(producer_pid, :normal)
254+
end
255+
256+
assert result == :ok
257+
end
258+
end
259+
260+
defp receive_loop do
261+
receive do
262+
{:request, item, from} ->
263+
send(from, {:response, item * 10})
264+
receive_loop()
265+
end
266+
end
267+
end

0 commit comments

Comments
 (0)
Please sign in to comment.