Skip to content

Commit 7c1ca47

Browse files
committed
The grpcbox_client get_channel function adds support for hash and direct strategies
1 parent 6ea2b84 commit 7c1ca47

File tree

3 files changed

+46
-12
lines changed

3 files changed

+46
-12
lines changed

src/grpcbox_channel.erl

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
-export([start_link/3,
66
is_ready/1,
77
pick/2,
8+
pick/3,
89
add_endpoints/2,
910
remove_endpoints/3,
1011
stop/1,
@@ -57,11 +58,19 @@ is_ready(Name) ->
5758
gen_statem:call(?CHANNEL(Name), is_ready).
5859

5960
%% @doc Picks a subchannel from a pool using the configured strategy.
60-
-spec pick(name(), unary | stream) -> {ok, {pid(), grpcbox_client:interceptor() | undefined}} |
61-
{error, undefined_channel | no_endpoints}.
61+
-spec pick(name(), unary | stream) ->
62+
{ok, {pid(), grpcbox_client:interceptor() | undefined}} |
63+
{error, undefined_channel | no_endpoints}.
6264
pick(Name, CallType) ->
65+
pick(Name, CallType, undefined).
66+
67+
%% @doc Picks a subchannel from a pool using the configured strategy.
68+
-spec pick(name(), unary | stream, term() | undefined) ->
69+
{ok, {pid(), grpcbox_client:interceptor() | undefined}} |
70+
{error, undefined_channel | no_endpoints}.
71+
pick(Name, CallType, Key) ->
6372
try
64-
case gproc_pool:pick_worker(Name) of
73+
case pick_worker(Name, Key) of
6574
false -> {error, no_endpoints};
6675
Pid when is_pid(Pid) ->
6776
{ok, {Pid, interceptor(Name, CallType)}}
@@ -71,6 +80,11 @@ pick(Name, CallType) ->
7180
{error, undefined_channel}
7281
end.
7382

83+
pick_worker(Name, undefined) ->
84+
gproc_pool:pick_worker(Name);
85+
pick_worker(Name, Key) ->
86+
gproc_pool:pick_worker(Name, Key).
87+
7488
add_endpoints(Name, Endpoints) ->
7589
gen_statem:call(?CHANNEL(Name), {add_endpoints, Endpoints}).
7690

@@ -123,9 +137,9 @@ connected({call, From}, is_ready, _Data) ->
123137
{keep_state_and_data, [{reply, From, true}]};
124138
connected({call, From}, {add_endpoints, Endpoints},
125139
Data=#data{pool=Pool,
126-
stats_handler=StatsHandler,
127-
encoding=Encoding,
128-
endpoints=TotalEndpoints}) ->
140+
stats_handler=StatsHandler,
141+
encoding=Encoding,
142+
endpoints=TotalEndpoints}) ->
129143
NewEndpoints = lists:subtract(Endpoints, TotalEndpoints),
130144
NewTotalEndpoints = lists:umerge(TotalEndpoints, Endpoints),
131145
start_workers(Pool, StatsHandler, Encoding, NewEndpoints),
@@ -197,8 +211,8 @@ insert_stream_interceptor(Name, _Type, Interceptors) ->
197211
start_workers(Pool, StatsHandler, Encoding, Endpoints) ->
198212
[begin
199213
gproc_pool:add_worker(Pool, Endpoint),
200-
{ok, Pid} = grpcbox_subchannel:start_link(Endpoint,
201-
Pool, Endpoint, Encoding, StatsHandler),
214+
gproc_pool:add_worker({Pool, active}, Endpoint),
215+
{ok, Pid} = grpcbox_subchannel:start_link(Endpoint, Pool, Endpoint, Encoding, StatsHandler),
202216
Pid
203217
end || Endpoint <- Endpoints].
204218

src/grpcbox_client.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@
4747

4848
get_channel(Options, Type) ->
4949
Channel = maps:get(channel, Options, default_channel),
50-
grpcbox_channel:pick(Channel, Type).
50+
Key = maps:get(key, Options, undefined),
51+
grpcbox_channel:pick(Channel, Type, Key).
5152

5253
unary(Ctx, Service, Method, Input, Def, Options) ->
5354
unary(Ctx, filename:join([<<>>, Service, Method]), Input, Def, Options).

test/grpcbox_channel_SUITE.erl

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@
1010

1111
all() ->
1212
[
13-
add_and_remove_endpoints
13+
add_and_remove_endpoints,
14+
pick_worker_strategy
1415
].
1516
init_per_suite(_Config) ->
1617
application:set_env(grpcbox, servers, []),
@@ -35,6 +36,24 @@ end_per_suite(_Config) ->
3536

3637
add_and_remove_endpoints(_Config) ->
3738
grpcbox_channel:add_endpoints(default_channel, [{https, "127.0.0.2", 8080, #{}}, {https, "127.0.0.3", 8080, #{}}, {https, "127.0.0.4", 8080, #{}}]),
38-
?assertMatch(4, length(gproc_pool:active_workers(default_channel))),
39+
?assertEqual(4, length(gproc_pool:active_workers(default_channel))),
3940
grpcbox_channel:remove_endpoints(default_channel, [{https, "127.0.0.1", 8080, #{}}, {https, "127.0.0.2", 8080, #{}}, {https, "127.0.0.4", 8080, #{}}], normal),
40-
?assertMatch(1, length(gproc_pool:active_workers(default_channel))).
41+
?assertEqual(1, length(gproc_pool:active_workers(default_channel))).
42+
43+
pick_worker_strategy(_Config) ->
44+
?assertEqual(ok, pick_worker(default_channel)),
45+
?assertEqual(ok, pick_worker(random_channel)),
46+
?assertEqual(ok, pick_worker(direct_channel, 1)),
47+
?assertEqual(ok, pick_worker(hash_channel, 1)),
48+
?assertEqual(error, pick_worker(default_channel, 1)),
49+
?assertEqual(error, pick_worker(random_channel, 1)),
50+
?assertEqual(error, pick_worker(direct_channel)),
51+
?assertEqual(error, pick_worker(hash_channel)),
52+
ok.
53+
54+
pick_worker(Name, N) ->
55+
{R, _} = grpcbox_channel:pick(Name, unary, N),
56+
R.
57+
pick_worker(Name) ->
58+
{R, _} = grpcbox_channel:pick(Name, unary, undefined),
59+
R.

0 commit comments

Comments
 (0)