Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 120 additions & 2 deletions cmd/sshpiperd/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@ import (
"os"
"path"
"path/filepath"
"sync/atomic"
"time"

log "github.com/sirupsen/logrus"
"github.com/tg123/sshpiper/cmd/sshpiperd/internal/plugin"
"github.com/urfave/cli/v2"
"golang.org/x/crypto/ssh"
"tailscale.com/util/singleflight"
)

type daemon struct {
Expand All @@ -29,8 +31,16 @@ type daemon struct {
usernameAsRecorddir bool
filterHostkeysReqeust bool
replyPing bool

// Plugin initialization fields.
pluginConfigs [][]string
quit chan error
pluginsInitialized atomic.Bool
pluginIniSingleFlight singleflight.Group[unused, unused]
}

type unused struct{}

func generateSshKey(keyfile string) error {
_, privateKey, err := ed25519.GenerateKey(rand.Reader)
if err != nil {
Expand Down Expand Up @@ -222,12 +232,120 @@ func (d *daemon) install(plugins ...*plugin.GrpcPlugin) error {
return m.InstallPiperConfig(d.config)
}

func (d *daemon) setPluginConfigs(configs [][]string, quit chan error) {
d.pluginConfigs = configs
d.quit = quit
}

func (d *daemon) initializePlugins() error {
var plugins []*plugin.GrpcPlugin
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a bit worry about partial plugin failure and might create too many zombies

do not retry on inited plugin?


for _, args := range d.pluginConfigs {
var p *plugin.GrpcPlugin

switch args[0] {
case "grpc":
log.Info("starting net grpc plugin: ")

grpcplugin, err := createNetGrpcPlugin(args)
if err != nil {
return err
}

p = grpcplugin

default:
cmdplugin, err := createCmdPlugin(args)
if err != nil {
return err
}

go func() {
d.quit <- <-cmdplugin.Quit
}()

p = &cmdplugin.GrpcPlugin
}

go func() {
if err := p.RecvLogs(log.StandardLogger().Out); err != nil {
log.Errorf("plugin %v recv logs error: %v", p.Name, err)
}
}()

plugins = append(plugins, p)
}

if err := d.install(plugins...); err != nil {
return err
}
return nil
}

// tryInitializePlugins attempts a single-flighted plugin initialization.
func (d *daemon) tryInitializePlugins() error {
if d.pluginsInitialized.Load() {
return nil // previously initialized successfully
}

_, err, _ := d.pluginIniSingleFlight.Do(unused{}, func() (unused, error) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

may i know if a simple lock works?
seems introducing tailscale is too heavy

err := d.initializePlugins()
if err == nil {
d.pluginsInitialized.Store(true)
}
return unused{}, err
})

if err != nil {
return fmt.Errorf("plugin initialization failed: %w", err)
}
return nil
}

// lazyPluginListener wraps a net.Listener to initialize plugins on first connection
type lazyPluginListener struct {
net.Listener
daemon *daemon
}

func (l *lazyPluginListener) Accept() (net.Conn, error) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

may i know the reason using a listener to active
why not put this to
for {
accept

}

conn, err := l.Listener.Accept()
if err != nil {
return nil, err
}
if l.daemon.pluginsInitialized.Load() {
return conn, nil
}

if err := l.daemon.tryInitializePlugins(); err != nil {
log.Errorf("%s", err)
conn.Close()
return nil, err
}

return conn, nil
}

func (d *daemon) run() error {
defer d.lis.Close()
log.Infof("sshpiperd is listening on: %v", d.lis.Addr().String())
tcpAddr, ok := d.lis.Addr().(*net.TCPAddr)
port := 0
if ok {
port = tcpAddr.Port
}
log.WithFields(log.Fields{
"port": port,
"addr": d.lis.Addr().String(),
}).Info("sshpiperd is listening")

// Wrap the listener with lazy plugin initialization
lazyListener := &lazyPluginListener{
Listener: d.lis,
daemon: d,
}

for {
conn, err := d.lis.Accept()
conn, err := lazyListener.Accept()
if err != nil {
log.Debugf("failed to accept connection: %v", err)
continue
Expand Down
46 changes: 11 additions & 35 deletions cmd/sshpiperd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func main() {
}
}

var plugins []*plugin.GrpcPlugin
var pluginConfigs [][]string

args := ctx.Args().Slice()

Expand Down Expand Up @@ -312,43 +312,19 @@ func main() {
continue
}

var p *plugin.GrpcPlugin

switch args[0] {
case "grpc":
log.Info("starting net grpc plugin: ")

grpcplugin, err := createNetGrpcPlugin(args)
if err != nil {
return err
}

p = grpcplugin

default:
cmdplugin, err := createCmdPlugin(args)
if err != nil {
return err
}

go func() {
quit <- <-cmdplugin.Quit
}()

p = &cmdplugin.GrpcPlugin
}

go func() {
if err := p.RecvLogs(log.StandardLogger().Out); err != nil {
log.Errorf("plugin %v recv logs error: %v", p.Name, err)
}
}()
pluginConfigs = append(pluginConfigs, args)
}

plugins = append(plugins, p)
if len(pluginConfigs) == 0 {
return fmt.Errorf("no plugins configured")
}

if err := d.install(plugins...); err != nil {
return err
d.setPluginConfigs(pluginConfigs, quit)

// Best effort plug-in initialization.
// If this fails, we will retry on incoming connection(s).
if err := d.tryInitializePlugins(); err != nil {
log.Warnf("startup %s", err)
}

d.recorddir = ctx.String("screen-recording-dir")
Expand Down
21 changes: 10 additions & 11 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
module github.com/tg123/sshpiper

go 1.24.0

toolchain go1.24.3
go 1.24.4

replace golang.org/x/crypto => ./crypto

Expand All @@ -28,6 +26,7 @@ require (
k8s.io/apimachinery v0.34.0
k8s.io/client-go v0.34.0
k8s.io/code-generator v0.34.0
tailscale.com v1.86.5
)

require (
Expand All @@ -45,13 +44,13 @@ require (
github.com/google/gnostic-models v0.7.0 // indirect
github.com/moby/docker-image-spec v1.3.1 // indirect
github.com/moby/sys/atomicwriter v0.1.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.65.0 // indirect
github.com/prometheus/procfs v0.16.1 // indirect
github.com/x448/float16 v0.8.4 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.58.0 // indirect
go.opentelemetry.io/otel v1.37.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.31.0 // indirect
go.opentelemetry.io/otel/metric v1.37.0 // indirect
Expand All @@ -68,15 +67,15 @@ require (
)

require (
github.com/Microsoft/go-winio v0.5.2 // indirect
github.com/Microsoft/go-winio v0.6.2 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.7 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-units v0.4.0 // indirect
github.com/docker/go-connections v0.5.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/emicklei/go-restful/v3 v3.12.2 // indirect
github.com/go-logr/logr v1.4.3 // indirect
github.com/go-openapi/jsonpointer v0.21.0 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/jsonreference v0.20.4 // indirect
github.com/go-openapi/swag v0.23.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/go-cmp v0.7.0 // indirect
Expand All @@ -89,7 +88,7 @@ require (
github.com/morikuni/aec v1.0.0 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.0.2 // indirect
github.com/opencontainers/image-spec v1.1.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pquerna/otp v1.5.0
github.com/russross/blackfriday/v2 v2.1.0 // indirect
Expand All @@ -101,7 +100,7 @@ require (
golang.org/x/sys v0.35.0 // indirect
golang.org/x/term v0.34.0
golang.org/x/text v0.28.0 // indirect
golang.org/x/time v0.9.0 // indirect
golang.org/x/time v0.11.0 // indirect
golang.org/x/tools v0.35.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gotest.tools/v3 v3.3.0 // indirect
Expand Down
Loading
Loading