Skip to content

Commit ba8256d

Browse files
committed
Pick the specify worker
1 parent 86ad40d commit ba8256d

File tree

3 files changed

+23
-2
lines changed

3 files changed

+23
-2
lines changed

src/grpcbox_channel.erl

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
-export([start_link/3,
66
is_ready/1,
7+
get/3,
78
pick/2,
89
pick/3,
910
add_endpoints/2,
@@ -57,6 +58,16 @@ start_link(Name, Endpoints, Options) ->
5758
is_ready(Name) ->
5859
gen_statem:call(?CHANNEL(Name), is_ready).
5960

61+
-spec get(name(), unary | stream, term()) ->
62+
{ok, {pid(), grpcbox_client:interceptor() | undefined}} |
63+
{error, undefined_channel | not_found_endpoint}.
64+
get(Name, CallType, Key) ->
65+
case lists:keyfind(Key, 1, gproc_pool:active_workers(Name)) of
66+
{_, Pid} -> {ok, {Pid, interceptor(Name, CallType)}};
67+
false -> {error, not_found_endpoint}
68+
end.
69+
70+
6071
%% @doc Picks a subchannel from a pool using the configured strategy.
6172
-spec pick(name(), unary | stream) ->
6273
{ok, {pid(), grpcbox_client:interceptor() | undefined}} |

src/grpcbox_client.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ get_channel(Options, Type) ->
5050
Key = maps:get(key, Options, undefined),
5151
PickStrategy = maps:get(pick_strategy, Options, undefined),
5252
case PickStrategy of
53+
specify_worker -> gproc_pool:get(Channel, Type, Key);
5354
active_worker -> grpcbox_channel:pick({Channel, active}, Type, Key);
5455
undefined -> grpcbox_channel:pick(Channel, Type, Key)
5556
end.

test/grpcbox_channel_SUITE.erl

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@
66
add_and_remove_endpoints/1,
77
add_and_remove_endpoints_active_workers/1,
88
pick_worker_strategy/1,
9-
pick_active_worker_strategy/1]).
9+
pick_active_worker_strategy/1,
10+
pick_specify_worker_strategy/1]).
1011

1112
-include_lib("eunit/include/eunit.hrl").
1213

@@ -15,7 +16,8 @@ all() ->
1516
add_and_remove_endpoints,
1617
add_and_remove_endpoints_active_workers,
1718
pick_worker_strategy,
18-
pick_active_worker_strategy
19+
pick_active_worker_strategy,
20+
pick_specify_worker_strategy
1921
].
2022
init_per_suite(_Config) ->
2123
GrpcOptions = #{service_protos => [route_guide_pb], services => #{'routeguide.RouteGuide' => routeguide_route_guide}},
@@ -95,6 +97,13 @@ pick_active_worker_strategy(_Config) ->
9597
?assertEqual(error, pick_worker({direct_channel, active})),
9698
?assertEqual(error, pick_worker({hash_channel, active})),
9799
ok.
100+
101+
pick_specify_worker_strategy(_Config) ->
102+
?assertMatch({ok, _} ,grpcbox_channel:get(default_channel, stream, {http, "127.0.0.1", 18080, #{}})),
103+
?assertEqual({error, not_found_endpoint} ,grpcbox_channel:get(default_channel, stream, {http, "127.0.0.1", 8080, #{}})),
104+
?assertEqual({error, not_found_endpoint} ,grpcbox_channel:get(channel_xxx, stream, {http, "127.0.0.1", 8080, #{}})),
105+
ok.
106+
98107
pick_worker(Name, N) ->
99108
{R, _} = grpcbox_channel:pick(Name, unary, N),
100109
R.

0 commit comments

Comments
 (0)