Skip to content

Commit

Permalink
socket_manager: add feature to share sockets with another server
Browse files Browse the repository at this point in the history
Another process can take over UDP/TCP sockets without downtime.

    server = ServerEngine::SocketManager::Server.share_sockets_with_another_server(path)

This starts a new server that shares all UDP/TCP sockets with
the existing server.
The old process should stop without removing the file for the
socket after the new process starts.

This may not be the primary use case assumed by ServerEngine, but
we need this feature to replace both the server and the workers
with a new process without downtime.
Currently, ServerEngine does not provide this feature for
network servers.

At the moment, I assume that the application side uses this
feature ad hoc, but, in the future, this could be used to support
live reload for entire network servers.

ref: fluent/fluentd#4622

Limitation: This feature would not work well if the process
opens new TCP ports frequently.

Signed-off-by: Daijiro Fukuda <[email protected]>
Co-authored-by: Shizuo Fujita <[email protected]>
  • Loading branch information
daipom and Watson1978 committed Oct 21, 2024
1 parent 5e9d11e commit 00eba2a
Show file tree
Hide file tree
Showing 4 changed files with 246 additions and 26 deletions.
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -413,8 +413,15 @@ se = ServerEngine.create(MyServer, MyWorker, {
se.run
```

See also [examples](https://github.com/fluent/serverengine/tree/master/examples).
Other features:

- `socket_manager_server = SocketManager::Server.share_sockets_with_another_server(path)`
- It starts a new manager server that shares all UDP/TCP sockets with the existing manager.
- It means that another process can take over UDP/TCP sockets without downtime.
- The old process should stop without removing the file for the socket after the new process starts.
- Limitation: This feature would not work well if the process opens new TCP ports frequently.

See also [examples](https://github.com/fluent/serverengine/tree/master/examples).

## Module API

Expand Down
21 changes: 17 additions & 4 deletions lib/serverengine/socket_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -96,19 +96,32 @@ def self.open(path = nil)
end
end

def initialize(path)
def self.share_sockets_with_another_server(path)
raise NotImplementedError, "Not supported on Windows." if ServerEngine.windows?
server = new(path, start: false)
server.share_sockets_with_another_server
server
end

def initialize(path, start: true)
@tcp_sockets = {}
@udp_sockets = {}
@mutex = Mutex.new
@path = start_server(path)
@path = start ? start_server(path) : path
end

attr_reader :path
attr_reader :tcp_sockets, :udp_sockets # for tests

def new_client
Client.new(@path)
end

def start
start_server(path)
nil
end

def close
stop_server
nil
Expand Down Expand Up @@ -159,9 +172,9 @@ def process_peer(peer)
res = SocketManager.recv_peer(peer)
return if res.nil?

pid, method, bind, port = *res
pid, method, *opts = res
begin
send_socket(peer, pid, method, bind, port)
send_socket(peer, pid, method, *opts)
rescue => e
SocketManager.send_peer(peer, e)
end
Expand Down
91 changes: 70 additions & 21 deletions lib/serverengine/socket_manager_unix.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,38 @@ def recv_udp(family, peer, sent)
end

module ServerModule
def share_sockets_with_another_server
another_server = UNIXSocket.new(@path)
begin
idx = 0
while true
SocketManager.send_peer(another_server, [Process.pid, :share_udp, idx])
key = SocketManager.recv_peer(another_server)
break if key.nil?
@udp_sockets[key] = another_server.recv_io UDPSocket
idx += 1
end

idx = 0
while true
SocketManager.send_peer(another_server, [Process.pid, :share_tcp, idx])
key = SocketManager.recv_peer(another_server)
break if key.nil?
@tcp_sockets[key] = another_server.recv_io TCPServer
idx += 1
end

SocketManager.send_peer(another_server, [Process.pid, :share_unix])
res = SocketManager.recv_peer(another_server)
raise res if res.is_a?(Exception)
@server = another_server.recv_io UNIXServer

start_server(@path)
ensure
another_server.close
end
end

private

def listen_tcp_new(bind_ip, port)
Expand Down Expand Up @@ -77,15 +109,17 @@ def listen_udp_new(bind_ip, port)
end

def start_server(path)
# return absolute path so that client can connect to this path
# when client changed working directory
path = File.expand_path(path)
unless @server
# return absolute path so that client can connect to this path
# when client changed working directory
path = File.expand_path(path)

begin
old_umask = File.umask(0077) # Protect unix socket from other users
@server = UNIXServer.new(path)
ensure
File.umask(old_umask)
begin
old_umask = File.umask(0077) # Protect unix socket from other users
@server = UNIXServer.new(path)
ensure
File.umask(old_umask)
end
end

@thread = Thread.new do
Expand All @@ -111,19 +145,34 @@ def stop_server
@thread.join if RUBY_VERSION >= "2.2"
end

def send_socket(peer, pid, method, bind, port)
sock = case method
when :listen_tcp
listen_tcp(bind, port)
when :listen_udp
listen_udp(bind, port)
else
raise ArgumentError, "Unknown method: #{method.inspect}"
end

SocketManager.send_peer(peer, nil)

peer.send_io sock
def send_socket(peer, pid, method, *opts)
case method
when :listen_tcp
bind, port = opts
sock = listen_tcp(bind, port)
SocketManager.send_peer(peer, nil)
peer.send_io sock
when :listen_udp
bind, port = opts
sock = listen_udp(bind, port)
SocketManager.send_peer(peer, nil)
peer.send_io sock
when :share_tcp
idx, = opts
key = @tcp_sockets.keys[idx]
SocketManager.send_peer(peer, key)
peer.send_io(@tcp_sockets.values[idx]) if key
when :share_udp
idx, = opts
key = @udp_sockets.keys[idx]
SocketManager.send_peer(peer, key)
peer.send_io(@udp_sockets.values[idx]) if key
when :share_unix
SocketManager.send_peer(peer, nil)
peer.send_io @server
else
raise ArgumentError, "Unknown method: #{method.inspect}"
end
end
end

Expand Down
151 changes: 151 additions & 0 deletions spec/socket_manager_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,15 @@
expect(server.path).to be_between(49152, 65535)
end
end

context 'Server.share_sockets_with_another_server' do
it 'not supported' do
server = SocketManager::Server.open(server_path)
expect { SocketManager::Server.share_sockets_with_another_server(server_path) }.to raise_error(NotImplementedError)
ensure
server.close
end
end
else
context 'Server.generate_path' do
it 'returns socket path under /tmp' do
Expand All @@ -76,6 +85,148 @@
expect(server.path).to include('/tmp/SERVERENGINE_SOCKETMANAGER_')
end
end

context 'Server.share_sockets_with_another_server' do
it 'takes over listen sockets to another server' do
server = SocketManager::Server.open(server_path)

client = SocketManager::Client.new(server_path)
tcp1 = client.listen_tcp('127.0.0.1', 55551)
udp1 = client.listen_udp('127.0.0.1', 55561)
udp2 = client.listen_udp('127.0.0.1', 55562)

another_server = SocketManager::Server.share_sockets_with_another_server(server_path)

expect(another_server.tcp_sockets.size).to eq(1)
expect(another_server.tcp_sockets['localhost:55551'].addr).to eq(['AF_INET', 55551, '127.0.0.1', '127.0.0.1'])

expect(another_server.udp_sockets.size).to eq(2)
expect(another_server.udp_sockets['localhost:55561'].addr).to eq(['AF_INET', 55561, '127.0.0.1', '127.0.0.1'])
expect(another_server.udp_sockets['localhost:55562'].addr).to eq(['AF_INET', 55562, '127.0.0.1', '127.0.0.1'])
ensure
tcp1&.close
udp1&.close
udp2&.close
server&.close
another_server&.close
end

it 'takes over TCP sockets without downtime' do
manager_server = SocketManager::Server.open(server_path)
manager_client = SocketManager::Client.new(server_path)

has_server_started = false
thread_server = Thread.new do
server = manager_client.listen_tcp('127.0.0.1', test_port)
has_server_started = true
while socket = server.accept
incr_test_state(:count)
socket.close
end
ensure
server&.close
end

sleep 0.1 until has_server_started

thread_client = Thread.new do
100.times do |i|
socket = TCPSocket.new('127.0.0.1', test_port)
begin
socket.write("Hello #{i}\n")
ensure
socket.close
end
sleep 0.01
end
end

sleep 0.5

thread_new_server = Thread.new do
new_manager_server = SocketManager::Server.share_sockets_with_another_server(server_path)
server = manager_client.listen_tcp('127.0.0.1', test_port)
while socket = server.accept
incr_test_state(:count)
socket.close
end
ensure
new_manager_server&.close
server&.close
end

thread_server.kill
thread_server.join

thread_client.join
wait_for_stop

expect(test_state(:count)).to eq(100)
ensure
manager_server&.close
thread_server&.kill
thread_new_server&.kill
thread_server&.join
thread_new_server&.join
end

it 'takes over UDP sockets without downtime' do
manager_server = SocketManager::Server.open(server_path)
manager_client = SocketManager::Client.new(server_path)

has_server_started = false
thread_server = Thread.new do
server = manager_client.listen_udp('127.0.0.1', test_port)
has_server_started = true
while server.recv(10)
incr_test_state(:count)
end
ensure
server&.close
end

sleep 0.1 until has_server_started

thread_client = Thread.new do
100.times do |i|
socket = UDPSocket.new
begin
socket.send("Hello #{i}\n", 0, "127.0.0.1", test_port)
ensure
socket.close
end
sleep 0.01
end
end

sleep 0.5

thread_new_server = Thread.new do
new_manager_server = SocketManager::Server.share_sockets_with_another_server(server_path)
server = manager_client.listen_udp('127.0.0.1', test_port)
while server.recv(10)
incr_test_state(:count)
end
ensure
new_manager_server&.close
server&.close
end

thread_server.kill
thread_server.join

thread_client.join
wait_for_stop

expect(test_state(:count)).to eq(100)
ensure
manager_server&.close
thread_server&.kill
thread_new_server&.kill
thread_server&.join
thread_new_server&.join
end
end
end

context 'with thread' do
Expand Down

0 comments on commit 00eba2a

Please sign in to comment.