-
Notifications
You must be signed in to change notification settings - Fork 2.2k
protofsm: extend EmittedEvents with new Outbox field #10346
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: actor
Are you sure you want to change the base?
Changes from 3 commits
eede789
f2c93bd
98643c0
2aa70d0
1387ac1
eb118f8
11d0375
6d762b6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,6 +11,7 @@ import ( | |
| "github.com/btcsuite/btcd/chaincfg/chainhash" | ||
| "github.com/btcsuite/btcd/wire" | ||
| "github.com/btcsuite/btclog/v2" | ||
| "github.com/lightningnetwork/lnd/actor" | ||
| "github.com/lightningnetwork/lnd/chainntnfs" | ||
| "github.com/lightningnetwork/lnd/fn/v2" | ||
| "github.com/lightningnetwork/lnd/lnutils" | ||
|
|
@@ -42,6 +43,12 @@ type EmittedEvent[Event any] struct { | |
| // ExternalEvent is an optional external event that is to be sent to | ||
| // the daemon for dispatch. Usually, this is some form of I/O. | ||
| ExternalEvents DaemonEventSet | ||
|
|
||
| // Outbox is an optional set of events that are accumulated during event | ||
| // processing and returned to the caller for processing into the main | ||
| // state machine. This enables nested state machines to emit events that | ||
| // bubble up to their parent. | ||
| Outbox []Event | ||
| } | ||
|
|
||
| // StateTransition is a state transition type. It denotes the next state to go | ||
|
|
@@ -124,6 +131,18 @@ type stateQuery[Event any, Env Environment] struct { | |
| CurrentState chan State[Event, Env] | ||
| } | ||
|
|
||
| // syncEventRequest is used to send an event to the state machine synchronously, | ||
| // waiting for the event processing to complete and returning the accumulated | ||
| // outbox events. | ||
| type syncEventRequest[Event any] struct { | ||
| // event is the event to process. | ||
| event Event | ||
|
|
||
| // promise is used to signal completion and return the accumulated | ||
| // outbox events or an error. | ||
| promise actor.Promise[[]Event] | ||
| } | ||
|
|
||
| // StateMachine represents an abstract FSM that is able to process new incoming | ||
| // events and drive a state machine to termination. This implementation uses | ||
| // type params to abstract over the types of events and environment. Events | ||
|
|
@@ -140,6 +159,10 @@ type StateMachine[Event any, Env Environment] struct { | |
| // FSM. | ||
| events chan Event | ||
|
|
||
| // syncEvents is the channel that will be used to send synchronous event | ||
| // requests to the FSM, returning the accumulated outbox events. | ||
| syncEvents chan syncEventRequest[Event] | ||
|
|
||
| // newStateEvents is an EventDistributor that will be used to notify | ||
| // any relevant callers of new state transitions that occur. | ||
| newStateEvents *fn.EventDistributor[State[Event, Env]] | ||
|
|
@@ -214,6 +237,7 @@ func NewStateMachine[Event any, Env Environment]( | |
| fmt.Sprintf("FSM(%v):", cfg.Env.Name()), | ||
| ), | ||
| events: make(chan Event, 1), | ||
| syncEvents: make(chan syncEventRequest[Event], 1), | ||
| stateQuery: make(chan stateQuery[Event, Env]), | ||
| gm: *fn.NewGoroutineManager(), | ||
| newStateEvents: fn.NewEventDistributor[State[Event, Env]](), | ||
|
|
@@ -259,6 +283,63 @@ func (s *StateMachine[Event, Env]) SendEvent(ctx context.Context, event Event) { | |
| } | ||
| } | ||
|
|
||
| // AskEvent sends a new event to the state machine using the Ask pattern | ||
| // (request-response), waiting for the event to be fully processed. It | ||
| // returns a Future that will be resolved with the accumulated outbox events | ||
| // from all state transitions triggered by this event, including nested | ||
| // internal events. The Future's Await method will return fn.Result[[]Event] | ||
| // containing either the accumulated outbox events or an error if processing | ||
| // failed. | ||
| func (s *StateMachine[Event, Env]) AskEvent(ctx context.Context, | ||
| event Event) actor.Future[[]Event] { | ||
|
|
||
| s.log.Debugf("Asking event %T", event) | ||
|
|
||
| // Create a promise to signal completion and return results. | ||
| promise := actor.NewPromise[[]Event]() | ||
|
|
||
| req := syncEventRequest[Event]{ | ||
| event: event, | ||
| promise: promise, | ||
| } | ||
|
|
||
| // Check for context cancellation or shutdown first to avoid races. | ||
| select { | ||
| case <-ctx.Done(): | ||
| promise.Complete( | ||
| fn.Errf[[]Event]("context cancelled: %w", | ||
| ctx.Err()), | ||
| ) | ||
|
|
||
| return promise.Future() | ||
|
|
||
| case <-s.quit: | ||
| promise.Complete(fn.Err[[]Event](ErrStateMachineShutdown)) | ||
|
|
||
| return promise.Future() | ||
|
|
||
| default: | ||
| } | ||
|
|
||
| // Send the request to the state machine. If we can't send it due to | ||
| // context cancellation or shutdown, complete the promise with an error. | ||
| select { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe I don't fully grok the future stuff, but wouldn't we start a goroutine here and return the future early? then complete it once anything from this select here happens?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good q. That's another way to write it, but then you need the extra book keeping around the goroutine. One benefit as you note is that things return instantly and you wait for the response later. I wrote it like this to be mostly single threaded when sending. The main executor itself is already single threaded itself, so it must process messages one by one.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. without the goroutine wouldn't this be essentially a forced .await() on the caller though? With the goroutine the caller could still just use await() to have it blocking until the goroutine finished thoughr right? |
||
| // Successfully sent, the promise will be completed by driveMachine. | ||
| case s.syncEvents <- req: | ||
|
|
||
| case <-ctx.Done(): | ||
| promise.Complete( | ||
| fn.Errf[[]Event]("context cancelled: %w", | ||
| ctx.Err()), | ||
| ) | ||
|
|
||
| case <-s.quit: | ||
| promise.Complete(fn.Err[[]Event](ErrStateMachineShutdown)) | ||
| } | ||
Roasbeef marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| return promise.Future() | ||
| } | ||
|
|
||
| // CanHandle returns true if the target message can be routed to the state | ||
| // machine. | ||
| func (s *StateMachine[Event, Env]) CanHandle(msg msgmux.PeerMsg) bool { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think Outbox events should be a new type, similiar to DaemonEvents