Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ require (
pgregory.net/rapid v1.2.0
)

require github.com/lightningnetwork/lnd/actor v0.0.1-alpha

require (
dario.cat/mergo v1.0.1 // indirect
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect
Expand Down Expand Up @@ -213,6 +215,9 @@ replace github.com/gogo/protobuf => github.com/gogo/protobuf v1.3.2
// allows us to specify that as an option.
replace google.golang.org/protobuf => github.com/lightninglabs/protobuf-go-hex-display v1.33.0-hex-display

// Use the local actor module for development.
replace github.com/lightningnetwork/lnd/actor => ./actor

// If you change this please also update docs/INSTALL.md and GO_VERSION in
// Makefile (then run `make lint` to see where else it needs to be updated as
// well).
Expand Down
132 changes: 127 additions & 5 deletions protofsm/state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btclog/v2"
"github.com/lightningnetwork/lnd/actor"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/fn/v2"
"github.com/lightningnetwork/lnd/lnutils"
Expand Down Expand Up @@ -42,6 +43,12 @@ type EmittedEvent[Event any] struct {
// ExternalEvent is an optional external event that is to be sent to
// the daemon for dispatch. Usually, this is some form of I/O.
ExternalEvents DaemonEventSet

// Outbox is an optional set of events that are accumulated during event
// processing and returned to the caller for processing into the main
// state machine. This enables nested state machines to emit events that
// bubble up to their parent.
Outbox []Event
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think Outbox events should be a new type, similiar to DaemonEvents

}

// StateTransition is a state transition type. It denotes the next state to go
Expand Down Expand Up @@ -124,6 +131,18 @@ type stateQuery[Event any, Env Environment] struct {
CurrentState chan State[Event, Env]
}

// syncEventRequest is used to send an event to the state machine synchronously,
// waiting for the event processing to complete and returning the accumulated
// outbox events.
type syncEventRequest[Event any] struct {
// event is the event to process.
event Event

// promise is used to signal completion and return the accumulated
// outbox events or an error.
promise actor.Promise[[]Event]
}

// StateMachine represents an abstract FSM that is able to process new incoming
// events and drive a state machine to termination. This implementation uses
// type params to abstract over the types of events and environment. Events
Expand All @@ -140,6 +159,10 @@ type StateMachine[Event any, Env Environment] struct {
// FSM.
events chan Event

// syncEvents is the channel that will be used to send synchronous event
// requests to the FSM, returning the accumulated outbox events.
syncEvents chan syncEventRequest[Event]

// newStateEvents is an EventDistributor that will be used to notify
// any relevant callers of new state transitions that occur.
newStateEvents *fn.EventDistributor[State[Event, Env]]
Expand Down Expand Up @@ -214,6 +237,7 @@ func NewStateMachine[Event any, Env Environment](
fmt.Sprintf("FSM(%v):", cfg.Env.Name()),
),
events: make(chan Event, 1),
syncEvents: make(chan syncEventRequest[Event], 1),
stateQuery: make(chan stateQuery[Event, Env]),
gm: *fn.NewGoroutineManager(),
newStateEvents: fn.NewEventDistributor[State[Event, Env]](),
Expand Down Expand Up @@ -259,6 +283,63 @@ func (s *StateMachine[Event, Env]) SendEvent(ctx context.Context, event Event) {
}
}

// AskEvent sends a new event to the state machine using the Ask pattern
// (request-response), waiting for the event to be fully processed. It
// returns a Future that will be resolved with the accumulated outbox events
// from all state transitions triggered by this event, including nested
// internal events. The Future's Await method will return fn.Result[[]Event]
// containing either the accumulated outbox events or an error if processing
// failed.
func (s *StateMachine[Event, Env]) AskEvent(ctx context.Context,
event Event) actor.Future[[]Event] {

s.log.Debugf("Asking event %T", event)

// Create a promise to signal completion and return results.
promise := actor.NewPromise[[]Event]()

req := syncEventRequest[Event]{
event: event,
promise: promise,
}

// Check for context cancellation or shutdown first to avoid races.
select {
case <-ctx.Done():
promise.Complete(
fn.Errf[[]Event]("context cancelled: %w",
ctx.Err()),
)

return promise.Future()

case <-s.quit:
promise.Complete(fn.Err[[]Event](ErrStateMachineShutdown))

return promise.Future()

default:
}

// Send the request to the state machine. If we can't send it due to
// context cancellation or shutdown, complete the promise with an error.
select {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe I don't fully grok the future stuff, but wouldn't we start a goroutine here and return the future early? then complete it once anything from this select here happens?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good q. That's another way to write it, but then you need the extra book keeping around the goroutine. One benefit as you note is that things return instantly and you wait for the response later.

I wrote it like this to be mostly single threaded when sending. The main executor itself is already single threaded itself, so it must process messages one by one.

Copy link
Collaborator

@sputn1ck sputn1ck Nov 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

without the goroutine wouldn't this be essentially a forced .await() on the caller though? With the goroutine the caller could still just use await() to have it blocking until the goroutine finished thoughr right?

// Successfully sent, the promise will be completed by driveMachine.
case s.syncEvents <- req:

case <-ctx.Done():
promise.Complete(
fn.Errf[[]Event]("context cancelled: %w",
ctx.Err()),
)

case <-s.quit:
promise.Complete(fn.Err[[]Event](ErrStateMachineShutdown))
}

return promise.Future()
}

// CanHandle returns true if the target message can be routed to the state
// machine.
func (s *StateMachine[Event, Env]) CanHandle(msg msgmux.PeerMsg) bool {
Expand Down Expand Up @@ -563,13 +644,19 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(ctx context.Context,

// applyEvents applies a new event to the state machine. This will continue
// until no further events are emitted by the state machine. Along the way,
// we'll also ensure to execute any daemon events that are emitted.
// we'll also ensure to execute any daemon events that are emitted. The
// function returns the final state, any accumulated outbox events, and an
// error if one occurred.
func (s *StateMachine[Event, Env]) applyEvents(ctx context.Context,
currentState State[Event, Env], newEvent Event) (State[Event, Env],
error) {
[]Event, error) {

eventQueue := fn.NewQueue(newEvent)

// outbox accumulates all outbox events from state transitions during
// the entire event processing chain.
var outbox []Event

// Given the next event to handle, we'll process the event, then add
// any new emitted internal events to our event queue. This continues
// until we reach a terminal state, or we run out of internal events to
Expand Down Expand Up @@ -613,6 +700,10 @@ func (s *StateMachine[Event, Env]) applyEvents(ctx context.Context,
eventQueue.Enqueue(inEvent)
}

// Accumulate any outbox events from this state
// transition.
outbox = append(outbox, events.Outbox...)

return nil
})
if err != nil {
Expand All @@ -636,11 +727,11 @@ func (s *StateMachine[Event, Env]) applyEvents(ctx context.Context,
return nil
})
if err != nil {
return currentState, err
return currentState, nil, err
}
}

return currentState, nil
return currentState, outbox, nil
}

