75 changes: 75 additions & 0 deletions protofsm/state_machine.go
@@ -11,6 +11,7 @@ import (
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btclog/v2"
"github.com/lightningnetwork/lnd/actor"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/fn/v2"
"github.com/lightningnetwork/lnd/lnutils"
@@ -130,6 +131,18 @@ type stateQuery[Event any, Env Environment] struct {
CurrentState chan State[Event, Env]
}

// syncEventRequest is used to send an event to the state machine synchronously,
// waiting for the event processing to complete and returning the accumulated
// outbox events.
type syncEventRequest[Event any] struct {
// event is the event to process.
event Event

// promise is used to signal completion and return the accumulated
// outbox events or an error.
promise actor.Promise[[]Event]
}

// StateMachine represents an abstract FSM that is able to process new incoming
// events and drive a state machine to termination. This implementation uses
// type params to abstract over the types of events and environment. Events
@@ -146,6 +159,10 @@ type StateMachine[Event any, Env Environment] struct {
// FSM.
events chan Event

// syncEvents is the channel that will be used to send synchronous event
// requests to the FSM, returning the accumulated outbox events.
syncEvents chan syncEventRequest[Event]

// newStateEvents is an EventDistributor that will be used to notify
// any relevant callers of new state transitions that occur.
newStateEvents *fn.EventDistributor[State[Event, Env]]
@@ -220,6 +237,7 @@ func NewStateMachine[Event any, Env Environment](
fmt.Sprintf("FSM(%v):", cfg.Env.Name()),
),
events: make(chan Event, 1),
syncEvents: make(chan syncEventRequest[Event], 1),
stateQuery: make(chan stateQuery[Event, Env]),
gm: *fn.NewGoroutineManager(),
newStateEvents: fn.NewEventDistributor[State[Event, Env]](),
@@ -265,6 +283,63 @@ func (s *StateMachine[Event, Env]) SendEvent(ctx context.Context, event Event) {
}
}

// AskEvent sends a new event to the state machine using the Ask pattern
// (request-response), waiting for the event to be fully processed. It
// returns a Future that will be resolved with the accumulated outbox events
// from all state transitions triggered by this event, including nested
// internal events. The Future's Await method will return fn.Result[[]Event]
// containing either the accumulated outbox events or an error if processing
// failed.
func (s *StateMachine[Event, Env]) AskEvent(ctx context.Context,
event Event) actor.Future[[]Event] {

s.log.Debugf("Asking event %T", event)

// Create a promise to signal completion and return results.
promise := actor.NewPromise[[]Event]()

req := syncEventRequest[Event]{
event: event,
promise: promise,
}

// Check for context cancellation or shutdown first to avoid races.
select {
case <-ctx.Done():
promise.Complete(
fn.Errf[[]Event]("context cancelled: %w",
ctx.Err()),
)

return promise.Future()

case <-s.quit:
promise.Complete(fn.Err[[]Event](ErrStateMachineShutdown))

return promise.Future()

default:
}

// Send the request to the state machine. If we can't send it due to
// context cancellation or shutdown, complete the promise with an error.
select {
Collaborator:

Maybe I don't fully grok the future stuff, but wouldn't we start a goroutine here and return the future early? Then complete it once anything from this select happens?

Member Author:

Good q. That's another way to write it, but then you need the extra bookkeeping around the goroutine. One benefit, as you note, is that things return instantly and you wait for the response later.

I wrote it like this to keep the sending path mostly single-threaded. The main executor is already single-threaded, so it must process messages one by one.

@sputn1ck (Collaborator), Nov 10, 2025:

Without the goroutine, wouldn't this essentially be a forced .await() on the caller, though? With the goroutine, the caller could still just use await() to have it block until the goroutine finishes, right?

// Successfully sent; the promise will be completed by driveMachine.
case s.syncEvents <- req:

case <-ctx.Done():
promise.Complete(
fn.Errf[[]Event]("context cancelled: %w",
ctx.Err()),
)

case <-s.quit:
promise.Complete(fn.Err[[]Event](ErrStateMachineShutdown))
}

return promise.Future()
}
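
For reference, a minimal caller-side sketch of the Ask pattern described in the doc comment above. The helper below is not part of this PR; it assumes Await accepts a context and that fn.Result exposes Unpack(), so the exact signatures may differ.

```go
package main

import (
	"context"

	"github.com/lightningnetwork/lnd/protofsm"
)

// processSync is an illustrative helper (not in this PR) showing how a
// caller might drive the FSM synchronously via AskEvent.
func processSync[Event any, Env protofsm.Environment](ctx context.Context,
	machine *protofsm.StateMachine[Event, Env], event Event) ([]Event, error) {

	// Ask the FSM to process the event; the Future resolves once every
	// transition triggered by it (including nested internal events) has
	// run and driveMachine has completed the promise.
	future := machine.AskEvent(ctx, event)

	// Await blocks until that happens, returning fn.Result[[]Event] with
	// either the accumulated outbox events or an error.
	return future.Await(ctx).Unpack()
}
```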

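As a coda to the review thread above, a rough sketch of the goroutine-based variant the reviewer describes, written as if it lived alongside AskEvent and reusing only the fields and helpers that appear in this diff. A real version would likely route the goroutine through the FSM's goroutine manager (the extra bookkeeping the author mentions); it is not the approach the PR takes.

```go
// askEventAsync is a hypothetical alternative to AskEvent (not in this
// PR): it returns the Future immediately and lets a helper goroutine hand
// the request to the executor, so the caller only blocks if and when it
// calls Await.
func (s *StateMachine[Event, Env]) askEventAsync(ctx context.Context,
	event Event) actor.Future[[]Event] {

	promise := actor.NewPromise[[]Event]()
	req := syncEventRequest[Event]{
		event:   event,
		promise: promise,
	}

	// In real code this goroutine would need to be tracked (e.g. by the
	// goroutine manager) so shutdown can wait for it to exit.
	go func() {
		select {
		// Successfully sent; driveMachine will complete the promise.
		case s.syncEvents <- req:

		case <-ctx.Done():
			promise.Complete(fn.Errf[[]Event](
				"context cancelled: %w", ctx.Err(),
			))

		case <-s.quit:
			promise.Complete(fn.Err[[]Event](
				ErrStateMachineShutdown,
			))
		}
	}()

	return promise.Future()
}
```

Compared with the blocking send in AskEvent, the caller regains control immediately, at the cost of one tracked goroutine per pending request.
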
// CanHandle returns true if the target message can be routed to the state
// machine.
func (s *StateMachine[Event, Env]) CanHandle(msg msgmux.PeerMsg) bool {