diff --git a/src/ered_cluster.erl b/src/ered_cluster.erl
index 4910f73..0b36dbd 100644
--- a/src/ered_cluster.erl
+++ b/src/ered_cluster.erl
@@ -67,7 +67,6 @@
          convergence_check = nok :: convergence_check(),
          info_pid = [] :: [pid()],
-         update_delay = 1000, % 1s delay between slot map update requests
          client_opts = [],
          update_slot_wait = 500,
          min_replicas = 0,
@@ -256,9 +255,18 @@ handle_info(Msg = {connection_status, {Pid, Addr, _Id}, Status}, State0) ->
                      pending = sets:del_element(Addr, State#st.pending),
                      reconnecting = sets:del_element(Addr, State#st.reconnecting)};
         connection_up ->
-            State#st{up = sets:add_element(Addr, State#st.up),
-                     pending = sets:del_element(Addr, State#st.pending),
-                     reconnecting = sets:del_element(Addr, State#st.reconnecting)};
+            NewState = State#st{up = sets:add_element(Addr, State#st.up),
+                                pending = sets:del_element(Addr, State#st.pending)},
+            %% If we already have known masters, but this node is the
+            %% first to come up, then we have probably had a longer
+            %% interruption that requires a slotmap update.
+            case (sets:is_empty(State#st.up) andalso
+                  not sets:is_empty(State#st.masters)) of
+                true ->
+                    start_periodic_slot_info_request(NewState);
+                false ->
+                    NewState
+            end;
         queue_full ->
             State#st{queue_full = sets:add_element(Addr, State#st.queue_full)};
         queue_ok ->
@@ -664,7 +672,7 @@ start_clients(Addrs, State) ->
                     {State#st.nodes, State#st.closing},
                     Addrs),
 
-    State#st{nodes = maps:merge(State#st.nodes, NewNodes),
+    State#st{nodes = NewNodes,
             pending = sets:union(State#st.pending,
                                  sets:subtract(new_set(maps:keys(NewNodes)),
                                                State#st.up)),
diff --git a/test/ered_SUITE.erl b/test/ered_SUITE.erl
index 84741b4..ce0eb0b 100644
--- a/test/ered_SUITE.erl
+++ b/test/ered_SUITE.erl
@@ -16,6 +16,7 @@ all() ->
      t_manual_failover,
      t_manual_failover_then_old_master_down,
      t_blackhole,
+     t_blackhole_all_nodes,
      t_init_timeout,
      t_empty_slotmap,
      t_empty_initial_slotmap,
@@ -467,6 +468,63 @@ t_blackhole(_) ->
 
     no_more_msgs().
 
+t_blackhole_all_nodes(_) ->
+    %% Simulate that all nodes are unreachable, e.g. a network failure. We use
+    %% 'docker pause', similar to sending SIGSTOP to a process, to make the
+    %% nodes unresponsive. This makes TCP recv() and connect() time out.
+    CloseWait = 2000, % default is 10000
+    NodeDownTimeout = 2000, % default is 2000
+    ResponseTimeout = 10000, % default is 10000
+    R = start_cluster([{close_wait, CloseWait},
+                       %% Require replicas for 'cluster OK'.
+                       {min_replicas, 1},
+                       {client_opts,
+                        [{node_down_timeout, NodeDownTimeout},
+                         {connection_opts,
+                          [{response_timeout, ResponseTimeout}]}]}
+                      ]),
+
+    %% Pause all nodes
+    lists:foreach(fun(Port) ->
+                          Pod = get_pod_name_from_port(Port),
+                          ct:pal("Pausing container: " ++ os:cmd("docker pause " ++ Pod))
+                  end, ?PORTS),
+
+    %% Send PING to all nodes and expect closed sockets, error replies for sent requests,
+    %% and a report that the cluster is not ok.
+    TestPid = self(),
+    AddrToPid = ered:get_addr_to_client_map(R),
+    maps:foreach(fun(_ClientAddr, ClientPid) ->
+                         ered:command_client_async(ClientPid, [<<"PING">>],
+                                                   fun(Reply) -> TestPid ! {ping_reply, Reply} end)
+                 end, AddrToPid),
+
+    [?MSG(#{msg_type := socket_closed, reason := {recv_exit, timeout}, addr := {"127.0.0.1", Port}},
+          ResponseTimeout + 1000) || Port <- ?PORTS],
+    ?MSG({ping_reply, {error, _Reason1}}, NodeDownTimeout + 1000),
+    ?MSG({ping_reply, {error, _Reason2}}, NodeDownTimeout + 1000),
+    ?MSG({ping_reply, {error, _Reason3}}, NodeDownTimeout + 1000),
+    ?MSG({ping_reply, {error, _Reason4}}, NodeDownTimeout + 1000),
+    ?MSG({ping_reply, {error, _Reason5}}, NodeDownTimeout + 1000),
+    ?MSG({ping_reply, {error, _Reason6}}, NodeDownTimeout + 1000),
+    [?MSG(#{msg_type := node_down_timeout, addr := {"127.0.0.1", Port}}) || Port <- ?PORTS],
+    ?MSG(#{msg_type := cluster_not_ok, reason := master_down}),
+
+    %% Unpause all nodes
+    lists:foreach(fun(Port) ->
+                          Pod = get_pod_name_from_port(Port),
+                          ct:pal("Unpausing container: " ++ os:cmd("docker unpause " ++ Pod))
+                  end, ?PORTS),
+    timer:sleep(500),
+
+    wait_for_consistent_cluster(),
+
+    %% Expect connects and a cluster ok.
+    [?MSG(#{msg_type := connected, addr := {"127.0.0.1", Port}}, 10000) || Port <- ?PORTS],
+    ?MSG(#{msg_type := cluster_ok}, 10000),
+
+    no_more_msgs().
+
 t_init_timeout(_) ->
     Opts = [