// driveMachine is the main event loop of the state machine. It accepts any new
Expand Down Expand Up @@ -671,7 +762,7 @@ func (s *StateMachine[Event, Env]) driveMachine(ctx context.Context) {
// machine forward until we either run out of internal events,
// or we reach a terminal state.
case newEvent := <-s.events:
newState, err := s.applyEvents(
newState, _, err := s.applyEvents(
ctx, currentState, newEvent,
)
if err != nil {
Expand All @@ -688,6 +779,37 @@ func (s *StateMachine[Event, Env]) driveMachine(ctx context.Context) {

currentState = newState

// We have a synchronous event request that expects the
// accumulated outbox events to be returned via the promise.
case syncReq := <-s.syncEvents:
newState, outbox, err := s.applyEvents(
ctx, currentState, syncReq.event,
)
if err != nil {
s.cfg.ErrorReporter.ReportError(err)

s.log.ErrorS(ctx, "Unable to apply sync event",
err)

// Complete the promise with the error.
//
// TODO(roasbeef): distinguish between error
// types? state vs processing
syncReq.promise.Complete(fn.Err[[]Event](err))

// An error occurred, so we'll tear down the
// entire state machine as we can't proceed.
go s.Stop()

return
}

currentState = newState

// Complete the promise with the accumulated outbox
// events.
syncReq.promise.Complete(fn.Ok(outbox))

// An outside caller is querying our state, so we'll return the
// latest state.
case stateQuery := <-s.stateQuery:
Expand Down
Loading
Loading