Skip to content

Commit b7ade80

Browse files
committed
Implement jsonrpc handler over unix domain socket
1 parent e9e9a57 commit b7ade80

File tree

9 files changed

+678
-2
lines changed

9 files changed

+678
-2
lines changed

cmd/juno/juno.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ const (
4141
wsF = "ws"
4242
wsHostF = "ws-host"
4343
wsPortF = "ws-port"
44+
ipcF = "ipc"
45+
ipcPathF = "ipc-path"
4446
dbPathF = "db-path"
4547
networkF = "network"
4648
ethNodeF = "eth-node"
@@ -71,6 +73,7 @@ const (
7173
defaultHTTPPort = 6060
7274
defaultWS = false
7375
defaultWSPort = 6061
76+
defaultIpc = false
7477
defaultEthNode = ""
7578
defaultPprof = false
7679
defaultPprofPort = 6062
@@ -96,6 +99,8 @@ const (
9699
wsUsage = "Enables the Websocket RPC server on the default port."
97100
wsHostUsage = "The interface on which the Websocket RPC server will listen for requests."
98101
wsPortUsage = "The port on which the websocket server will listen for requests."
102+
ipcUsage = "Enables the IPC RPC server."
103+
ipcPathUsage = "The path on which the IPC RPC server will listen for requests."
99104
dbPathUsage = "Location of the database files."
100105
networkUsage = "Options: mainnet, goerli, goerli2, integration, sepolia, sepolia-integration."
101106
pprofUsage = "Enables the pprof endpoint on the default port."
@@ -226,6 +231,8 @@ func NewCmd(config *node.Config, run func(*cobra.Command, []string) error) *cobr
226231
junoCmd.Flags().Bool(wsF, defaultWS, wsUsage)
227232
junoCmd.Flags().String(wsHostF, defaulHost, wsHostUsage)
228233
junoCmd.Flags().Uint16(wsPortF, defaultWSPort, wsPortUsage)
234+
junoCmd.Flags().Bool(ipcF, defaultIpc, ipcUsage)
235+
junoCmd.Flags().String(ipcPathF, defaultDBPath, ipcPathUsage)
229236
junoCmd.Flags().String(dbPathF, defaultDBPath, dbPathUsage)
230237
junoCmd.Flags().Var(&defaultNetwork, networkF, networkUsage)
231238
junoCmd.Flags().String(ethNodeF, defaultEthNode, ethNodeUsage)

cmd/juno/juno_test.go

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ func TestConfigPrecedence(t *testing.T) {
3333
defaultHTTPPort := uint16(6060)
3434
defaultWS := false
3535
defaultWSPort := uint16(6061)
36+
defaultIpc := false
3637
defaultDBPath := filepath.Join(pwd, "juno")
3738
defaultNetwork := utils.Mainnet
3839
defaultPprof := false
@@ -65,6 +66,8 @@ func TestConfigPrecedence(t *testing.T) {
6566
Websocket: defaultWS,
6667
WebsocketHost: defaultHost,
6768
WebsocketPort: defaultWSPort,
69+
IPC: defaultIpc,
70+
IPCPath: defaultDBPath,
6871
DatabasePath: defaultDBPath,
6972
Network: defaultNetwork,
7073
Pprof: defaultPprof,
@@ -94,6 +97,8 @@ func TestConfigPrecedence(t *testing.T) {
9497
Websocket: defaultWS,
9598
WebsocketHost: defaultHost,
9699
WebsocketPort: defaultWSPort,
100+
IPC: defaultIpc,
101+
IPCPath: defaultDBPath,
97102
GRPC: defaultGRPC,
98103
GRPCHost: defaultHost,
99104
GRPCPort: defaultGRPCPort,
@@ -128,6 +133,8 @@ func TestConfigPrecedence(t *testing.T) {
128133
Websocket: defaultWS,
129134
WebsocketHost: defaultHost,
130135
WebsocketPort: defaultWSPort,
136+
IPC: defaultIpc,
137+
IPCPath: defaultDBPath,
131138
GRPC: defaultGRPC,
132139
GRPCHost: defaultHost,
133140
GRPCPort: defaultGRPCPort,
@@ -164,6 +171,8 @@ pprof: true
164171
Websocket: defaultWS,
165172
WebsocketHost: defaultHost,
166173
WebsocketPort: defaultWSPort,
174+
IPC: defaultIpc,
175+
IPCPath: defaultDBPath,
167176
GRPC: defaultGRPC,
168177
GRPCHost: defaultHost,
169178
GRPCPort: defaultGRPCPort,
@@ -197,6 +206,8 @@ http-port: 4576
197206
Websocket: defaultWS,
198207
WebsocketHost: defaultHost,
199208
WebsocketPort: defaultWSPort,
209+
IPC: defaultIpc,
210+
IPCPath: defaultDBPath,
200211
GRPC: defaultGRPC,
201212
GRPCHost: defaultHost,
202213
GRPCPort: defaultGRPCPort,
@@ -229,6 +240,8 @@ http-port: 4576
229240
Websocket: defaultWS,
230241
WebsocketHost: defaultHost,
231242
WebsocketPort: defaultWSPort,
243+
IPC: defaultIpc,
244+
IPCPath: defaultDBPath,
232245
GRPC: defaultGRPC,
233246
GRPCHost: defaultHost,
234247
GRPCPort: defaultGRPCPort,
@@ -260,6 +273,8 @@ http-port: 4576
260273
Websocket: defaultWS,
261274
WebsocketHost: defaultHost,
262275
WebsocketPort: defaultWSPort,
276+
IPC: defaultIpc,
277+
IPCPath: defaultDBPath,
263278
GRPC: defaultGRPC,
264279
GRPCHost: defaultHost,
265280
GRPCPort: defaultGRPCPort,
@@ -288,6 +303,8 @@ http-port: 4576
288303
ws: true
289304
ws-host: 0.0.0.0
290305
ws-port: 4576
306+
ipc: true
307+
ipc-path: /home/config-file/.juno
291308
metrics: true
292309
metrics-host: 0.0.0.0
293310
metrics-port: 4576
@@ -304,7 +321,7 @@ db-cache-size: 8
304321
`,
305322
inputArgs: []string{
306323
"--log-level", "error", "--http", "--http-port", "4577", "--http-host", "127.0.0.1", "--ws", "--ws-port", "4577", "--ws-host", "127.0.0.1",
307-
"--grpc", "--grpc-port", "4577", "--grpc-host", "127.0.0.1", "--metrics", "--metrics-port", "4577", "--metrics-host", "127.0.0.1",
324+
"--ipc", "--ipc-path", "/home/flag/.juno", "--grpc", "--grpc-port", "4577", "--grpc-host", "127.0.0.1", "--metrics", "--metrics-port", "4577", "--metrics-host", "127.0.0.1",
308325
"--db-path", "/home/flag/.juno", "--network", "integration", "--pprof", "--pending-poll-interval", time.Millisecond.String(),
309326
"--db-cache-size", "9",
310327
},
@@ -316,6 +333,8 @@ db-cache-size: 8
316333
Websocket: true,
317334
WebsocketHost: "127.0.0.1",
318335
WebsocketPort: 4577,
336+
IPC: true,
337+
IPCPath: "/home/flag/.juno",
319338
Metrics: true,
320339
MetricsHost: "127.0.0.1",
321340
MetricsPort: 4577,
@@ -351,6 +370,8 @@ network: goerli
351370
Websocket: defaultWS,
352371
WebsocketHost: defaultHost,
353372
WebsocketPort: defaultWSPort,
373+
IPC: defaultIpc,
374+
IPCPath: defaultDBPath,
354375
GRPC: defaultGRPC,
355376
GRPCHost: defaultHost,
356377
GRPCPort: defaultGRPCPort,
@@ -373,7 +394,7 @@ network: goerli
373394
"some setting set in default, config file and flags": {
374395
cfgFile: true,
375396
cfgFileContents: `network: goerli2`,
376-
inputArgs: []string{"--db-path", "/home/flag/.juno", "--pprof"},
397+
inputArgs: []string{"--ipc", "--ipc-path", "/home/flag/.juno", "--db-path", "/home/flag/.juno", "--pprof"},
377398
expectedConfig: &node.Config{
378399
LogLevel: defaultLogLevel,
379400
HTTP: defaultHTTP,
@@ -382,6 +403,8 @@ network: goerli
382403
Websocket: defaultWS,
383404
WebsocketHost: defaultHost,
384405
WebsocketPort: defaultWSPort,
406+
IPC: true,
407+
IPCPath: "/home/flag/.juno",
385408
GRPC: defaultGRPC,
386409
GRPCHost: defaultHost,
387410
GRPCPort: defaultGRPCPort,

jsonrpc/ipc.go

Lines changed: 227 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,227 @@
1+
package jsonrpc
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"io"
8+
"net"
9+
"os"
10+
"path/filepath"
11+
"sync"
12+
"syscall"
13+
"time"
14+
15+
"github.com/NethermindEth/juno/utils"
16+
"github.com/ethereum/go-ethereum/p2p/netutil"
17+
"github.com/sourcegraph/conc"
18+
)
19+
20+
const (
21+
// On Linux, path is 108 bytes in size.
22+
// http://man7.org/linux/man-pages/man7/unix.7.html
23+
maxUnixPathBytes = 108
24+
// fileModePerms represents the permission mode for directories, which is set to 751.
25+
// This mode is stricter than the common default of 755 for directories.
26+
fileModePerms = 0o751
27+
// socketPerms represents the permission mode for socket files, set to 600. It allows
28+
// read and write access for the owner only, ensuring that socket is accessible only to the intended user.
29+
socketPerms = 0o600
30+
)
31+
32+
// createListener creates a Unix domain socket at the given path and sets proper permissions.
33+
func createListener(endpoint string) (net.Listener, error) {
34+
// path + terminator
35+
if len(endpoint)+1 > maxUnixPathBytes {
36+
return nil, errors.New("path too long")
37+
}
38+
// Try connecting first; if it works, then the socket is still live,
39+
// so let's abort the creation of a new one.
40+
if c, err := net.Dial("unix", endpoint); err == nil {
41+
c.Close()
42+
return nil, fmt.Errorf("%v: address already in use", endpoint)
43+
}
44+
45+
if err := os.MkdirAll(filepath.Dir(endpoint), fileModePerms); err != nil {
46+
return nil, err
47+
}
48+
// Remove any existing file at the specified path.
49+
if err := os.Remove(endpoint); !os.IsNotExist(err) {
50+
return nil, err
51+
}
52+
l, err := net.Listen("unix", endpoint)
53+
if err != nil {
54+
return nil, err
55+
}
56+
// Set permissions for the socket file to read and write for the owner only (0o600)
57+
err = os.Chmod(endpoint, socketPerms)
58+
return l, err
59+
}
60+
61+
type Ipc struct {
62+
rpc *Server
63+
events NewRequestListener
64+
log utils.SimpleLogger
65+
66+
connWg conc.WaitGroup // connWg is a WaitGroup for tracking active connections.
67+
68+
connParams IpcConnParams
69+
listener net.Listener
70+
71+
// everything below is protected
72+
mu sync.Mutex
73+
conns map[net.Conn]struct{} // conns is a map that holds active connections.
74+
}
75+
76+
// NewIpc creates a new IPC handler instance with the provided RPC server, endpoint and logger.
77+
func NewIpc(rpc *Server, endpoint string, log utils.SimpleLogger) (*Ipc, error) {
78+
listener, err := createListener(endpoint)
79+
if err != nil {
80+
return nil, err
81+
}
82+
return &Ipc{
83+
rpc: rpc,
84+
log: log,
85+
connParams: DefaultIpcConnParams(),
86+
conns: make(map[net.Conn]struct{}),
87+
events: &SelectiveListener{},
88+
listener: listener,
89+
}, nil
90+
}
91+
92+
// WithConnParams sanity checks and applies the provided params.
93+
func (i *Ipc) WithConnParams(p *IpcConnParams) *Ipc {
94+
i.connParams = *p
95+
return i
96+
}
97+
98+
// WithListener registers a NewRequestListener
99+
func (i *Ipc) WithListener(listener NewRequestListener) *Ipc {
100+
i.events = listener
101+
return i
102+
}
103+
104+
// Run launches the IPC handler and handles any potential errors.
105+
// It is the caller's responsibility to cancel the provided context when they wish to
106+
// gracefully shut down the IPC handler.
107+
func (i *Ipc) Run(ctx context.Context) error {
108+
var wg conc.WaitGroup
109+
defer wg.Wait()
110+
111+
ctx, cancel := context.WithCancel(ctx)
112+
defer cancel()
113+
114+
wg.Go(func() {
115+
defer cancel()
116+
for {
117+
conn, err := i.listener.Accept()
118+
if netutil.IsTemporaryError(err) {
119+
i.log.Warnw("Failed to accept connection", "err", err)
120+
continue
121+
} else if err != nil {
122+
if !isSocketError(err) {
123+
i.log.Warnw("Accept connection", "err", err)
124+
}
125+
return
126+
}
127+
i.connWg.Go(func() {
128+
i.serveConn(ctx, newIpcConn(conn, i.connParams))
129+
})
130+
}
131+
})
132+
133+
<-ctx.Done()
134+
return i.stop()
135+
}
136+
137+
// cleanupConnNoLock frees resources
138+
func (i *Ipc) cleanupConnNoLock(conn net.Conn) {
139+
_, ok := i.conns[conn]
140+
delete(i.conns, conn)
141+
if !ok {
142+
return
143+
}
144+
if err := conn.Close(); err != nil {
145+
i.log.Warnw("Error occurred while closing connection", "err", err)
146+
}
147+
}
148+
149+
// serveConn handles incoming connection.
150+
func (i *Ipc) serveConn(ctx context.Context, conn net.Conn) {
151+
defer func() {
152+
i.mu.Lock()
153+
defer i.mu.Unlock()
154+
i.cleanupConnNoLock(conn)
155+
}()
156+
i.mu.Lock()
157+
i.conns[conn] = struct{}{}
158+
i.mu.Unlock()
159+
if ctx.Err() != nil {
160+
return
161+
}
162+
163+
var err error
164+
for err == nil {
165+
i.events.OnNewRequest("any")
166+
err = i.rpc.HandleReadWriter(ctx, conn)
167+
}
168+
169+
if isSocketError(err) || isPipeError(err) {
170+
return
171+
}
172+
173+
i.log.Warnw("Closing ipc connection due to internal error", "err", err)
174+
}
175+
176+
// stop gracefully shuts down the IPC handler.
177+
func (i *Ipc) stop() error {
178+
i.mu.Lock()
179+
defer func() {
180+
i.mu.Unlock()
181+
i.connWg.Wait()
182+
}()
183+
err := i.listener.Close()
184+
for conn := range i.conns {
185+
i.cleanupConnNoLock(conn)
186+
}
187+
return err
188+
}
189+
190+
type IpcConnParams struct {
191+
// Maximum time to write a message.
192+
WriteDuration time.Duration
193+
}
194+
195+
type ipcConn struct {
196+
IpcConnParams
197+
net.Conn
198+
}
199+
200+
func DefaultIpcConnParams() IpcConnParams {
201+
return IpcConnParams{
202+
WriteDuration: 5 * time.Second,
203+
}
204+
}
205+
206+
func newIpcConn(conn net.Conn, params IpcConnParams) *ipcConn {
207+
return &ipcConn{
208+
IpcConnParams: params,
209+
Conn: conn,
210+
}
211+
}
212+
213+
func (ipc *ipcConn) Write(p []byte) (int, error) {
214+
if err := ipc.Conn.SetWriteDeadline(time.Now().Add(ipc.WriteDuration)); err != nil {
215+
return 0, err
216+
}
217+
return ipc.Conn.Write(p)
218+
}
219+
220+
func isSocketError(err error) bool {
221+
return errors.Is(err, net.ErrClosed) || errors.Is(err, io.EOF)
222+
}
223+
224+
func isPipeError(err error) bool {
225+
// broken pipe || conn reset
226+
return errors.Is(err, syscall.EPIPE) || errors.Is(err, syscall.ECONNRESET)
227+
}

0 commit comments

Comments
 (0)