Skip to content

Commit 5c041a3

Browse files
committed
grpcbox_channel can add and remove dynamic endpoints
1 parent b9060a2 commit 5c041a3

File tree

2 files changed

+81
-9
lines changed

2 files changed

+81
-9
lines changed

src/grpcbox_channel.erl

Lines changed: 42 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
is_ready/1,
77
pick/2,
88
pick/3,
9+
add_endpoints/2,
10+
remove_endpoints/3,
911
stop/1,
1012
stop/2]).
1113
-export([init/1,
@@ -86,6 +88,12 @@ pick_worker(Name, undefined) ->
8688
pick_worker(Name, Key) ->
8789
gproc_pool:pick_worker(Name, Key).
8890

91+
add_endpoints(Name, Endpoints) ->
92+
gen_statem:call(?CHANNEL(Name), {add_endpoints, Endpoints}).
93+
94+
remove_endpoints(Name, Endpoints, Reason) ->
95+
gen_statem:call(?CHANNEL(Name), {remove_endpoints, Endpoints, Reason}).
96+
8997
-spec interceptor(name(), unary | stream) -> grpcbox_client:interceptor() | undefined.
9098
interceptor(Name, CallType) ->
9199
case ets:lookup(?CHANNELS_TAB, {Name, CallType}) of
@@ -114,14 +122,13 @@ init([Name, Endpoints, Options]) ->
114122
pool = Name,
115123
encoding = Encoding,
116124
stats_handler = StatsHandler,
117-
endpoints = Endpoints
125+
endpoints = lists:usort(Endpoints)
118126
},
119-
120127
case maps:get(sync_start, Options, false) of
121128
false ->
122129
{ok, idle, Data, [{next_event, internal, connect}]};
123130
true ->
124-
_ = start_workers(Name, StatsHandler, Encoding, Endpoints),
131+
start_workers(Name, StatsHandler, Encoding, Endpoints),
125132
{ok, connected, Data}
126133
end.
127134

@@ -130,14 +137,32 @@ callback_mode() ->
130137

131138
connected({call, From}, is_ready, _Data) ->
132139
{keep_state_and_data, [{reply, From, true}]};
140+
connected({call, From}, {add_endpoints, Endpoints},
141+
Data=#data{pool=Pool,
142+
stats_handler=StatsHandler,
143+
encoding=Encoding,
144+
endpoints=TotalEndpoints}) ->
145+
NewEndpoints = lists:subtract(Endpoints, TotalEndpoints),
146+
NewTotalEndpoints = lists:umerge(TotalEndpoints, Endpoints),
147+
start_workers(Pool, StatsHandler, Encoding, NewEndpoints),
148+
{keep_state, Data#data{endpoints=NewTotalEndpoints}, [{reply, From, ok}]};
149+
connected({call, From}, {remove_endpoints, Endpoints, Reason},
150+
Data=#data{pool=Pool, endpoints=TotalEndpoints}) ->
151+
152+
NewEndpoints = sets:to_list(sets:intersection(sets:from_list(Endpoints),
153+
sets:from_list(TotalEndpoints))),
154+
NewTotalEndpoints = lists:subtract(TotalEndpoints, Endpoints),
155+
stop_workers(Pool, NewEndpoints, Reason),
156+
{keep_state, Data#data{endpoints = NewTotalEndpoints}, [{reply, From, ok}]};
133157
connected(EventType, EventContent, Data) ->
134158
handle_event(EventType, EventContent, Data).
135159

136160
idle(internal, connect, Data=#data{pool=Pool,
137161
stats_handler=StatsHandler,
138162
encoding=Encoding,
139163
endpoints=Endpoints}) ->
140-
_ = start_workers(Pool, StatsHandler, Encoding, Endpoints),
164+
165+
start_workers(Pool, StatsHandler, Encoding, Endpoints),
141166
{next_state, connected, Data};
142167
idle({call, From}, is_ready, _Data) ->
143168
{keep_state_and_data, [{reply, From, false}]};
@@ -186,8 +211,16 @@ insert_stream_interceptor(Name, _Type, Interceptors) ->
186211

187212
start_workers(Pool, StatsHandler, Encoding, Endpoints) ->
188213
[begin
189-
gproc_pool:add_worker(Pool, Endpoint),
190-
{ok, Pid} = grpcbox_subchannel:start_link(Endpoint, Pool, {Transport, Host, Port, EndpointOptions},
191-
Encoding, StatsHandler),
192-
Pid
193-
end || Endpoint={Transport, Host, Port, EndpointOptions} <- Endpoints].
214+
gproc_pool:add_worker(Pool, Endpoint),
215+
{ok, Pid} = grpcbox_subchannel:start_link(Endpoint,
216+
Pool, Endpoint, Encoding, StatsHandler),
217+
Pid
218+
end || Endpoint <- Endpoints].
219+
220+
stop_workers(Pool, Endpoints, Reason) ->
221+
[begin
222+
case gproc_pool:whereis_worker(Pool, Endpoint) of
223+
undefined -> ok;
224+
Pid -> grpcbox_subchannel:stop(Pid, Reason)
225+
end
226+
end || Endpoint <- Endpoints].

test/grpcbox_channel_SUITE.erl

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
-module(grpcbox_channel_SUITE).
2+
3+
-export([all/0,
4+
init_per_suite/1,
5+
end_per_suite/1,
6+
add_and_remove_endpoints/1]).
7+
8+
-include_lib("eunit/include/eunit.hrl").
9+
10+
all() ->
11+
[
12+
add_and_remove_endpoints
13+
].
14+
init_per_suite(_Config) ->
15+
application:set_env(grpcbox, servers, []),
16+
application:ensure_all_started(grpcbox),
17+
grpcbox_channel_sup:start_link(),
18+
grpcbox_channel_sup:start_child(default_channel, [{http, "127.0.0.1", 18080, []}], #{}),
19+
grpcbox_channel_sup:start_child(random_channel,
20+
[{http, "127.0.0.1", 18080, []}, {http, "127.0.0.1", 18081, []}, {http, "127.0.0.1", 18082, []}, {http, "127.0.0.1", 18083, []}],
21+
#{balancer => random}),
22+
grpcbox_channel_sup:start_child(hash_channel,
23+
[{http, "127.0.0.1", 18080, []}, {http, "127.0.0.1", 18081, []}, {http, "127.0.0.1", 18082, []}, {http, "127.0.0.1", 18083, []}],
24+
#{balancer => hash}),
25+
grpcbox_channel_sup:start_child(direct_channel,
26+
[{http, "127.0.0.1", 18080, []}, {http, "127.0.0.1", 18081, []}, {http, "127.0.0.1", 18082, []}, {http, "127.0.0.4", 18084, []}],
27+
#{ balancer => direct}),
28+
29+
_Config.
30+
31+
end_per_suite(_Config) ->
32+
application:stop(grpcbox),
33+
ok.
34+
35+
add_and_remove_endpoints(_Config) ->
36+
grpcbox_channel:add_endpoints(default_channel, [{http, "127.0.0.1", 18081, []}, {http, "127.0.0.1", 18082, []}, {http, "127.0.0.1", 18083, []}]),
37+
?assertEqual(4, length(gproc_pool:active_workers(default_channel))),
38+
grpcbox_channel:add_endpoints(default_channel, [{https, "127.0.0.1", 18081, []}, {https, "127.0.0.1", 18082, []}, {https, "127.0.0.1", 18083, []}]),
39+
?assertEqual(7, length(gproc_pool:active_workers(default_channel))).

0 commit comments

Comments
 (0)