From 00eba2a88ef87dd26c03f751eb7199590d6eb98b Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Mon, 21 Oct 2024 14:44:44 +0900 Subject: [PATCH] socket_manager: add feature to share sockets with another server Another process can take over UDP/TCP sockets without downtime. server = ServerEngine::SocketManager::Server.share_sockets_with_another_server(path) This starts a new server that shares all UDP/TCP sockets with the existing server. The old process should stop without removing the file for the socket after the new process starts. This may not be the primary use case assumed by ServerEngine, but we need this feature to replace both the server and the workers with a new process without downtime. Currently, ServerEngine does not provide this feature for network servers. At the moment, I assume that the application side uses this feature ad hoc, but, in the future, this could be used to support live reload for entire network servers. ref: https://github.com/fluent/fluentd/issues/4622 Limitation: This feature would not work well if the process opens new TCP ports frequently. Signed-off-by: Daijiro Fukuda Co-authored-by: Shizuo Fujita --- README.md | 9 +- lib/serverengine/socket_manager.rb | 21 +++- lib/serverengine/socket_manager_unix.rb | 91 ++++++++++---- spec/socket_manager_spec.rb | 151 ++++++++++++++++++++++++ 4 files changed, 246 insertions(+), 26 deletions(-) diff --git a/README.md b/README.md index 3edf48b..51bc6f1 100644 --- a/README.md +++ b/README.md @@ -413,8 +413,15 @@ se = ServerEngine.create(MyServer, MyWorker, { se.run ``` -See also [examples](https://github.com/fluent/serverengine/tree/master/examples). +Other features: + +- `socket_manager_server = SocketManager::Server.share_sockets_with_another_server(path)` + - It starts a new manager server that shares all UDP/TCP sockets with the existing manager. + - It means that another process can take over UDP/TCP sockets without downtime. + - The old process should stop without removing the file for the socket after the new process starts. + - Limitation: This feature would not work well if the process opens new TCP ports frequently. +See also [examples](https://github.com/fluent/serverengine/tree/master/examples). ## Module API diff --git a/lib/serverengine/socket_manager.rb b/lib/serverengine/socket_manager.rb index fe0e484..b0ef405 100644 --- a/lib/serverengine/socket_manager.rb +++ b/lib/serverengine/socket_manager.rb @@ -96,19 +96,32 @@ def self.open(path = nil) end end - def initialize(path) + def self.share_sockets_with_another_server(path) + raise NotImplementedError, "Not supported on Windows." if ServerEngine.windows? + server = new(path, start: false) + server.share_sockets_with_another_server + server + end + + def initialize(path, start: true) @tcp_sockets = {} @udp_sockets = {} @mutex = Mutex.new - @path = start_server(path) + @path = start ? start_server(path) : path end attr_reader :path + attr_reader :tcp_sockets, :udp_sockets # for tests def new_client Client.new(@path) end + def start + start_server(path) + nil + end + def close stop_server nil @@ -159,9 +172,9 @@ def process_peer(peer) res = SocketManager.recv_peer(peer) return if res.nil? - pid, method, bind, port = *res + pid, method, *opts = res begin - send_socket(peer, pid, method, bind, port) + send_socket(peer, pid, method, *opts) rescue => e SocketManager.send_peer(peer, e) end diff --git a/lib/serverengine/socket_manager_unix.rb b/lib/serverengine/socket_manager_unix.rb index 625a831..1b1f46e 100644 --- a/lib/serverengine/socket_manager_unix.rb +++ b/lib/serverengine/socket_manager_unix.rb @@ -47,6 +47,38 @@ def recv_udp(family, peer, sent) end module ServerModule + def share_sockets_with_another_server + another_server = UNIXSocket.new(@path) + begin + idx = 0 + while true + SocketManager.send_peer(another_server, [Process.pid, :share_udp, idx]) + key = SocketManager.recv_peer(another_server) + break if key.nil? + @udp_sockets[key] = another_server.recv_io UDPSocket + idx += 1 + end + + idx = 0 + while true + SocketManager.send_peer(another_server, [Process.pid, :share_tcp, idx]) + key = SocketManager.recv_peer(another_server) + break if key.nil? + @tcp_sockets[key] = another_server.recv_io TCPServer + idx += 1 + end + + SocketManager.send_peer(another_server, [Process.pid, :share_unix]) + res = SocketManager.recv_peer(another_server) + raise res if res.is_a?(Exception) + @server = another_server.recv_io UNIXServer + + start_server(@path) + ensure + another_server.close + end + end + private def listen_tcp_new(bind_ip, port) @@ -77,15 +109,17 @@ def listen_udp_new(bind_ip, port) end def start_server(path) - # return absolute path so that client can connect to this path - # when client changed working directory - path = File.expand_path(path) + unless @server + # return absolute path so that client can connect to this path + # when client changed working directory + path = File.expand_path(path) - begin - old_umask = File.umask(0077) # Protect unix socket from other users - @server = UNIXServer.new(path) - ensure - File.umask(old_umask) + begin + old_umask = File.umask(0077) # Protect unix socket from other users + @server = UNIXServer.new(path) + ensure + File.umask(old_umask) + end end @thread = Thread.new do @@ -111,19 +145,34 @@ def stop_server @thread.join if RUBY_VERSION >= "2.2" end - def send_socket(peer, pid, method, bind, port) - sock = case method - when :listen_tcp - listen_tcp(bind, port) - when :listen_udp - listen_udp(bind, port) - else - raise ArgumentError, "Unknown method: #{method.inspect}" - end - - SocketManager.send_peer(peer, nil) - - peer.send_io sock + def send_socket(peer, pid, method, *opts) + case method + when :listen_tcp + bind, port = opts + sock = listen_tcp(bind, port) + SocketManager.send_peer(peer, nil) + peer.send_io sock + when :listen_udp + bind, port = opts + sock = listen_udp(bind, port) + SocketManager.send_peer(peer, nil) + peer.send_io sock + when :share_tcp + idx, = opts + key = @tcp_sockets.keys[idx] + SocketManager.send_peer(peer, key) + peer.send_io(@tcp_sockets.values[idx]) if key + when :share_udp + idx, = opts + key = @udp_sockets.keys[idx] + SocketManager.send_peer(peer, key) + peer.send_io(@udp_sockets.values[idx]) if key + when :share_unix + SocketManager.send_peer(peer, nil) + peer.send_io @server + else + raise ArgumentError, "Unknown method: #{method.inspect}" + end end end diff --git a/spec/socket_manager_spec.rb b/spec/socket_manager_spec.rb index c74e877..0a561a7 100644 --- a/spec/socket_manager_spec.rb +++ b/spec/socket_manager_spec.rb @@ -55,6 +55,15 @@ expect(server.path).to be_between(49152, 65535) end end + + context 'Server.share_sockets_with_another_server' do + it 'not supported' do + server = SocketManager::Server.open(server_path) + expect { SocketManager::Server.share_sockets_with_another_server(server_path) }.to raise_error(NotImplementedError) + ensure + server.close + end + end else context 'Server.generate_path' do it 'returns socket path under /tmp' do @@ -76,6 +85,148 @@ expect(server.path).to include('/tmp/SERVERENGINE_SOCKETMANAGER_') end end + + context 'Server.share_sockets_with_another_server' do + it 'takes over listen sockets to another server' do + server = SocketManager::Server.open(server_path) + + client = SocketManager::Client.new(server_path) + tcp1 = client.listen_tcp('127.0.0.1', 55551) + udp1 = client.listen_udp('127.0.0.1', 55561) + udp2 = client.listen_udp('127.0.0.1', 55562) + + another_server = SocketManager::Server.share_sockets_with_another_server(server_path) + + expect(another_server.tcp_sockets.size).to eq(1) + expect(another_server.tcp_sockets['localhost:55551'].addr).to eq(['AF_INET', 55551, '127.0.0.1', '127.0.0.1']) + + expect(another_server.udp_sockets.size).to eq(2) + expect(another_server.udp_sockets['localhost:55561'].addr).to eq(['AF_INET', 55561, '127.0.0.1', '127.0.0.1']) + expect(another_server.udp_sockets['localhost:55562'].addr).to eq(['AF_INET', 55562, '127.0.0.1', '127.0.0.1']) + ensure + tcp1&.close + udp1&.close + udp2&.close + server&.close + another_server&.close + end + + it 'takes over TCP sockets without downtime' do + manager_server = SocketManager::Server.open(server_path) + manager_client = SocketManager::Client.new(server_path) + + has_server_started = false + thread_server = Thread.new do + server = manager_client.listen_tcp('127.0.0.1', test_port) + has_server_started = true + while socket = server.accept + incr_test_state(:count) + socket.close + end + ensure + server&.close + end + + sleep 0.1 until has_server_started + + thread_client = Thread.new do + 100.times do |i| + socket = TCPSocket.new('127.0.0.1', test_port) + begin + socket.write("Hello #{i}\n") + ensure + socket.close + end + sleep 0.01 + end + end + + sleep 0.5 + + thread_new_server = Thread.new do + new_manager_server = SocketManager::Server.share_sockets_with_another_server(server_path) + server = manager_client.listen_tcp('127.0.0.1', test_port) + while socket = server.accept + incr_test_state(:count) + socket.close + end + ensure + new_manager_server&.close + server&.close + end + + thread_server.kill + thread_server.join + + thread_client.join + wait_for_stop + + expect(test_state(:count)).to eq(100) + ensure + manager_server&.close + thread_server&.kill + thread_new_server&.kill + thread_server&.join + thread_new_server&.join + end + + it 'takes over UDP sockets without downtime' do + manager_server = SocketManager::Server.open(server_path) + manager_client = SocketManager::Client.new(server_path) + + has_server_started = false + thread_server = Thread.new do + server = manager_client.listen_udp('127.0.0.1', test_port) + has_server_started = true + while server.recv(10) + incr_test_state(:count) + end + ensure + server&.close + end + + sleep 0.1 until has_server_started + + thread_client = Thread.new do + 100.times do |i| + socket = UDPSocket.new + begin + socket.send("Hello #{i}\n", 0, "127.0.0.1", test_port) + ensure + socket.close + end + sleep 0.01 + end + end + + sleep 0.5 + + thread_new_server = Thread.new do + new_manager_server = SocketManager::Server.share_sockets_with_another_server(server_path) + server = manager_client.listen_udp('127.0.0.1', test_port) + while server.recv(10) + incr_test_state(:count) + end + ensure + new_manager_server&.close + server&.close + end + + thread_server.kill + thread_server.join + + thread_client.join + wait_for_stop + + expect(test_state(:count)).to eq(100) + ensure + manager_server&.close + thread_server&.kill + thread_new_server&.kill + thread_server&.join + thread_new_server&.join + end + end end context 'with thread' do