126 changes: 95 additions & 31 deletions src/ered_client.erl
@@ -46,7 +46,8 @@
queue_ok_level = 2000 :: non_neg_integer(),

max_waiting = 5000 :: non_neg_integer(),
max_pending = 128 :: non_neg_integer()
max_pending = 128 :: non_neg_integer(),
batch_size = 16 :: non_neg_integer()
}).

-record(st,
@@ -64,6 +65,10 @@
queue_full_event_sent = false :: boolean(), % set to true when full, false when reaching queue_ok_level
node_status = up :: up | node_down | node_deactivated,

transport_socket = none :: gen_tcp:socket() | ssl:sslsocket(),
recv_pid = none :: pid(),
Collaborator comment on lines +68 to +69:

transport_socket is a tuple, so it should have a tuple type like {module(), any()}, or more exactly {gen_tcp, gen_tcp:socket()} | {ssl, ssl:sslsocket()}.

Additionally, the type needs to allow none because it is set to none by default.

Same with recv_pid. The type needs to be none | pid().

I think these lines should be located just after the other pids in the state, i.e. just after connect_loop_pid and connection_pid. (What is connect_loop_pid? Is it unused?)



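A minimal sketch of the suggested specs (not part of the PR; the tuple order mirrors the {Socket, Transport} value that connect_async builds, and the placement next to connection_pid follows the suggestion above):

    recv_pid = none         :: none | pid(),
    transport_socket = none :: none | {gen_tcp:socket(), gen_tcp} | {ssl:sslsocket(), ssl},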
node_down_timer = none :: none | reference(),
opts = #opts{}

Expand Down Expand Up @@ -115,6 +120,8 @@
%% If the queue has been full then it is considered ok
%% again when it reaches this level
{queue_ok_level, non_neg_integer()} |
%% Automatic batching of queued commands; turn off batching by setting the size to 1.
{batch_size, non_neg_integer()} |
%% How long to wait to reconnect after a failed connect attempt
{reconnect_wait, non_neg_integer()} |
%% Pid to send status messages to
@@ -230,6 +237,7 @@ init({Host, Port, OptsList, User}) ->
fun({connection_opts, Val}, S) -> S#opts{connection_opts = Val};
({max_waiting, Val}, S) -> S#opts{max_waiting = Val};
({max_pending, Val}, S) -> S#opts{max_pending = Val};
({batch_size, Val}, S) -> S#opts{batch_size = Val};
({queue_ok_level, Val}, S) -> S#opts{queue_ok_level = Val};
({reconnect_wait, Val}, S) -> S#opts{reconnect_wait = Val};
({info_pid, Val}, S) -> S#opts{info_pid = Val};
@@ -276,8 +284,7 @@ handle_cast(reactivate, #st{connection_pid = none} = State) ->
handle_cast(reactivate, State) ->
{noreply, State#st{node_status = up}}.


handle_info({{command_reply, Pid}, Reply}, State = #st{pending = Pending, connection_pid = Pid}) ->
handle_info({{command_reply, _}, Reply}, State = #st{pending = Pending, connection_pid = _Pid}) ->
case q_out(Pending) of
empty ->
{noreply, State};
@@ -299,15 +306,15 @@ handle_info(Reason = {init_error, _Errors}, State) ->
handle_info(Reason = {socket_closed, _CloseReason}, State) ->
{noreply, connection_down(Reason, State)};

handle_info({connected, Pid, ClusterId}, State) ->
handle_info({connected, ConnectionPid, Socket, RecvPid, ClusterId}, State) ->
Collaborator comment:

What is the connection pid for? I don't think ered_client is using it for anything. We could remove it from this message.

What we need from ered_connection is Transport, Socket and the RecvPid. We could put these three in a tuple and treat it as an opaque connection handle. Technically, ered_client doesn't even need to know what it contains. It could just use it when calling functions in the ered_connection module.

State1 = cancel_node_down_timer(State),
State2 = State1#st{connection_pid = Pid, cluster_id = ClusterId,
State2 = State1#st{connection_pid = ConnectionPid, cluster_id = ClusterId,
node_status = case State1#st.node_status of
node_down -> up;
OldStatus -> OldStatus
end},
State3 = report_connection_status(connection_up, State2),
{noreply, process_commands(State3)};
{noreply, process_commands(State3#st{transport_socket = Socket, recv_pid = RecvPid})};

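A sketch of the opaque-handle alternative from the review comment above (an assumption about how it could look, not code in this PR):

    %% In ered_connection: export one opaque handle instead of the raw socket and recv pid.
    -opaque connection_handle() :: {Transport :: gen_tcp | ssl,
                                    Socket    :: gen_tcp:socket() | ssl:sslsocket(),
                                    RecvPid   :: pid()}.
    -export_type([connection_handle/0]).

ered_client would then store the handle in a single field and pass it back to ered_connection calls (for example a hypothetical ered_connection:send(Handle, Commands, Ref)) without ever matching on its contents.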
handle_info({timeout, TimerRef, node_down}, State) when TimerRef == State#st.node_down_timer ->
State1 = report_connection_status({connection_down, node_down_timeout}, State),
@@ -358,7 +365,8 @@ cancel_node_down_timer(#st{node_down_timer = TimerRef} = State) ->
connection_down(Reason, State) ->
State1 = State#st{waiting = q_join(State#st.pending, State#st.waiting),
pending = q_new(),
connection_pid = none},
connection_pid = none,
transport_socket = none},
State2 = process_commands(State1),
State3 = report_connection_status({connection_down, Reason}, State2),
start_node_down_timer(State3).
@@ -370,10 +378,10 @@ process_commands(State) ->
NumPending = q_len(State#st.pending),
if
(NumWaiting > 0) and (NumPending < State#st.opts#opts.max_pending) and (State#st.connection_pid /= none) ->
{Command, NewWaiting} = q_out(State#st.waiting),
Data = get_command_payload(Command),
ered_connection:command_async(State#st.connection_pid, Data, {command_reply, State#st.connection_pid}),
process_commands(State#st{pending = q_in(Command, State#st.pending),
{Commands, NewWaiting} = q_multi_out(State#st.opts#opts.batch_size, State#st.waiting),
Data = [get_command_payload(X) || X <- Commands],
batch_send(State#st.recv_pid, State#st.transport_socket, Data, {command_reply, State#st.connection_pid}),
process_commands(State#st{pending = q_in_multiple(Commands, State#st.pending),
waiting = NewWaiting});

(NumWaiting > State#st.opts#opts.max_waiting) and (State#st.queue_full_event_sent) ->
@@ -405,15 +413,30 @@ q_new() ->
{0, queue:new()}.

q_in(Item, {Size, Q}) ->
{Size+1, queue:in(Item, Q)}.
{Size + 1, queue:in(Item, Q)}.
q_in_multiple([], Q) ->
Q;
q_in_multiple([Item | Items], {Size, Q}) ->
q_in_multiple(Items, {Size + 1, queue:in(Item, Q)}).

q_join({Size1, Q1}, {Size2, Q2}) ->
{Size1 + Size2, queue:join(Q1, Q2)}.

q_out({Size, Q}) ->
case queue:out(Q) of
{empty, _Q} -> empty;
{{value, Val}, NewQ} -> {Val, {Size-1, NewQ}}
{{value, Val}, NewQ} -> {Val, {Size - 1, NewQ}}
end.
q_multi_out(Nu, Queue) ->
q_multi_out(Nu, Queue, []).
q_multi_out(0, Q, Acc) ->
{lists:reverse(Acc), Q};
q_multi_out(Nu, {Size, Q}, Acc) ->
case queue:out(Q) of
{empty, _Q} ->
{lists:reverse(Acc), {Size, Q}};
{{value, Val}, NewQ} ->
q_multi_out(Nu - 1, {Size - 1, NewQ}, [Val | Acc])
end.

q_to_list({_Size, Q}) ->
@@ -422,7 +445,6 @@ q_to_list({_Size, Q}) ->
q_len({Size, _Q}) ->
Size.


reply_command({command, _, Fun}, Reply) ->
Fun(Reply).

@@ -478,28 +500,25 @@ send_info(_Msg, _State) ->
connect(Pid, Opts) ->
Result = ered_connection:connect(Opts#opts.host, Opts#opts.port, Opts#opts.connection_opts),
case Result of
{error, Reason} ->
Pid ! {connect_error, Reason},
timer:sleep(Opts#opts.reconnect_wait);

{ok, ConnectionPid} ->
case init(Pid, ConnectionPid, Opts) of
{socket_closed, ConnectionPid, Reason} ->
Pid ! {socket_closed, Reason},
timer:sleep(Opts#opts.reconnect_wait);
{ok, ConnectionPid, RecvPid, Socket} ->
case init(Pid, RecvPid, Socket, Opts) of
{ok, ClusterId} ->
Pid ! {connected, ConnectionPid, ClusterId},
Pid ! {connected, ConnectionPid, Socket, RecvPid, ClusterId},
receive
{socket_closed, ConnectionPid, Reason} ->
Pid ! {socket_closed, Reason}
end
end

end;
{socket_closed, ConnectionPid, Reason} ->
Pid ! {socket_closed, Reason},
timer:sleep(Opts#opts.reconnect_wait)
end;
{error, Reason} ->
Pid ! {connect_error, Reason},
timer:sleep(Opts#opts.reconnect_wait)
end,
connect(Pid, Opts).


init(MainPid, ConnectionPid, Opts) ->
init(MainPid, RecvPid, Socket, Opts) ->
Cmd1 = [[<<"CLUSTER">>, <<"MYID">>] || Opts#opts.use_cluster_id],
Cmd2 = case {Opts#opts.resp_version, Opts#opts.auth} of
{3, {Username, Password}} ->
Expand All @@ -517,7 +536,7 @@ init(MainPid, ConnectionPid, Opts) ->
[] ->
{ok, undefined};
Commands ->
ered_connection:command_async(ConnectionPid, Commands, init_command_reply),
send(RecvPid, Socket, Commands, init_command_reply),
receive
{init_command_reply, Reply} ->
case [Reason || {error, Reason} <- Reply] of
@@ -528,9 +547,54 @@ init(MainPid, ConnectionPid, Opts) ->
Errors ->
MainPid ! {init_error, Errors},
timer:sleep(Opts#opts.reconnect_wait),
init(MainPid, ConnectionPid, Opts)
init(MainPid, RecvPid, Socket, Opts)
end;
Other ->
Other
end
end.

send(RecvPid, {Socket, Transport}, Commands, Ref) ->
Time = erlang:monotonic_time(millisecond),
Commands2 = ered_command:convert_to(Commands),
Data = ered_command:get_data(Commands2),
Class = ered_command:get_response_class(Commands2),
RefInfo = {Class, self(), Ref, []},
case Transport:send(Socket, Data) of
ok ->
RecvPid ! {requests, [RefInfo], Time};
{error, Reason} ->
%% Give recv_loop time to finish processing
%% This will shut down recv_loop if it is waiting on socket
Transport:shutdown(Socket, read_write),
%% This will shut down recv_loop if it is waiting for a reference
RecvPid ! close_down,
%% Ok, recv done, time to die
receive {recv_exit, _Reason} -> ok end,
self() ! {socket_closed, Reason}
Collaborator comment on lines +570 to +574:

"Time to die" is about the ered_connection process. It's not true for the ered_client where this code is running. We should edit the comment or remove it (or move it to ered_connection).

end.

batch_send(RecvPid, {Socket, Transport}, Commands, Ref) ->
Collaborator comment:

This function duplicates code from send; it could be refactored to avoid the duplication.

Additionally, this code that builds RefInfo = {Class, self(), Ref, []} is designed just for the receiving process. Think about this as designing an API between the modules. This RefInfo stuff can be internal details to the ered_connection module. I think the send and batch_send functions should be moved to the ered_connection module, but still run in the caller's process.

To_Data = fun(Command) ->
Command2 = ered_command:convert_to(Command),
Data = ered_command:get_data(Command2),
Class = ered_command:get_response_class(Command2),
RefInfo = {Class, self(), Ref, []},
{Data, RefInfo}
end,
{Data, Refs} = lists:unzip([To_Data(Command) || Command <- Commands]),
Time = erlang:monotonic_time(millisecond),
case Transport:send(Socket, Data) of
ok ->
RecvPid ! {requests, Refs, Time};
{error, Reason} ->
%% Give recv_loop time to finish processing
%% This will shut down recv_loop if it is waiting on socket
Transport:shutdown(Socket, read_write),
%% This will shut down recv_loop if it is waiting for a reference
RecvPid ! close_down,
%% Ok, recv done, time to die
receive {recv_exit, _Reason} -> ok end,
self() ! {socket_closed, Reason}
end.
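Following up on the review comment above, one minimal way to remove the duplication (a sketch, not part of the PR): calling batch_send/4 with a single-element list produces the same {requests, [RefInfo], Time} message and the same error handling as send/4, so send can be reduced to a thin wrapper.

    %% Single-command send expressed via batch_send; Command may itself be a pipeline.
    send(RecvPid, SocketAndTransport, Command, Ref) ->
        batch_send(RecvPid, SocketAndTransport, [Command], Ref).

The larger refactor the reviewer suggests (moving both functions into ered_connection but still running them in the caller's process) would additionally keep the RefInfo tuple an internal detail of that module.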

8 changes: 3 additions & 5 deletions src/ered_cluster.erl
@@ -190,8 +190,7 @@ command(ClusterRef, Command, Key) ->
command(ClusterRef, Command, Key, infinity).

command(ClusterRef, Command, Key, Timeout) when is_binary(Key) ->
C = ered_command:convert_to(Command),
gen_server:call(ClusterRef, {command, C, Key}, Timeout).
gen_server:call(ClusterRef, {command, Command, Key}, Timeout).
Collaborator comment:

As we talked about: let's revert these changes. It's good to call convert_to early, before the command is sent between processes. When it is called again later, it is a no-op.

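For reference, the revert would restore the two removed lines, so the conversion happens once, in the calling process, before the command is sent to the cluster process:

    command(ClusterRef, Command, Key, Timeout) when is_binary(Key) ->
        C = ered_command:convert_to(Command),
        gen_server:call(ClusterRef, {command, C, Key}, Timeout).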

%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-spec command_async(cluster_ref(), command(), key(), fun((reply()) -> any())) -> ok.
Expand All @@ -201,8 +200,7 @@ command(ClusterRef, Command, Key, Timeout) when is_binary(Key) ->
%% runs in an unspecified process.
%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
command_async(ServerRef, Command, Key, ReplyFun) when is_function(ReplyFun, 1) ->
C = ered_command:convert_to(Command),
gen_server:cast(ServerRef, {command_async, C, Key, ReplyFun}).
gen_server:cast(ServerRef, {command_async, Command, Key, ReplyFun}).

%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-spec command_all(cluster_ref(), command()) -> [reply()].
@@ -663,7 +661,7 @@ create_reply_fun(Command, Slot, Client, From, State, AttemptsLeft) ->
update_slots(Pid, SlotMapVersion, Client),
gen_server:cast(Pid, {forward_command, Command, Slot, From, Addr, AttemptsLeft-1});
{ask, Addr} ->
gen_server:cast(Pid, {forward_command_asking, Command, Slot, From, Addr, AttemptsLeft-1, Reply});
gen_server:cast(Pid, {forward_command_asking, ered_command:convert_to(Command), Slot, From, Addr, AttemptsLeft-1, Reply});
try_again ->
erlang:send_after(TryAgainDelay, Pid, {command_try_again, Command, Slot, From, AttemptsLeft-1});
cluster_down ->
68 changes: 9 additions & 59 deletions src/ered_connection.erl
@@ -31,9 +31,6 @@
}).

-type opt() ::
%% If commands are queued up in the process message queue this is the max
%% amount of messages that will be received and sent in one call
{batch_size, non_neg_integer()} |
%% Timeout passed to gen_tcp:connect/4 or ssl:connect/4.
{connect_timeout, timeout()} |
%% Options passed to gen_tcp:connect/4.
@@ -83,8 +80,8 @@ connect(Host, Port) ->
connect(Host, Port, Opts) ->
Pid = connect_async(Host, Port, Opts),
receive
{connected, Pid} ->
{ok, Pid};
{connected, Pid, RecvPid, Socket} ->
{ok, Pid, RecvPid, Socket};
{connect_error, Pid, Reason} ->
{error, Reason}
end.
@@ -104,9 +101,8 @@ connect(Host, Port, Opts) ->
%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
connect_async(Addr, Port, Opts) ->
[error({badarg, BadOpt})
|| BadOpt <- proplists:get_keys(Opts) -- [batch_size, tcp_options, tls_options, push_cb, response_timeout,
|| BadOpt <- proplists:get_keys(Opts) -- [tcp_options, tls_options, push_cb, response_timeout,
tcp_connect_timeout, tls_connect_timeout, connect_timeout]],
BatchSize = proplists:get_value(batch_size, Opts, 16),
ResponseTimeout = proplists:get_value(response_timeout, Opts, 10000),
PushCb = proplists:get_value(push_cb, Opts, fun(_) -> ok end),
TcpOptions = proplists:get_value(tcp_options, Opts, []),
@@ -126,18 +122,21 @@ connect_async(Addr, Port, Opts) ->
SendPid = self(),
case catch Transport:connect(Addr, Port, [{active, false}, binary] ++ Options, Timeout) of
{ok, Socket} ->
Master ! {connected, SendPid},
Pid = spawn_link(fun() ->
ExitReason = recv_loop(Transport, Socket, PushCb, ResponseTimeout),
%% Inform sending process about exit
SendPid ! ExitReason
end),
ExitReason = send_loop(Transport, Socket, Pid, BatchSize),
Master ! {socket_closed, SendPid, ExitReason};
Master ! {connected, SendPid, Pid, {Socket, Transport}};
{error, Reason} ->
Master ! {connect_error, SendPid, Reason};
Other -> % {'EXIT',_}
Master ! {connect_error, SendPid, Other}
end,
%% Handle the exit of the receive_pid
receive
{recv_exit, R} ->
Master ! {socket_closed, SendPid, {recv_exit, R}}
end
end).

Expand Down Expand Up @@ -333,52 +332,3 @@ update_waiting(Timeout, State) when State#recv_st.waiting == [] ->
end;
update_waiting(_Timeout, State) ->
State.

%%
%% Send logic
%%

send_loop(Transport, Socket, RecvPid, BatchSize) ->
case receive_data(BatchSize) of
{recv_exit, Reason} ->
{recv_exit, Reason};
{data, {Refs, Data}} ->
Time = erlang:monotonic_time(millisecond),
case Transport:send(Socket, Data) of
ok ->
%% send to recv proc to fetch the response
RecvPid ! {requests, Refs, Time},
send_loop(Transport, Socket, RecvPid, BatchSize);
{error, Reason} ->
%% Give recv_loop time to finish processing
%% This will shut down recv_loop if it is waiting on socket
Transport:shutdown(Socket, read_write),
%% This will shut down recv_loop if it is waiting for a reference
RecvPid ! close_down,
%% Ok, recv done, time to die
receive {recv_exit, _Reason} -> ok end,
{send_exit, Reason}
end
end.

receive_data(N) ->
receive_data(N, infinity, []).

receive_data(0, _Time, Acc) ->
{data, lists:unzip(lists:reverse(Acc))};
receive_data(N, Time, Acc) ->
receive
Msg ->
case Msg of
{recv_exit, Reason} ->
{recv_exit, Reason};
{send, Pid, Ref, Commands} ->
Data = ered_command:get_data(Commands),
Class = ered_command:get_response_class(Commands),
RefInfo = {Class, Pid, Ref, []},
Acc1 = [{RefInfo, Data} | Acc],
receive_data(N - 1, 0, Acc1)
end
after Time ->
receive_data(0, 0, Acc)
end.