diff --git a/README.md b/README.md index 3edf48b..51bc6f1 100644 --- a/README.md +++ b/README.md @@ -413,8 +413,15 @@ se = ServerEngine.create(MyServer, MyWorker, { se.run ``` -See also [examples](https://github.com/fluent/serverengine/tree/master/examples). +Other features: + +- `socket_manager_server = SocketManager::Server.share_sockets_with_another_server(path)` + - It starts a new manager server that shares all UDP/TCP sockets with the existing manager. + - It means that another process can take over UDP/TCP sockets without downtime. + - The old process should stop without removing the file for the socket after the new process starts. + - Limitation: This feature would not work well if the process opens new TCP ports frequently. +See also [examples](https://github.com/fluent/serverengine/tree/master/examples). ## Module API diff --git a/lib/serverengine/socket_manager.rb b/lib/serverengine/socket_manager.rb index fe0e484..b0ef405 100644 --- a/lib/serverengine/socket_manager.rb +++ b/lib/serverengine/socket_manager.rb @@ -96,19 +96,32 @@ def self.open(path = nil) end end - def initialize(path) + def self.share_sockets_with_another_server(path) + raise NotImplementedError, "Not supported on Windows." if ServerEngine.windows? + server = new(path, start: false) + server.share_sockets_with_another_server + server + end + + def initialize(path, start: true) @tcp_sockets = {} @udp_sockets = {} @mutex = Mutex.new - @path = start_server(path) + @path = start ? start_server(path) : path end attr_reader :path + attr_reader :tcp_sockets, :udp_sockets # for tests def new_client Client.new(@path) end + def start + start_server(path) + nil + end + def close stop_server nil @@ -159,9 +172,9 @@ def process_peer(peer) res = SocketManager.recv_peer(peer) return if res.nil? - pid, method, bind, port = *res + pid, method, *opts = res begin - send_socket(peer, pid, method, bind, port) + send_socket(peer, pid, method, *opts) rescue => e SocketManager.send_peer(peer, e) end diff --git a/lib/serverengine/socket_manager_unix.rb b/lib/serverengine/socket_manager_unix.rb index 625a831..1b1f46e 100644 --- a/lib/serverengine/socket_manager_unix.rb +++ b/lib/serverengine/socket_manager_unix.rb @@ -47,6 +47,38 @@ def recv_udp(family, peer, sent) end module ServerModule + def share_sockets_with_another_server + another_server = UNIXSocket.new(@path) + begin + idx = 0 + while true + SocketManager.send_peer(another_server, [Process.pid, :share_udp, idx]) + key = SocketManager.recv_peer(another_server) + break if key.nil? + @udp_sockets[key] = another_server.recv_io UDPSocket + idx += 1 + end + + idx = 0 + while true + SocketManager.send_peer(another_server, [Process.pid, :share_tcp, idx]) + key = SocketManager.recv_peer(another_server) + break if key.nil? + @tcp_sockets[key] = another_server.recv_io TCPServer + idx += 1 + end + + SocketManager.send_peer(another_server, [Process.pid, :share_unix]) + res = SocketManager.recv_peer(another_server) + raise res if res.is_a?(Exception) + @server = another_server.recv_io UNIXServer + + start_server(@path) + ensure + another_server.close + end + end + private def listen_tcp_new(bind_ip, port) @@ -77,15 +109,17 @@ def listen_udp_new(bind_ip, port) end def start_server(path) - # return absolute path so that client can connect to this path - # when client changed working directory - path = File.expand_path(path) + unless @server + # return absolute path so that client can connect to this path + # when client changed working directory + path = File.expand_path(path) - begin - old_umask = File.umask(0077) # Protect unix socket from other users - @server = UNIXServer.new(path) - ensure - File.umask(old_umask) + begin + old_umask = File.umask(0077) # Protect unix socket from other users + @server = UNIXServer.new(path) + ensure + File.umask(old_umask) + end end @thread = Thread.new do @@ -111,19 +145,34 @@ def stop_server @thread.join if RUBY_VERSION >= "2.2" end - def send_socket(peer, pid, method, bind, port) - sock = case method - when :listen_tcp - listen_tcp(bind, port) - when :listen_udp - listen_udp(bind, port) - else - raise ArgumentError, "Unknown method: #{method.inspect}" - end - - SocketManager.send_peer(peer, nil) - - peer.send_io sock + def send_socket(peer, pid, method, *opts) + case method + when :listen_tcp + bind, port = opts + sock = listen_tcp(bind, port) + SocketManager.send_peer(peer, nil) + peer.send_io sock + when :listen_udp + bind, port = opts + sock = listen_udp(bind, port) + SocketManager.send_peer(peer, nil) + peer.send_io sock + when :share_tcp + idx, = opts + key = @tcp_sockets.keys[idx] + SocketManager.send_peer(peer, key) + peer.send_io(@tcp_sockets.values[idx]) if key + when :share_udp + idx, = opts + key = @udp_sockets.keys[idx] + SocketManager.send_peer(peer, key) + peer.send_io(@udp_sockets.values[idx]) if key + when :share_unix + SocketManager.send_peer(peer, nil) + peer.send_io @server + else + raise ArgumentError, "Unknown method: #{method.inspect}" + end end end diff --git a/spec/socket_manager_spec.rb b/spec/socket_manager_spec.rb index c74e877..0a561a7 100644 --- a/spec/socket_manager_spec.rb +++ b/spec/socket_manager_spec.rb @@ -55,6 +55,15 @@ expect(server.path).to be_between(49152, 65535) end end + + context 'Server.share_sockets_with_another_server' do + it 'not supported' do + server = SocketManager::Server.open(server_path) + expect { SocketManager::Server.share_sockets_with_another_server(server_path) }.to raise_error(NotImplementedError) + ensure + server.close + end + end else context 'Server.generate_path' do it 'returns socket path under /tmp' do @@ -76,6 +85,148 @@ expect(server.path).to include('/tmp/SERVERENGINE_SOCKETMANAGER_') end end + + context 'Server.share_sockets_with_another_server' do + it 'takes over listen sockets to another server' do + server = SocketManager::Server.open(server_path) + + client = SocketManager::Client.new(server_path) + tcp1 = client.listen_tcp('127.0.0.1', 55551) + udp1 = client.listen_udp('127.0.0.1', 55561) + udp2 = client.listen_udp('127.0.0.1', 55562) + + another_server = SocketManager::Server.share_sockets_with_another_server(server_path) + + expect(another_server.tcp_sockets.size).to eq(1) + expect(another_server.tcp_sockets['localhost:55551'].addr).to eq(['AF_INET', 55551, '127.0.0.1', '127.0.0.1']) + + expect(another_server.udp_sockets.size).to eq(2) + expect(another_server.udp_sockets['localhost:55561'].addr).to eq(['AF_INET', 55561, '127.0.0.1', '127.0.0.1']) + expect(another_server.udp_sockets['localhost:55562'].addr).to eq(['AF_INET', 55562, '127.0.0.1', '127.0.0.1']) + ensure + tcp1&.close + udp1&.close + udp2&.close + server&.close + another_server&.close + end + + it 'takes over TCP sockets without downtime' do + manager_server = SocketManager::Server.open(server_path) + manager_client = SocketManager::Client.new(server_path) + + has_server_started = false + thread_server = Thread.new do + server = manager_client.listen_tcp('127.0.0.1', test_port) + has_server_started = true + while socket = server.accept + incr_test_state(:count) + socket.close + end + ensure + server&.close + end + + sleep 0.1 until has_server_started + + thread_client = Thread.new do + 100.times do |i| + socket = TCPSocket.new('127.0.0.1', test_port) + begin + socket.write("Hello #{i}\n") + ensure + socket.close + end + sleep 0.01 + end + end + + sleep 0.5 + + thread_new_server = Thread.new do + new_manager_server = SocketManager::Server.share_sockets_with_another_server(server_path) + server = manager_client.listen_tcp('127.0.0.1', test_port) + while socket = server.accept + incr_test_state(:count) + socket.close + end + ensure + new_manager_server&.close + server&.close + end + + thread_server.kill + thread_server.join + + thread_client.join + wait_for_stop + + expect(test_state(:count)).to eq(100) + ensure + manager_server&.close + thread_server&.kill + thread_new_server&.kill + thread_server&.join + thread_new_server&.join + end + + it 'takes over UDP sockets without downtime' do + manager_server = SocketManager::Server.open(server_path) + manager_client = SocketManager::Client.new(server_path) + + has_server_started = false + thread_server = Thread.new do + server = manager_client.listen_udp('127.0.0.1', test_port) + has_server_started = true + while server.recv(10) + incr_test_state(:count) + end + ensure + server&.close + end + + sleep 0.1 until has_server_started + + thread_client = Thread.new do + 100.times do |i| + socket = UDPSocket.new + begin + socket.send("Hello #{i}\n", 0, "127.0.0.1", test_port) + ensure + socket.close + end + sleep 0.01 + end + end + + sleep 0.5 + + thread_new_server = Thread.new do + new_manager_server = SocketManager::Server.share_sockets_with_another_server(server_path) + server = manager_client.listen_udp('127.0.0.1', test_port) + while server.recv(10) + incr_test_state(:count) + end + ensure + new_manager_server&.close + server&.close + end + + thread_server.kill + thread_server.join + + thread_client.join + wait_for_stop + + expect(test_state(:count)).to eq(100) + ensure + manager_server&.close + thread_server&.kill + thread_new_server&.kill + thread_server&.join + thread_new_server&.join + end + end end context 'with thread' do