Skip to content

Commit 2060e05

Browse files
sleipnirAdriano Santos
andauthored
[Fix]: Stream map error (#487)
* fix: map_error send_response --------- Co-authored-by: Adriano Santos <[email protected]>
1 parent 9baf871 commit 2060e05

File tree

2 files changed

+328
-1
lines changed

2 files changed

+328
-1
lines changed

grpc_client/test/grpc/integration/stream_test.exs

Lines changed: 322 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,326 @@ defmodule GRPC.StreamTest do
3333
end)
3434
end
3535
end
36+
37+
describe "map_error/2" do
38+
defmodule MapErrorService do
39+
use GRPC.Server, service: Routeguide.RouteGuide.Service
40+
41+
def get_feature(input, materializer) do
42+
GRPC.Stream.unary(input, materializer: materializer)
43+
|> GRPC.Stream.map(fn point ->
44+
# Trigger error when latitude is 0
45+
if point.latitude == 0 do
46+
raise "Boom! Invalid latitude"
47+
end
48+
49+
%Routeguide.Feature{location: point, name: "#{point.latitude},#{point.longitude}"}
50+
end)
51+
|> GRPC.Stream.map_error(fn error ->
52+
case error do
53+
{:error, {:exception, %{message: msg}}} ->
54+
{:error,
55+
GRPC.RPCError.exception(status: :invalid_argument, message: "Error: #{msg}")}
56+
57+
other ->
58+
# Not an error, return as-is to continue the flow
59+
other
60+
end
61+
end)
62+
|> GRPC.Stream.run()
63+
end
64+
end
65+
66+
defmodule DirectRPCErrorService do
67+
use GRPC.Server, service: Routeguide.RouteGuide.Service
68+
69+
def get_feature(input, materializer) do
70+
GRPC.Stream.unary(input, materializer: materializer)
71+
|> GRPC.Stream.map(fn point ->
72+
# Trigger error when latitude is negative
73+
if point.latitude < 0 do
74+
raise "Negative latitude not allowed"
75+
end
76+
77+
%Routeguide.Feature{location: point, name: "#{point.latitude},#{point.longitude}"}
78+
end)
79+
|> GRPC.Stream.map_error(fn error ->
80+
case error do
81+
{:error, {:exception, %{message: msg}}} ->
82+
# Return RPCError directly without {:error, ...} wrapper
83+
GRPC.RPCError.exception(status: :out_of_range, message: "Direct error: #{msg}")
84+
85+
other ->
86+
# Not an error, return as-is to continue the flow
87+
other
88+
end
89+
end)
90+
|> GRPC.Stream.run()
91+
end
92+
end
93+
94+
defmodule ExplicitValidationService do
95+
use GRPC.Server, service: Routeguide.RouteGuide.Service
96+
97+
def get_feature(input, materializer) do
98+
GRPC.Stream.unary(input, materializer: materializer)
99+
|> GRPC.Stream.map(fn point ->
100+
# Trigger different error types based on coordinates
101+
cond do
102+
point.latitude == 999 ->
103+
raise RuntimeError, "Runtime error occurred"
104+
105+
point.latitude == 888 ->
106+
raise ArgumentError, "Argument is invalid"
107+
108+
point.latitude == 777 ->
109+
raise "Simple string error"
110+
111+
true ->
112+
%Routeguide.Feature{location: point, name: "valid"}
113+
end
114+
end)
115+
|> GRPC.Stream.map_error(fn error ->
116+
# Explicitly validate the error structure
117+
case error do
118+
{:error, {:exception, exception_data}} when is_map(exception_data) ->
119+
# Validate that we have the expected exception structure
120+
message = Map.get(exception_data, :message)
121+
kind = Map.get(exception_data, :kind, :error)
122+
123+
cond do
124+
is_binary(message) and message =~ "Runtime error" ->
125+
{:error,
126+
GRPC.RPCError.exception(
127+
status: :internal,
128+
message: "Validated: RuntimeError - #{message}"
129+
)}
130+
131+
is_binary(message) and message =~ "Argument is invalid" ->
132+
{:error,
133+
GRPC.RPCError.exception(
134+
status: :invalid_argument,
135+
message: "Validated: ArgumentError - #{message}"
136+
)}
137+
138+
is_binary(message) ->
139+
{:error,
140+
GRPC.RPCError.exception(
141+
status: :unknown,
142+
message: "Validated: #{kind} - #{message}"
143+
)}
144+
145+
true ->
146+
{:error,
147+
GRPC.RPCError.exception(
148+
status: :unknown,
149+
message: "Validated but no message found"
150+
)}
151+
end
152+
153+
other ->
154+
# Not an exception error, pass through
155+
other
156+
end
157+
end)
158+
|> GRPC.Stream.run()
159+
end
160+
end
161+
162+
defmodule MultipleErrorsService do
163+
use GRPC.Server, service: Routeguide.RouteGuide.Service
164+
165+
def get_feature(input, materializer) do
166+
GRPC.Stream.unary(input, materializer: materializer)
167+
|> GRPC.Stream.map(fn point ->
168+
cond do
169+
point.latitude == 0 ->
170+
raise "Invalid latitude: cannot be zero"
171+
172+
point.longitude == 0 ->
173+
raise "Invalid longitude: cannot be zero"
174+
175+
point.latitude < 0 ->
176+
raise ArgumentError, "Latitude must be positive"
177+
178+
true ->
179+
%Routeguide.Feature{location: point, name: "valid"}
180+
end
181+
end)
182+
|> GRPC.Stream.map_error(fn error ->
183+
case error do
184+
{:error, {:exception, %{message: msg}}} when is_binary(msg) ->
185+
cond do
186+
msg =~ "latitude" ->
187+
{:error,
188+
GRPC.RPCError.exception(
189+
status: :invalid_argument,
190+
message: "Latitude error: #{msg}"
191+
)}
192+
193+
msg =~ "longitude" ->
194+
{:error,
195+
GRPC.RPCError.exception(
196+
status: :invalid_argument,
197+
message: "Longitude error: #{msg}"
198+
)}
199+
200+
true ->
201+
{:error,
202+
GRPC.RPCError.exception(status: :unknown, message: "Unknown error: #{msg}")}
203+
end
204+
205+
other ->
206+
# Not an error we handle, return as-is to continue the flow
207+
other
208+
end
209+
end)
210+
|> GRPC.Stream.run()
211+
end
212+
end
213+
214+
@tag :map_error
215+
test "handles errors with map_error and sends RPCError to client" do
216+
run_server([MapErrorService], fn port ->
217+
{:ok, channel} = GRPC.Stub.connect("localhost:#{port}", adapter_opts: [retry_timeout: 10])
218+
219+
# Test with invalid latitude (0) - should trigger error
220+
invalid_point = %Routeguide.Point{latitude: 0, longitude: -746_188_906}
221+
222+
result =
223+
Routeguide.RouteGuide.Stub.get_feature(channel, invalid_point, return_headers: true)
224+
225+
# Should receive error response with custom message
226+
assert {:error, error} = result
227+
assert %GRPC.RPCError{} = error
228+
# Status is returned as integer (3 = INVALID_ARGUMENT)
229+
assert error.status == 3
230+
assert error.message =~ "Error: Boom! Invalid latitude"
231+
end)
232+
end
233+
234+
@tag :map_error
235+
test "handles successful requests without triggering map_error" do
236+
run_server([MapErrorService], fn port ->
237+
{:ok, channel} = GRPC.Stub.connect("localhost:#{port}", adapter_opts: [retry_timeout: 10])
238+
239+
# Test with valid latitude (non-zero) - should succeed
240+
valid_point = %Routeguide.Point{latitude: 409_146_138, longitude: -746_188_906}
241+
242+
result =
243+
Routeguide.RouteGuide.Stub.get_feature(channel, valid_point, return_headers: true)
244+
245+
assert {:ok, response, _metadata} = result
246+
assert response.location == valid_point
247+
assert response.name == "409146138,-746188906"
248+
end)
249+
end
250+
251+
@tag :map_error
252+
test "handles RPCError returned directly without {:error, ...} wrapper" do
253+
run_server([DirectRPCErrorService], fn port ->
254+
{:ok, channel} = GRPC.Stub.connect("localhost:#{port}", adapter_opts: [retry_timeout: 10])
255+
256+
# Test with negative latitude - should trigger error
257+
negative_point = %Routeguide.Point{latitude: -50, longitude: 100}
258+
259+
result =
260+
Routeguide.RouteGuide.Stub.get_feature(channel, negative_point, return_headers: true)
261+
262+
# Should receive error response with custom message
263+
assert {:error, error} = result
264+
assert %GRPC.RPCError{} = error
265+
# Status is returned as integer (11 = OUT_OF_RANGE)
266+
assert error.status == 11
267+
assert error.message =~ "Direct error: Negative latitude not allowed"
268+
end)
269+
end
270+
271+
@tag :map_error
272+
test "handles successful request when using direct RPCError service" do
273+
run_server([DirectRPCErrorService], fn port ->
274+
{:ok, channel} = GRPC.Stub.connect("localhost:#{port}", adapter_opts: [retry_timeout: 10])
275+
276+
# Test with positive latitude - should succeed
277+
valid_point = %Routeguide.Point{latitude: 50, longitude: 100}
278+
279+
result =
280+
Routeguide.RouteGuide.Stub.get_feature(channel, valid_point, return_headers: true)
281+
282+
assert {:ok, response, _metadata} = result
283+
assert response.location == valid_point
284+
assert response.name == "50,100"
285+
end)
286+
end
287+
288+
@tag :map_error
289+
test "handles different error types with conditional map_error" do
290+
run_server([MultipleErrorsService], fn port ->
291+
{:ok, channel} = GRPC.Stub.connect("localhost:#{port}", adapter_opts: [retry_timeout: 10])
292+
293+
# Test latitude error
294+
lat_error_point = %Routeguide.Point{latitude: 0, longitude: 100}
295+
assert {:error, error} = Routeguide.RouteGuide.Stub.get_feature(channel, lat_error_point)
296+
# INVALID_ARGUMENT
297+
assert error.status == 3
298+
assert error.message =~ "Latitude error"
299+
300+
# Test longitude error
301+
long_error_point = %Routeguide.Point{latitude: 100, longitude: 0}
302+
assert {:error, error} = Routeguide.RouteGuide.Stub.get_feature(channel, long_error_point)
303+
# INVALID_ARGUMENT
304+
assert error.status == 3
305+
assert error.message =~ "Longitude error"
306+
307+
# Test ArgumentError (negative latitude) - falls into "Unknown error" branch
308+
arg_error_point = %Routeguide.Point{latitude: -100, longitude: 100}
309+
assert {:error, error} = Routeguide.RouteGuide.Stub.get_feature(channel, arg_error_point)
310+
# UNKNOWN (because message contains "Latitude must be positive")
311+
assert error.status == 2
312+
assert error.message =~ "Latitude must be positive"
313+
end)
314+
end
315+
316+
@tag :map_error
317+
test "explicitly validates exception structure in map_error" do
318+
run_server([ExplicitValidationService], fn port ->
319+
{:ok, channel} = GRPC.Stub.connect("localhost:#{port}", adapter_opts: [retry_timeout: 10])
320+
321+
# Test RuntimeError - should validate and transform to INTERNAL
322+
runtime_error_point = %Routeguide.Point{latitude: 999, longitude: 100}
323+
324+
assert {:error, error} =
325+
Routeguide.RouteGuide.Stub.get_feature(channel, runtime_error_point)
326+
327+
# INTERNAL
328+
assert error.status == 13
329+
assert error.message =~ "Validated: RuntimeError"
330+
assert error.message =~ "Runtime error occurred"
331+
332+
# Test ArgumentError - should validate and transform to INVALID_ARGUMENT
333+
arg_error_point = %Routeguide.Point{latitude: 888, longitude: 100}
334+
assert {:error, error} = Routeguide.RouteGuide.Stub.get_feature(channel, arg_error_point)
335+
# INVALID_ARGUMENT
336+
assert error.status == 3
337+
assert error.message =~ "Validated: ArgumentError"
338+
assert error.message =~ "Argument is invalid"
339+
340+
# Test simple string error - should validate and transform to UNKNOWN
341+
string_error_point = %Routeguide.Point{latitude: 777, longitude: 100}
342+
343+
assert {:error, error} =
344+
Routeguide.RouteGuide.Stub.get_feature(channel, string_error_point)
345+
346+
# UNKNOWN
347+
assert error.status == 2
348+
assert error.message =~ "Validated:"
349+
assert error.message =~ "Simple string error"
350+
351+
# Test successful request - should not trigger error handling
352+
valid_point = %Routeguide.Point{latitude: 100, longitude: 100}
353+
assert {:ok, response} = Routeguide.RouteGuide.Stub.get_feature(channel, valid_point)
354+
assert response.name == "valid"
355+
end)
356+
end
357+
end
36358
end

grpc_server/lib/grpc/stream.ex

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -542,7 +542,12 @@ defmodule GRPC.Stream do
542542
dry_run? = Keyword.get(opts, :dry_run, false)
543543

544544
if not dry_run? do
545-
GRPC.Server.send_reply(from, msg)
545+
# RPCError should be raised, not sent as reply
546+
case msg do
547+
%GRPC.RPCError{} = rpc_error -> raise rpc_error
548+
{:error, %GRPC.RPCError{} = rpc_error} -> raise rpc_error
549+
_ -> GRPC.Server.send_reply(from, msg)
550+
end
546551
end
547552
end
548553
end

0 commit comments

Comments
 (0)