Skip to content

Commit

Permalink
Implement jsonrpc handler over unix domain socket
Browse files Browse the repository at this point in the history
  • Loading branch information
Exca-DK committed Dec 15, 2023
1 parent 23bcc52 commit 0a23c76
Show file tree
Hide file tree
Showing 9 changed files with 636 additions and 2 deletions.
7 changes: 7 additions & 0 deletions cmd/juno/juno.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ const (
wsF = "ws"
wsHostF = "ws-host"
wsPortF = "ws-port"
ipcF = "ipc"
ipcPathF = "ipc-path"
dbPathF = "db-path"
networkF = "network"
ethNodeF = "eth-node"
Expand Down Expand Up @@ -69,6 +71,7 @@ const (
defaultHTTPPort = 6060
defaultWS = false
defaultWSPort = 6061
defaultIpc = false
defaultEthNode = ""
defaultPprof = false
defaultPprofPort = 6062
Expand All @@ -93,6 +96,8 @@ const (
wsUsage = "Enables the Websocket RPC server on the default port."
wsHostUsage = "The interface on which the Websocket RPC server will listen for requests."
wsPortUsage = "The port on which the websocket server will listen for requests."
ipcUsage = "Enables the IPC RPC server."
ipcPathUsage = "The path on which the IPC RPC server will listen for requests."
dbPathUsage = "Location of the database files."
networkUsage = "Options: mainnet, goerli, goerli2, integration, sepolia, sepolia-integration."
pprofUsage = "Enables the pprof endpoint on the default port."
Expand Down Expand Up @@ -219,6 +224,8 @@ func NewCmd(config *node.Config, run func(*cobra.Command, []string) error) *cobr
junoCmd.Flags().Bool(wsF, defaultWS, wsUsage)
junoCmd.Flags().String(wsHostF, defaulHost, wsHostUsage)
junoCmd.Flags().Uint16(wsPortF, defaultWSPort, wsPortUsage)
junoCmd.Flags().Bool(ipcF, defaultIpc, ipcUsage)
junoCmd.Flags().String(ipcPathF, defaultDBPath, ipcPathUsage)
junoCmd.Flags().String(dbPathF, defaultDBPath, dbPathUsage)
junoCmd.Flags().Var(&defaultNetwork, networkF, networkUsage)
junoCmd.Flags().String(ethNodeF, defaultEthNode, ethNodeUsage)
Expand Down
27 changes: 25 additions & 2 deletions cmd/juno/juno_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func TestConfigPrecedence(t *testing.T) {
defaultHTTPPort := uint16(6060)
defaultWS := false
defaultWSPort := uint16(6061)
defaultIpc := false
defaultDBPath := filepath.Join(pwd, "juno")
defaultNetwork := utils.Mainnet
defaultPprof := false
Expand Down Expand Up @@ -64,6 +65,8 @@ func TestConfigPrecedence(t *testing.T) {
Websocket: defaultWS,
WebsocketHost: defaultHost,
WebsocketPort: defaultWSPort,
IPC: defaultIpc,
IPCPath: defaultDBPath,
DatabasePath: defaultDBPath,
Network: defaultNetwork,
Pprof: defaultPprof,
Expand Down Expand Up @@ -93,6 +96,8 @@ func TestConfigPrecedence(t *testing.T) {
Websocket: defaultWS,
WebsocketHost: defaultHost,
WebsocketPort: defaultWSPort,
IPC: defaultIpc,
IPCPath: defaultDBPath,
GRPC: defaultGRPC,
GRPCHost: defaultHost,
GRPCPort: defaultGRPCPort,
Expand Down Expand Up @@ -127,6 +132,8 @@ func TestConfigPrecedence(t *testing.T) {
Websocket: defaultWS,
WebsocketHost: defaultHost,
WebsocketPort: defaultWSPort,
IPC: defaultIpc,
IPCPath: defaultDBPath,
GRPC: defaultGRPC,
GRPCHost: defaultHost,
GRPCPort: defaultGRPCPort,
Expand Down Expand Up @@ -163,6 +170,8 @@ pprof: true
Websocket: defaultWS,
WebsocketHost: defaultHost,
WebsocketPort: defaultWSPort,
IPC: defaultIpc,
IPCPath: defaultDBPath,
GRPC: defaultGRPC,
GRPCHost: defaultHost,
GRPCPort: defaultGRPCPort,
Expand Down Expand Up @@ -196,6 +205,8 @@ http-port: 4576
Websocket: defaultWS,
WebsocketHost: defaultHost,
WebsocketPort: defaultWSPort,
IPC: defaultIpc,
IPCPath: defaultDBPath,
GRPC: defaultGRPC,
GRPCHost: defaultHost,
GRPCPort: defaultGRPCPort,
Expand Down Expand Up @@ -228,6 +239,8 @@ http-port: 4576
Websocket: defaultWS,
WebsocketHost: defaultHost,
WebsocketPort: defaultWSPort,
IPC: defaultIpc,
IPCPath: defaultDBPath,
GRPC: defaultGRPC,
GRPCHost: defaultHost,
GRPCPort: defaultGRPCPort,
Expand Down Expand Up @@ -259,6 +272,8 @@ http-port: 4576
Websocket: defaultWS,
WebsocketHost: defaultHost,
WebsocketPort: defaultWSPort,
IPC: defaultIpc,
IPCPath: defaultDBPath,
GRPC: defaultGRPC,
GRPCHost: defaultHost,
GRPCPort: defaultGRPCPort,
Expand Down Expand Up @@ -287,6 +302,8 @@ http-port: 4576
ws: true
ws-host: 0.0.0.0
ws-port: 4576
ipc: true
ipc-path: /home/config-file/.juno
metrics: true
metrics-host: 0.0.0.0
metrics-port: 4576
Expand All @@ -303,7 +320,7 @@ db-cache-size: 8
`,
inputArgs: []string{
"--log-level", "error", "--http", "--http-port", "4577", "--http-host", "127.0.0.1", "--ws", "--ws-port", "4577", "--ws-host", "127.0.0.1",
"--grpc", "--grpc-port", "4577", "--grpc-host", "127.0.0.1", "--metrics", "--metrics-port", "4577", "--metrics-host", "127.0.0.1",
"--ipc", "--ipc-path", "/home/flag/.juno", "--grpc", "--grpc-port", "4577", "--grpc-host", "127.0.0.1", "--metrics", "--metrics-port", "4577", "--metrics-host", "127.0.0.1",
"--db-path", "/home/flag/.juno", "--network", "integration", "--pprof", "--pending-poll-interval", time.Millisecond.String(),
"--db-cache-size", "9",
},
Expand All @@ -315,6 +332,8 @@ db-cache-size: 8
Websocket: true,
WebsocketHost: "127.0.0.1",
WebsocketPort: 4577,
IPC: true,
IPCPath: "/home/flag/.juno",
Metrics: true,
MetricsHost: "127.0.0.1",
MetricsPort: 4577,
Expand Down Expand Up @@ -350,6 +369,8 @@ network: goerli
Websocket: defaultWS,
WebsocketHost: defaultHost,
WebsocketPort: defaultWSPort,
IPC: defaultIpc,
IPCPath: defaultDBPath,
GRPC: defaultGRPC,
GRPCHost: defaultHost,
GRPCPort: defaultGRPCPort,
Expand All @@ -372,7 +393,7 @@ network: goerli
"some setting set in default, config file and flags": {
cfgFile: true,
cfgFileContents: `network: goerli2`,
inputArgs: []string{"--db-path", "/home/flag/.juno", "--pprof"},
inputArgs: []string{"--ipc", "--ipc-path", "/home/flag/.juno", "--db-path", "/home/flag/.juno", "--pprof"},
expectedConfig: &node.Config{
LogLevel: defaultLogLevel,
HTTP: defaultHTTP,
Expand All @@ -381,6 +402,8 @@ network: goerli
Websocket: defaultWS,
WebsocketHost: defaultHost,
WebsocketPort: defaultWSPort,
IPC: true,
IPCPath: "/home/flag/.juno",
GRPC: defaultGRPC,
GRPCHost: defaultHost,
GRPCPort: defaultGRPCPort,
Expand Down
222 changes: 222 additions & 0 deletions jsonrpc/ipc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
package jsonrpc

import (
"context"
"errors"
"fmt"
"io"
"net"
"os"
"path/filepath"
"sync"
"syscall"
"time"

"github.com/NethermindEth/juno/utils"
"github.com/ethereum/go-ethereum/p2p/netutil"
"github.com/sourcegraph/conc"
)

const (
// On Linux, path is 108 bytes in size.
// http://man7.org/linux/man-pages/man7/unix.7.html
maxUnixPathBytes = 108
// fileModePerms represents the permission mode for directories, which is set to 751.
// This mode is stricter than the common default of 755 for directories.
fileModePerms = 0o751
// socketPerms represents the permission mode for socket files, set to 600. It allows
// read and write access for the owner only, ensuring that socket is accessible only to the intended user.
socketPerms = 0o600
)

// createListener creates a Unix domain socket at the given path and sets proper permissions.
func createListener(endpoint string) (net.Listener, error) {
// path + terminator
if len(endpoint)+1 > maxUnixPathBytes {
return nil, errors.New("path too long")
}
// Try connecting first; if it works, then the socket is still live,
// so let's abort the creation of a new one.
if c, err := net.Dial("unix", endpoint); err == nil {
c.Close()
return nil, fmt.Errorf("%v: address already in use", endpoint)
}

if err := os.MkdirAll(filepath.Dir(endpoint), fileModePerms); err != nil {
return nil, err
}
// Remove any existing file at the specified path.
if err := os.Remove(endpoint); !os.IsNotExist(err) {
return nil, err
}
l, err := net.Listen("unix", endpoint)
if err != nil {
return nil, err
}
// Set permissions for the socket file to read and write for the owner only (0o600)
err = os.Chmod(endpoint, socketPerms)
return l, err
}

type Ipc struct {
rpc *Server
events NewRequestListener
log utils.SimpleLogger

connWg conc.WaitGroup // connWg is a WaitGroup for tracking active connections.

connParams IpcConnParams
listener net.Listener

// everything below is protected
mu sync.Mutex
conns map[net.Conn]struct{} // conns is a map that holds active connections.
}

// NewIpc creates a new IPC handler instance with the provided RPC server, endpoint and logger.
func NewIpc(rpc *Server, endpoint string, log utils.SimpleLogger) (*Ipc, error) {
listener, err := createListener(endpoint)
if err != nil {
return nil, err
}
return &Ipc{
rpc: rpc,
log: log,
connParams: DefaultIpcConnParams(),
conns: make(map[net.Conn]struct{}),
events: &SelectiveListener{},
listener: listener,
}, nil
}

// WithConnParams sanity checks and applies the provided params.
func (i *Ipc) WithConnParams(p *IpcConnParams) *Ipc {
i.connParams = *p
return i
}

// WithListener registers a NewRequestListener
func (i *Ipc) WithListener(listener NewRequestListener) *Ipc {
i.events = listener
return i
}

// Run launches the IPC handler and handles any potential errors.
// It is the caller's responsibility to cancel the provided context when they wish to
// gracefully shut down the IPC handler.
func (i *Ipc) Run(ctx context.Context) error {
var wg conc.WaitGroup
defer wg.Wait()

ctx, cancel := context.WithCancel(ctx)
defer cancel()

wg.Go(func() {
defer cancel()
for {
conn, err := i.listener.Accept()
if netutil.IsTemporaryError(err) {
i.log.Warnw("Failed to accept connection", "err", err)
continue
} else if err != nil {
i.log.Warnw("Accept connection", "err", err)
return
}
i.connWg.Go(func() {
i.serveConn(ctx, newIpcConn(conn, i.connParams))
})
}
})

<-ctx.Done()
return i.stop()
}

// cleanupConnNoLock frees resources
func (i *Ipc) cleanupConnNoLock(conn net.Conn) {
_, ok := i.conns[conn]
delete(i.conns, conn)
if ok {
conn.Close()
}
}

// serveConn handles incoming connection.
func (i *Ipc) serveConn(ctx context.Context, conn net.Conn) {
defer func() {
i.mu.Lock()
defer i.mu.Unlock()
i.cleanupConnNoLock(conn)
}()
i.mu.Lock()
i.conns[conn] = struct{}{}
i.mu.Unlock()
if ctx.Err() != nil {
return
}

var err error
for err == nil {
i.events.OnNewRequest("any")
err = i.rpc.HandleReadWriter(ctx, conn)
}

if isSocketError(err) || isPipeError(err) {
return
}

i.log.Warnw("Closing ipc connection due to internal error", "err", err)
}

// stop gracefully shuts down the IPC handler.
func (i *Ipc) stop() error {
i.mu.Lock()
defer func() {
i.mu.Unlock()
i.connWg.Wait()
}()
err := i.listener.Close()
for conn := range i.conns {
i.cleanupConnNoLock(conn)
}
return err
}

type IpcConnParams struct {
// Maximum time to write a message.
WriteDuration time.Duration
}

type ipcConn struct {
IpcConnParams
net.Conn
}

func DefaultIpcConnParams() IpcConnParams {
return IpcConnParams{
WriteDuration: 5 * time.Second,
}
}

func newIpcConn(conn net.Conn, params IpcConnParams) *ipcConn {
return &ipcConn{
IpcConnParams: params,
Conn: conn,
}
}

func (ipc *ipcConn) Write(p []byte) (int, error) {
if err := ipc.Conn.SetWriteDeadline(time.Now().Add(ipc.WriteDuration)); err != nil {
return 0, err
}
return ipc.Conn.Write(p)
}

func isSocketError(err error) bool {
return errors.Is(err, net.ErrClosed) || errors.Is(err, io.EOF)
}

func isPipeError(err error) bool {
// broken pipe || conn reset
return errors.Is(err, syscall.EPIPE) || errors.Is(err, syscall.ECONNRESET)
}
Loading

0 comments on commit 0a23c76

Please sign in to comment.