From 772d7dfa672038fb0745eee7eb608799ef81b9a1 Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Mon, 21 Oct 2024 09:48:49 +0900 Subject: [PATCH] socket_manager: add feature to take over another server Another process can take over UDP/TCP sockets without downtime. server = ServerEngine::SocketManager::Server.take_over_another_server(path) This starts a new server that has all UDP/TCP sockets of the existing server. After the new process starts, the old process should stop without removing the socket file, because the new server keeps accepting clients on that same path. This may not be the primary use case assumed by ServerEngine, but we need this feature to replace both the server and the workers with a new process without downtime. Currently, ServerEngine does not provide this feature for network servers. At the moment, I assume that the application side uses this feature ad hoc, but, in the future, this could be used to support live reload for entire network servers. ref: https://github.com/fluent/fluentd/issues/4622 Signed-off-by: Daijiro Fukuda Co-authored-by: Shizuo Fujita --- README.md | 8 +- lib/serverengine/socket_manager.rb | 16 ++- lib/serverengine/socket_manager_unix.rb | 129 +++++++++++++------- lib/serverengine/socket_manager_win.rb | 40 +++---- spec/socket_manager_spec.rb | 149 ++++++++++++++++++++++++ 5 files changed, 277 insertions(+), 65 deletions(-) diff --git a/README.md b/README.md index 3edf48b..6031c42 100644 --- a/README.md +++ b/README.md @@ -413,8 +413,14 @@ se = ServerEngine.create(MyServer, MyWorker, { se.run ``` -See also [examples](https://github.com/fluent/serverengine/tree/master/examples). +Other features: + +- `socket_manager_server = SocketManager::Server.take_over_another_server(path)` + - It starts a new manager server that has all UDP/TCP sockets of the existing manager. + - It means that another process can take over UDP/TCP sockets without downtime. + - After the new process starts, the old process should stop without removing the socket file, because the new server keeps accepting clients on that same path.
+See also [examples](https://github.com/fluent/serverengine/tree/master/examples). ## Module API diff --git a/lib/serverengine/socket_manager.rb b/lib/serverengine/socket_manager.rb index fe0e484..83d5f42 100644 --- a/lib/serverengine/socket_manager.rb +++ b/lib/serverengine/socket_manager.rb @@ -96,14 +96,22 @@ def self.open(path = nil) end end - def initialize(path) + def self.take_over_another_server(path) + raise NotImplementedError, "Not supported on Windows." if ServerEngine.windows? + server = new(path, start: false) + server.take_over_another_server + server + end + + def initialize(path, start: true) @tcp_sockets = {} @udp_sockets = {} @mutex = Mutex.new - @path = start_server(path) + @path = start ? start_server(path) : path end attr_reader :path + attr_reader :tcp_sockets, :udp_sockets # for tests def new_client Client.new(@path) @@ -159,9 +167,9 @@ def process_peer(peer) res = SocketManager.recv_peer(peer) return if res.nil? - pid, method, bind, port = *res + pid, method, *opts = res begin - send_socket(peer, pid, method, bind, port) + send_socket(peer, pid, method, *opts) rescue => e SocketManager.send_peer(peer, e) end diff --git a/lib/serverengine/socket_manager_unix.rb b/lib/serverengine/socket_manager_unix.rb index 625a831..addfb70 100644 --- a/lib/serverengine/socket_manager_unix.rb +++ b/lib/serverengine/socket_manager_unix.rb @@ -47,6 +47,67 @@ def recv_udp(family, peer, sent) end module ServerModule + def start_server(path) + unless @server + # return absolute path so that client can connect to this path + # when client changed working directory + path = File.expand_path(path) + + begin + old_umask = File.umask(0077) # Protect unix socket from other users + @server = UNIXServer.new(path) + ensure + File.umask(old_umask) + end + end + + @thread = Thread.new do + begin + while peer = @server.accept + Thread.new(peer, &method(:process_peer)) # process_peer calls send_socket + end + rescue => e + unless @server.closed? 
+ ServerEngine.dump_uncaught_error(e) + end + end + end + + return path + end + + def take_over_another_server + another_server = UNIXSocket.new(@path) + begin + idx = 0 + while true + SocketManager.send_peer(another_server, [Process.pid, :get_listening_tcp, idx]) + key = SocketManager.recv_peer(another_server) + break if key.nil? + @tcp_sockets[key] = another_server.recv_io TCPServer + idx += 1 + end + + idx = 0 + while true + SocketManager.send_peer(another_server, [Process.pid, :get_listening_udp, idx]) + key = SocketManager.recv_peer(another_server) + break if key.nil? + @udp_sockets[key] = another_server.recv_io UDPSocket + idx += 1 + end + + SocketManager.send_peer(another_server, [Process.pid, :get_unix]) + res = SocketManager.recv_peer(another_server) + raise res if res.is_a?(Exception) + @server = another_server.recv_io UNIXServer + + start_server(@path) + ensure + another_server.close + end + end + private def listen_tcp_new(bind_ip, port) @@ -76,33 +137,6 @@ def listen_udp_new(bind_ip, port) UDPSocket.for_fd(usock.fileno) end - def start_server(path) - # return absolute path so that client can connect to this path - # when client changed working directory - path = File.expand_path(path) - - begin - old_umask = File.umask(0077) # Protect unix socket from other users - @server = UNIXServer.new(path) - ensure - File.umask(old_umask) - end - - @thread = Thread.new do - begin - while peer = @server.accept - Thread.new(peer, &method(:process_peer)) # process_peer calls send_socket - end - rescue => e - unless @server.closed? - ServerEngine.dump_uncaught_error(e) - end - end - end - - return path - end - def stop_server @tcp_sockets.reject! {|key,lsock| lsock.close; true } @udp_sockets.reject! 
{|key,usock| usock.close; true } @@ -111,19 +145,34 @@ def stop_server @thread.join if RUBY_VERSION >= "2.2" end - def send_socket(peer, pid, method, bind, port) - sock = case method - when :listen_tcp - listen_tcp(bind, port) - when :listen_udp - listen_udp(bind, port) - else - raise ArgumentError, "Unknown method: #{method.inspect}" - end - - SocketManager.send_peer(peer, nil) - - peer.send_io sock + def send_socket(peer, pid, method, *opts) + case method + when :listen_tcp + bind, port = opts + sock = listen_tcp(bind, port) + SocketManager.send_peer(peer, nil) + peer.send_io sock + when :listen_udp + bind, port = opts + sock = listen_udp(bind, port) + SocketManager.send_peer(peer, nil) + peer.send_io sock + when :get_listening_tcp + idx, = opts + key = @tcp_sockets.keys[idx] + SocketManager.send_peer(peer, key) + peer.send_io(@tcp_sockets.values[idx]) if key + when :get_listening_udp + idx, = opts + key = @udp_sockets.keys[idx] + SocketManager.send_peer(peer, key) + peer.send_io(@udp_sockets.values[idx]) if key + when :get_unix + SocketManager.send_peer(peer, nil) + peer.send_io @server + else + raise ArgumentError, "Unknown method: #{method.inspect}" + end end end diff --git a/lib/serverengine/socket_manager_win.rb b/lib/serverengine/socket_manager_win.rb index f7a7e26..42acaa6 100644 --- a/lib/serverengine/socket_manager_win.rb +++ b/lib/serverengine/socket_manager_win.rb @@ -58,6 +58,26 @@ def recv_udp(family, peer, sent) end module ServerModule + def start_server(addr) + # We need to take care about selecting an available port. + # By passing `nil` or `0` as `addr`, an available port is automatically selected. + # However, we should consider using NamedPipe instead of TCPServer. + @server = TCPServer.new("127.0.0.1", addr) + @thread = Thread.new do + begin + while peer = @server.accept + Thread.new(peer, &method(:process_peer)) # process_peer calls send_socket + end + rescue => e + unless @server.closed? 
+ ServerEngine.dump_uncaught_error(e) + end + end + end + + return @server.addr[1] + end + private TCP_OPTIONS = [Socket::SOCK_STREAM, Socket::IPPROTO_TCP, TCPServer, true] @@ -107,26 +127,6 @@ def htons(h) [h].pack("S").unpack("n")[0] end - def start_server(addr) - # We need to take care about selecting an available port. - # By passing `nil` or `0` as `addr`, an available port is automatically selected. - # However, we should consider using NamedPipe instead of TCPServer. - @server = TCPServer.new("127.0.0.1", addr) - @thread = Thread.new do - begin - while peer = @server.accept - Thread.new(peer, &method(:process_peer)) # process_peer calls send_socket - end - rescue => e - unless @server.closed? - ServerEngine.dump_uncaught_error(e) - end - end - end - - return @server.addr[1] - end - def stop_server @tcp_sockets.reject! {|key,lsock| lsock.close; true } @udp_sockets.reject! {|key,usock| usock.close; true } diff --git a/spec/socket_manager_spec.rb b/spec/socket_manager_spec.rb index c74e877..4ff554c 100644 --- a/spec/socket_manager_spec.rb +++ b/spec/socket_manager_spec.rb @@ -55,6 +55,15 @@ expect(server.path).to be_between(49152, 65535) end end + + context 'Server.take_over_another_server' do + it 'not supported' do + server = SocketManager::Server.open(server_path) + expect { SocketManager::Server.take_over_another_server(server_path) }.to raise_error(NotImplementedError) + ensure + server.close + end + end else context 'Server.generate_path' do it 'returns socket path under /tmp' do @@ -76,6 +85,146 @@ expect(server.path).to include('/tmp/SERVERENGINE_SOCKETMANAGER_') end end + + context 'Server.take_over_another_server' do + it 'takes over listen sockets to another server' do + server = SocketManager::Server.open(server_path) + + client = SocketManager::Client.new(server_path) + tcp1 = client.listen_tcp('127.0.0.1', 55551) + udp1 = client.listen_udp('127.0.0.1', 55561) + udp2 = client.listen_udp('127.0.0.1', 55562) + + another_server = 
SocketManager::Server.take_over_another_server(server_path) + + expect(another_server.tcp_sockets.size).to eq(1) + expect(another_server.tcp_sockets['localhost:55551'].addr).to eq(['AF_INET', 55551, '127.0.0.1', '127.0.0.1']) + + expect(another_server.udp_sockets.size).to eq(2) + expect(another_server.udp_sockets['localhost:55561'].addr).to eq(['AF_INET', 55561, '127.0.0.1', '127.0.0.1']) + expect(another_server.udp_sockets['localhost:55562'].addr).to eq(['AF_INET', 55562, '127.0.0.1', '127.0.0.1']) + ensure + tcp1&.close + udp1&.close + udp2&.close + server&.close + another_server&.close + end + + it 'takes over TCP sockets without downtime' do + manager_server = SocketManager::Server.open(server_path) + manager_client = SocketManager::Client.new(server_path) + + has_server_started = false + thread_server = Thread.new do + server = manager_client.listen_tcp('127.0.0.1', test_port) + has_server_started = true + while socket = server.accept + incr_test_state(:count) + socket.close + end + ensure + server&.close + end + + sleep 0.1 until has_server_started + + thread_client = Thread.new do + 100.times do |i| + socket = TCPSocket.new('127.0.0.1', test_port) + begin + socket.write("Hello #{i}\n") + ensure + socket.close + end + sleep 0.01 + end + end + + sleep 0.5 + + child_pid = fork do + new_manager_server = SocketManager::Server.take_over_another_server(server_path) + server = manager_client.listen_tcp('127.0.0.1', test_port) + while socket = server.accept + incr_test_state(:count) + socket.close + end + ensure + new_manager_server&.close + server&.close + end + + thread_client.join + wait_for_stop + + expect(test_state(:count)).to eq(100) + ensure + if child_pid + Process.kill :TERM, child_pid + Process.waitpid(child_pid) + end + manager_server&.close + thread_server&.kill + thread_server&.join + end + + it 'takes over UDP sockets without downtime' do + manager_server = SocketManager::Server.open(server_path) + manager_client = 
SocketManager::Client.new(server_path) + + has_server_started = false + thread_server = Thread.new do + server = manager_client.listen_udp('127.0.0.1', test_port) + has_server_started = true + while server.recv(10) + incr_test_state(:count) + end + ensure + server&.close + end + + sleep 0.1 until has_server_started + + thread_client = Thread.new do + 100.times do |i| + socket = UDPSocket.new + begin + socket.send("Hello #{i}\n", 0, "127.0.0.1", test_port) + ensure + socket.close + end + sleep 0.01 + end + end + + sleep 0.5 + + child_pid = fork do + new_manager_server = SocketManager::Server.take_over_another_server(server_path) + server = manager_client.listen_udp('127.0.0.1', test_port) + while server.recv(10) + incr_test_state(:count) + end + ensure + new_manager_server&.close + server&.close + end + + thread_client.join + wait_for_stop + + expect(test_state(:count)).to eq(100) + ensure + if child_pid + Process.kill :TERM, child_pid + Process.waitpid(child_pid) + end + manager_server&.close + thread_server&.kill + thread_server&.join + end + end end context 'with thread' do