Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

jsonrpc: implement unix handler #1362

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cmd/juno/juno.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ const (
wsF = "ws"
wsHostF = "ws-host"
wsPortF = "ws-port"
ipcF = "ipc"
ipcPathF = "ipc-path"
dbPathF = "db-path"
networkF = "network"
ethNodeF = "eth-node"
Expand Down Expand Up @@ -71,6 +73,7 @@ const (
defaultHTTPPort = 6060
defaultWS = false
defaultWSPort = 6061
defaultIpc = false
defaultEthNode = ""
defaultPprof = false
defaultPprofPort = 6062
Expand All @@ -96,6 +99,8 @@ const (
wsUsage = "Enables the Websocket RPC server on the default port."
wsHostUsage = "The interface on which the Websocket RPC server will listen for requests."
wsPortUsage = "The port on which the websocket server will listen for requests."
ipcUsage = "Enables the IPC RPC server."
ipcPathUsage = "The path on which the IPC RPC server will listen for requests."
dbPathUsage = "Location of the database files."
networkUsage = "Options: mainnet, goerli, goerli2, integration, sepolia, sepolia-integration."
pprofUsage = "Enables the pprof endpoint on the default port."
Expand Down Expand Up @@ -226,6 +231,8 @@ func NewCmd(config *node.Config, run func(*cobra.Command, []string) error) *cobr
junoCmd.Flags().Bool(wsF, defaultWS, wsUsage)
junoCmd.Flags().String(wsHostF, defaulHost, wsHostUsage)
junoCmd.Flags().Uint16(wsPortF, defaultWSPort, wsPortUsage)
junoCmd.Flags().Bool(ipcF, defaultIpc, ipcUsage)
junoCmd.Flags().String(ipcPathF, defaultDBPath, ipcPathUsage)
junoCmd.Flags().String(dbPathF, defaultDBPath, dbPathUsage)
junoCmd.Flags().Var(&defaultNetwork, networkF, networkUsage)
junoCmd.Flags().String(ethNodeF, defaultEthNode, ethNodeUsage)
Expand Down
27 changes: 25 additions & 2 deletions cmd/juno/juno_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func TestConfigPrecedence(t *testing.T) {
defaultHTTPPort := uint16(6060)
defaultWS := false
defaultWSPort := uint16(6061)
defaultIpc := false
defaultDBPath := filepath.Join(pwd, "juno")
defaultNetwork := utils.Mainnet
defaultPprof := false
Expand Down Expand Up @@ -65,6 +66,8 @@ func TestConfigPrecedence(t *testing.T) {
Websocket: defaultWS,
WebsocketHost: defaultHost,
WebsocketPort: defaultWSPort,
IPC: defaultIpc,
IPCPath: defaultDBPath,
DatabasePath: defaultDBPath,
Network: defaultNetwork,
Pprof: defaultPprof,
Expand Down Expand Up @@ -94,6 +97,8 @@ func TestConfigPrecedence(t *testing.T) {
Websocket: defaultWS,
WebsocketHost: defaultHost,
WebsocketPort: defaultWSPort,
IPC: defaultIpc,
IPCPath: defaultDBPath,
GRPC: defaultGRPC,
GRPCHost: defaultHost,
GRPCPort: defaultGRPCPort,
Expand Down Expand Up @@ -128,6 +133,8 @@ func TestConfigPrecedence(t *testing.T) {
Websocket: defaultWS,
WebsocketHost: defaultHost,
WebsocketPort: defaultWSPort,
IPC: defaultIpc,
IPCPath: defaultDBPath,
GRPC: defaultGRPC,
GRPCHost: defaultHost,
GRPCPort: defaultGRPCPort,
Expand Down Expand Up @@ -164,6 +171,8 @@ pprof: true
Websocket: defaultWS,
WebsocketHost: defaultHost,
WebsocketPort: defaultWSPort,
IPC: defaultIpc,
IPCPath: defaultDBPath,
GRPC: defaultGRPC,
GRPCHost: defaultHost,
GRPCPort: defaultGRPCPort,
Expand Down Expand Up @@ -197,6 +206,8 @@ http-port: 4576
Websocket: defaultWS,
WebsocketHost: defaultHost,
WebsocketPort: defaultWSPort,
IPC: defaultIpc,
IPCPath: defaultDBPath,
GRPC: defaultGRPC,
GRPCHost: defaultHost,
GRPCPort: defaultGRPCPort,
Expand Down Expand Up @@ -229,6 +240,8 @@ http-port: 4576
Websocket: defaultWS,
WebsocketHost: defaultHost,
WebsocketPort: defaultWSPort,
IPC: defaultIpc,
IPCPath: defaultDBPath,
GRPC: defaultGRPC,
GRPCHost: defaultHost,
GRPCPort: defaultGRPCPort,
Expand Down Expand Up @@ -260,6 +273,8 @@ http-port: 4576
Websocket: defaultWS,
WebsocketHost: defaultHost,
WebsocketPort: defaultWSPort,
IPC: defaultIpc,
IPCPath: defaultDBPath,
GRPC: defaultGRPC,
GRPCHost: defaultHost,
GRPCPort: defaultGRPCPort,
Expand Down Expand Up @@ -288,6 +303,8 @@ http-port: 4576
ws: true
ws-host: 0.0.0.0
ws-port: 4576
ipc: true
ipc-path: /home/config-file/.juno
metrics: true
metrics-host: 0.0.0.0
metrics-port: 4576
Expand All @@ -304,7 +321,7 @@ db-cache-size: 8
`,
inputArgs: []string{
"--log-level", "error", "--http", "--http-port", "4577", "--http-host", "127.0.0.1", "--ws", "--ws-port", "4577", "--ws-host", "127.0.0.1",
"--grpc", "--grpc-port", "4577", "--grpc-host", "127.0.0.1", "--metrics", "--metrics-port", "4577", "--metrics-host", "127.0.0.1",
"--ipc", "--ipc-path", "/home/flag/.juno", "--grpc", "--grpc-port", "4577", "--grpc-host", "127.0.0.1", "--metrics", "--metrics-port", "4577", "--metrics-host", "127.0.0.1",
"--db-path", "/home/flag/.juno", "--network", "integration", "--pprof", "--pending-poll-interval", time.Millisecond.String(),
"--db-cache-size", "9",
},
Expand All @@ -316,6 +333,8 @@ db-cache-size: 8
Websocket: true,
WebsocketHost: "127.0.0.1",
WebsocketPort: 4577,
IPC: true,
IPCPath: "/home/flag/.juno",
Metrics: true,
MetricsHost: "127.0.0.1",
MetricsPort: 4577,
Expand Down Expand Up @@ -351,6 +370,8 @@ network: goerli
Websocket: defaultWS,
WebsocketHost: defaultHost,
WebsocketPort: defaultWSPort,
IPC: defaultIpc,
IPCPath: defaultDBPath,
GRPC: defaultGRPC,
GRPCHost: defaultHost,
GRPCPort: defaultGRPCPort,
Expand All @@ -373,7 +394,7 @@ network: goerli
"some setting set in default, config file and flags": {
cfgFile: true,
cfgFileContents: `network: goerli2`,
inputArgs: []string{"--db-path", "/home/flag/.juno", "--pprof"},
inputArgs: []string{"--ipc", "--ipc-path", "/home/flag/.juno", "--db-path", "/home/flag/.juno", "--pprof"},
expectedConfig: &node.Config{
LogLevel: defaultLogLevel,
HTTP: defaultHTTP,
Expand All @@ -382,6 +403,8 @@ network: goerli
Websocket: defaultWS,
WebsocketHost: defaultHost,
WebsocketPort: defaultWSPort,
IPC: true,
IPCPath: "/home/flag/.juno",
GRPC: defaultGRPC,
GRPCHost: defaultHost,
GRPCPort: defaultGRPCPort,
Expand Down
227 changes: 227 additions & 0 deletions jsonrpc/ipc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
package jsonrpc

import (
"context"
"errors"
"fmt"
"io"
"net"
"os"
"path/filepath"
"sync"
"syscall"
"time"

"github.com/NethermindEth/juno/utils"
"github.com/ethereum/go-ethereum/p2p/netutil"
"github.com/sourcegraph/conc"
)

const (
// On Linux, path is 108 bytes in size.
// http://man7.org/linux/man-pages/man7/unix.7.html
maxUnixPathBytes = 108
// fileModePerms represents the permission mode for directories, which is set to 751.
// This mode is stricter than the common default of 755 for directories.
fileModePerms = 0o751
// socketPerms represents the permission mode for socket files, set to 600. It allows
// read and write access for the owner only, ensuring that socket is accessible only to the intended user.
socketPerms = 0o600
)

// createListener creates a Unix domain socket at the given path and sets proper permissions.
func createListener(endpoint string) (net.Listener, error) {
// path + terminator
if len(endpoint)+1 > maxUnixPathBytes {
return nil, errors.New("path too long")
}
// Try connecting first; if it works, then the socket is still live,
// so let's abort the creation of a new one.
if c, err := net.Dial("unix", endpoint); err == nil {
c.Close()
return nil, fmt.Errorf("%v: address already in use", endpoint)
}

if err := os.MkdirAll(filepath.Dir(endpoint), fileModePerms); err != nil {
return nil, err
}
// Remove any existing file at the specified path.
if err := os.Remove(endpoint); !os.IsNotExist(err) {
return nil, err
}
l, err := net.Listen("unix", endpoint)
if err != nil {
return nil, err
}
// Set permissions for the socket file to read and write for the owner only (0o600)
err = os.Chmod(endpoint, socketPerms)
return l, err
}

type Ipc struct {
rpc *Server
events NewRequestListener
log utils.SimpleLogger

connWg conc.WaitGroup // connWg is a WaitGroup for tracking active connections.

connParams IpcConnParams
listener net.Listener

// everything below is protected
mu sync.Mutex
conns map[net.Conn]struct{} // conns is a map that holds active connections.
}

// NewIpc creates a new IPC handler instance with the provided RPC server, endpoint and logger.
func NewIpc(rpc *Server, endpoint string, log utils.SimpleLogger) (*Ipc, error) {
listener, err := createListener(endpoint)
if err != nil {
return nil, err
}
return &Ipc{
rpc: rpc,
log: log,
connParams: DefaultIpcConnParams(),
conns: make(map[net.Conn]struct{}),
events: &SelectiveListener{},
listener: listener,
}, nil
}

// WithConnParams sanity checks and applies the provided params.
func (i *Ipc) WithConnParams(p *IpcConnParams) *Ipc {
i.connParams = *p
return i
}

// WithListener registers a NewRequestListener
func (i *Ipc) WithListener(listener NewRequestListener) *Ipc {
i.events = listener
return i
}

// Run launches the IPC handler and handles any potential errors.
// It is the caller's responsibility to cancel the provided context when they wish to
// gracefully shut down the IPC handler.
func (i *Ipc) Run(ctx context.Context) error {
var wg conc.WaitGroup
defer wg.Wait()

ctx, cancel := context.WithCancel(ctx)
defer cancel()

wg.Go(func() {
defer cancel()
for {
conn, err := i.listener.Accept()
if netutil.IsTemporaryError(err) {
i.log.Warnw("Failed to accept connection", "err", err)
continue
} else if err != nil {
if !isSocketError(err) {
i.log.Warnw("Accept connection", "err", err)
}
return
}
i.connWg.Go(func() {
i.serveConn(ctx, newIpcConn(conn, i.connParams))
})
}
})

<-ctx.Done()
return i.stop()
}

// cleanupConnNoLock frees resources
func (i *Ipc) cleanupConnNoLock(conn net.Conn) {
_, ok := i.conns[conn]
delete(i.conns, conn)
if !ok {
return
}
if err := conn.Close(); err != nil {
i.log.Warnw("Error occurred while closing connection", "err", err)
}
}

// serveConn handles incoming connection.
func (i *Ipc) serveConn(ctx context.Context, conn net.Conn) {
defer func() {
i.mu.Lock()
defer i.mu.Unlock()
i.cleanupConnNoLock(conn)
}()
i.mu.Lock()
i.conns[conn] = struct{}{}
i.mu.Unlock()
if ctx.Err() != nil {
return
}

var err error
for err == nil {
i.events.OnNewRequest("any")
err = i.rpc.HandleReadWriter(ctx, conn)
}

if isSocketError(err) || isPipeError(err) {
return
}

i.log.Warnw("Closing ipc connection due to internal error", "err", err)
}

// stop gracefully shuts down the IPC handler.
func (i *Ipc) stop() error {
i.mu.Lock()
defer func() {
i.mu.Unlock()
i.connWg.Wait()
}()
err := i.listener.Close()
for conn := range i.conns {
i.cleanupConnNoLock(conn)
}
return err
}

type IpcConnParams struct {
// Maximum time to write a message.
WriteDuration time.Duration
}

type ipcConn struct {
IpcConnParams
net.Conn
}

func DefaultIpcConnParams() IpcConnParams {
return IpcConnParams{
WriteDuration: 5 * time.Second,
}
}

func newIpcConn(conn net.Conn, params IpcConnParams) *ipcConn {
return &ipcConn{
IpcConnParams: params,
Conn: conn,
}
}

func (ipc *ipcConn) Write(p []byte) (int, error) {
if err := ipc.Conn.SetWriteDeadline(time.Now().Add(ipc.WriteDuration)); err != nil {
return 0, err
}
return ipc.Conn.Write(p)
}

func isSocketError(err error) bool {
return errors.Is(err, net.ErrClosed) || errors.Is(err, io.EOF)
}

func isPipeError(err error) bool {
// broken pipe || conn reset
return errors.Is(err, syscall.EPIPE) || errors.Is(err, syscall.ECONNRESET)
}
Loading
Loading