Skip to content

Commit 3122d30

Browse files
committed
Pick the specify worker
1 parent 8cf000f commit 3122d30

File tree

3 files changed

+22
-2
lines changed

3 files changed

+22
-2
lines changed

src/grpcbox_channel.erl

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
-export([start_link/3,
66
is_ready/1,
7+
get/3,
78
pick/2,
89
pick/3,
910
add_endpoints/2,
@@ -60,6 +61,16 @@ start_link(Name, Endpoints, Options) ->
6061
is_ready(Name) ->
6162
gen_statem:call(?CHANNEL(Name), is_ready).
6263

64+
-spec get(name(), unary | stream, term()) ->
65+
{ok, {pid(), grpcbox_client:interceptor() | undefined}} |
66+
{error, undefined_channel | not_found_endpoint}.
67+
get(Name, CallType, Key) ->
68+
case lists:keyfind(Key, 1, gproc_pool:active_workers(Name)) of
69+
{_, Pid} -> {ok, {Pid, interceptor(Name, CallType)}};
70+
false -> {error, not_found_endpoint}
71+
end.
72+
73+
6374
%% @doc Picks a subchannel from a pool using the configured strategy.
6475
-spec pick(name(), unary | stream) ->
6576
{ok, {pid(), grpcbox_client:interceptor() | undefined}} |

src/grpcbox_client.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ get_channel(Options, Type) ->
5050
Key = maps:get(key, Options, undefined),
5151
PickStrategy = maps:get(pick_strategy, Options, undefined),
5252
case PickStrategy of
53+
specify_worker -> grpcbox_channel:get(Channel, Type, Key);
5354
active_worker -> grpcbox_channel:pick({Channel, active}, Type, Key);
5455
undefined -> grpcbox_channel:pick(Channel, Type, Key)
5556
end.

test/grpcbox_channel_SUITE.erl

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@
66
add_and_remove_endpoints/1,
77
add_and_remove_endpoints_active_workers/1,
88
pick_worker_strategy/1,
9-
pick_active_worker_strategy/1]).
9+
pick_active_worker_strategy/1,
10+
pick_specify_worker_strategy/1]).
1011

1112
-include_lib("eunit/include/eunit.hrl").
1213

@@ -15,7 +16,8 @@ all() ->
1516
add_and_remove_endpoints,
1617
add_and_remove_endpoints_active_workers,
1718
pick_worker_strategy,
18-
pick_active_worker_strategy
19+
pick_active_worker_strategy,
20+
pick_specify_worker_strategy
1921
].
2022
init_per_suite(_Config) ->
2123
application:set_env(grpcbox, client, #{channel => []}),
@@ -97,6 +99,12 @@ pick_active_worker_strategy(_Config) ->
9799
?assertEqual(error, pick_worker({hash_channel, active})),
98100
ok.
99101

102+
pick_specify_worker_strategy(_Config) ->
103+
?assertMatch({ok, _} ,grpcbox_channel:get(default_channel, stream, {http, "127.0.0.1", 18080, []})),
104+
?assertEqual({error, not_found_endpoint} ,grpcbox_channel:get(default_channel, stream, {http, "127.0.0.1", 8080, []})),
105+
?assertEqual({error, not_found_endpoint} ,grpcbox_channel:get(channel_xxx, stream, {http, "127.0.0.1", 8080, []})),
106+
ok.
107+
100108
pick_worker(Name, N) ->
101109
{R, _} = grpcbox_channel:pick(Name, unary, N),
102110
R.

0 commit comments

Comments
 (0)