Skip to content

Commit 9d4acc4

Browse files
committed
Fixes and notes
1 parent 2962741 commit 9d4acc4

File tree

11 files changed

+131
-44
lines changed

11 files changed

+131
-44
lines changed

README.md

Lines changed: 67 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ The event producer will simply do:
2626

2727
`bus.Pub(InterestingEvent{})`
2828

29+
Usage : `go get github.com/badu/bus`
30+
2931
## What Problem Does It Solve?
3032

3133
Decoupling of components: publishers and subscribers can operate independently of each other, with no direct knowledge
@@ -47,18 +49,77 @@ Each component can then be developed and tested independently, making the overal
4749

4850
Inside the `test_scenarios` folder, you can find the following scenarios:
4951

50-
1. Fire and Forget
52+
1. Fire and Forget.
53+
54+
Imagine a system / application where we have three services : `users`, `notifications` (email and
55+
SMS) and `audit`. When a user registers, we want to send welcoming messages via SMS and email, but we also want to
56+
audit that registration for reporting purposes.
57+
58+
The [UserRegisteredEvent](https://github.com/badu/bus/blob/main/test_scenarios/fire-and-forget/events/main.go#L10)
59+
will carry the freshly registered username (which is also the email) and phone to the email and sms services. The
60+
event is [triggered](https://github.com/badu/bus/blob/main/test_scenarios/fire-and-forget/users/service.go#L21) by
61+
the user service, which performs the creation of the user account. We're using the `fire and forget` technique here,
62+
because the operation of registration should not depend on the fact that we've been able to
63+
send an welcoming email or a sms, or the audit system malfunctions.
64+
65+
Simulating audit service malfunction is easy. Instead of using `Sub`, we're using `SubUnsub` to register the listener
66+
and return [`true`](https://github.com/badu/bus/blob/main/test_scenarios/fire-and-forget/audit/service.go#L36) to
67+
unsubscribe on events of that kind.
68+
69+
2. Factory Request Reply
70+
71+
Imagine a system / application where we need to communicate with different microservices, but in this case we don't
72+
want to bring them online, we're just wanting to stub the response as those services were alive.
73+
74+
This technique is useful when we need to test some complicated flows of business logic and facilitates the
75+
transformation of an integration test into a classic unit test.
76+
77+
The `cart` service requires two replies from two other microservices `inventory` and `prices`. In the past, I've been
78+
using a closure function to provide the service with both real GRPC clients or with mocks and stubs. The service
79+
signature gets complicated and large as we one service would depend on a lot of GRPC clients to aggregate data.
80+
81+
As you can see
82+
the [test here](https://github.com/badu/bus/blob/main/test_scenarios/factory-request-reply/main_test.go) it's much
83+
more elegant and the service constructor is much slimmer.
84+
85+
Events are one sync and one async, just to check it works in both scenarios.
86+
87+
Important to note that because a `WaitGroup` is being used in our event struct, we're forced to pass the events by
88+
using a pointer, instead of passing them by value.
89+
90+
3. Request Reply with Callback
91+
92+
In this example, we wanted to achieve two things. First is that the `service` and the `repository` are decoupled by
93+
events. More than that, we wanted that the events are generic on their own.
94+
95+
The orders service will dispatch a generic request event, one for placing an order, which will carry an `Order` (
96+
model) struct with that request and another `OrderStatus` (model) struct using the same generic event.
97+
98+
We are using a channel inside the generic `RequestEvent` to signal the `reply` to the publisher, which in this case
99+
is a callback function that returns the result as if the publisher would have called directly the listener.
100+
101+
I am sure that you will find this technique interesting and having a large number of applications.
102+
103+
An important note is about not forgetting to implement the `EventID() string` correctly, as incorrect naming triggers
104+
panic (expecting one type of event, but receiving another). To exemplify this, just alter the return of
105+
this [function](https://github.com/badu/bus/blob/main/test_scenarios/request-reply-callback/events/main.go#L24).
51106

52-
2. Request Reply with Callback
107+
4. Request Reply with Cancellation
53108

54-
3. Request Reply with Cancellation
109+
Last but, not least, this is an example about providing `context.Context` along the publisher subscriber chain.
110+
The `repository` is simulating a long database call, longer than the context's cancellation, so the service gets the
111+
deadline exceeded error.
55112

56-
4. Factory Request Reply
113+
Note that this final example is not using pointer to the event's struct, but it contains two properties which have
114+
pointers, so the `service` can access the altered `reply`.
57115

58116
## Recommendations
59117

60-
1. when using `sync.WaitGroup` inside your event struct, always use method receivers and pass the event as pointer,
61-
otherwise you will be passing a lock by value (which is `sync.Locker`).
118+
1. always place your events inside a separate `events` package, avoiding circular dependencies.
119+
2. in general, in `request-reply` scenarios, the events should be passed as pointers (even if it's somewhat slower),
120+
because changing properties that represents the `reply` would not be reflected. Also, when using `sync.WaitGroup`
121+
inside your event struct, always use method receivers and pass the event as pointer, otherwise you will be passing a
122+
lock by value (which is `sync.Locker`).
62123
2. be careful if you don't want to use pointers for events, but you still need to pass values from the listener to the
63124
dispatcher. You should still have at least one property of that event that is a pointer (see events
64125
in `request reply with cancellation` for example). Same technique can be applied when you need `sync.Waitgroup` to be

main_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,7 @@ func TestSubTopicWhilePub(t *testing.T) {
4848
}
4949

5050
func TestReusePayloadPointerAsync(t *testing.T) {
51-
// if you reuse the payload, you can alter it's content
52-
// which is not recommended (see the "random" number of goroutines that catches that change)
51+
// if you reuse the payload, you can alter it's content, of course
5352

5453
topic := bus.NewTopic[*Uint32AsyncEvent]()
5554
c := uint32(0)

test_scenarios/fire-and-forget/audit/service.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,23 @@ type ServiceImpl struct {
1414

1515
func NewAuditService(sb *strings.Builder) ServiceImpl {
1616
result := ServiceImpl{sb: sb}
17+
bus.Sub(result.OnUserRegisteredEvent)
18+
bus.SubUnsub(result.OnSMSRequestEvent)
1719
bus.SubUnsub(result.OnSMSSentEvent)
1820
return result
1921
}
2022

23+
// OnUserRegisteredEvent is classic event handler
24+
func (s *ServiceImpl) OnUserRegisteredEvent(event events.UserRegisteredEvent) {
25+
// we can save audit data here
26+
}
27+
28+
// OnSMSRequestEvent is a pub-unsub type, we have to return 'false' to continue listening for this kind of events
29+
func (s *ServiceImpl) OnSMSRequestEvent(event events.SMSRequestEvent) bool {
30+
return false
31+
}
32+
33+
// OnSMSSentEvent is a pub-unsub type where we give up on listening after receiving first message
2134
func (s *ServiceImpl) OnSMSSentEvent(event events.SMSSentEvent) bool {
2235
s.sb.WriteString(fmt.Sprintf("audit event : an sms was %s sent to %s with message %s\n", event.Status, event.Request.Number, event.Request.Message))
2336
return true // after first event, audit will give up listening for events

test_scenarios/request-reply-callback/events/main.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
package events
22

3+
import (
4+
"fmt"
5+
)
6+
37
const RequestEventType = "RequestEvent"
48

59
type RequestEvent[T any] struct {
@@ -8,8 +12,16 @@ type RequestEvent[T any] struct {
812
Done chan struct{}
913
}
1014

15+
func NewRequestEvent[T any](payload T) *RequestEvent[T] {
16+
return &RequestEvent[T]{
17+
Payload: payload,
18+
Done: make(chan struct{}),
19+
}
20+
}
21+
1122
func (i *RequestEvent[T]) EventID() string {
12-
return RequestEventType
23+
var t T
24+
return fmt.Sprintf("%s%T", RequestEventType, t)
1325
}
1426

1527
func (i *RequestEvent[T]) Async() bool {

test_scenarios/request-reply-callback/main_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
func TestRequestReplyCallback(t *testing.T) {
1212
var sb strings.Builder
13+
orders.NewRepository(&sb)
1314
svc := orders.NewService(&sb)
1415

1516
ctx := context.Background()
@@ -53,4 +54,6 @@ func TestRequestReplyCallback(t *testing.T) {
5354
}
5455
t.Logf("order #2 status : %s", stat2.Status)
5556

57+
t.Logf("%s", sb.String())
58+
5659
}

test_scenarios/request-reply-callback/orders/repository.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,10 @@ type RepositoryImpl struct {
2323
calls int
2424
}
2525

26-
func NewRepository(
27-
sb *strings.Builder,
28-
cBus *bus.Topic[*events.RequestEvent[Order]],
29-
sBus *bus.Topic[*events.RequestEvent[OrderStatus]],
30-
) RepositoryImpl {
26+
func NewRepository(sb *strings.Builder) RepositoryImpl {
3127
result := RepositoryImpl{sb: sb}
32-
cBus.Sub(result.onCreateOrder)
33-
sBus.Sub(result.onGetOrderStatus)
28+
bus.Sub(result.onCreateOrder)
29+
bus.Sub(result.onGetOrderStatus)
3430
return result
3531
}
3632

test_scenarios/request-reply-callback/orders/service.go

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,36 +2,34 @@ package orders
22

33
import (
44
"context"
5+
"fmt"
56
"strings"
67

78
"github.com/badu/bus"
89
"github.com/badu/bus/test_scenarios/request-reply-callback/events"
910
)
1011

1112
type ServiceImpl struct {
12-
sb *strings.Builder
13-
CBus *bus.Topic[*events.RequestEvent[Order]]
14-
SBus *bus.Topic[*events.RequestEvent[OrderStatus]]
13+
sb *strings.Builder
1514
}
1615

1716
func NewService(sb *strings.Builder) ServiceImpl {
1817
result := ServiceImpl{sb: sb}
19-
result.CBus = bus.NewTopic[*events.RequestEvent[Order]]()
20-
result.SBus = bus.NewTopic[*events.RequestEvent[OrderStatus]]()
21-
NewRepository(sb, result.CBus, result.SBus)
2218
return result
2319
}
2420

2521
func (s *ServiceImpl) RegisterOrder(ctx context.Context, productIDs []int) (*Order, error) {
26-
event := events.RequestEvent[Order]{Payload: Order{ProductIDs: productIDs}, Done: make(chan struct{})}
27-
s.CBus.Pub(&event)
28-
<-event.Done
29-
return event.Callback()
22+
event := events.NewRequestEvent[Order](Order{ProductIDs: productIDs})
23+
s.sb.WriteString(fmt.Sprintf("dispatching event typed %s\n", event.EventID()))
24+
bus.Pub(event)
25+
<-event.Done // wait for "reply"
26+
return event.Callback() // return the callback, which is containing the actual result
3027
}
3128

3229
func (s *ServiceImpl) GetOrderStatus(ctx context.Context, orderID int) (*OrderStatus, error) {
33-
event := events.RequestEvent[OrderStatus]{Payload: OrderStatus{OrderID: orderID}, Done: make(chan struct{})}
34-
s.SBus.Pub(&event)
35-
<-event.Done
36-
return event.Callback()
30+
event := events.NewRequestEvent[OrderStatus](OrderStatus{OrderID: orderID})
31+
s.sb.WriteString(fmt.Sprintf("dispatching event typed %s\n", event.EventID()))
32+
bus.Pub(event)
33+
<-event.Done // wait for "reply"
34+
return event.Callback() // return the callback, which is containing the actual result
3735
}

test_scenarios/request-reply-with-cancellation/events/main.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,12 @@ func (s *EventState) Close() {
2222
close(s.Done)
2323
}
2424

25+
type NewOrder struct {
26+
ID int
27+
}
28+
2529
type CreateOrderEvent struct {
26-
OrderID int
30+
NewOrder *NewOrder
2731
ProductIDs []int
2832
State *EventState
2933
}

test_scenarios/request-reply-with-cancellation/main_test.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,13 @@ import (
55
"testing"
66
"time"
77

8-
"github.com/badu/bus"
9-
"github.com/badu/bus/test_scenarios/request-reply-with-cancellation/events"
108
"github.com/badu/bus/test_scenarios/request-reply-with-cancellation/orders"
119
)
1210

1311
func TestRequestReplyWithCancellation(t *testing.T) {
1412
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
15-
ebus := bus.NewTopic[events.CreateOrderEvent]()
16-
svc := orders.NewService(ebus)
17-
orders.NewRepository(ebus)
13+
svc := orders.NewService()
14+
orders.NewRepository()
1815

1916
response, err := svc.CreateOrder(ctx, []int{1, 2, 3})
2017
switch err {

test_scenarios/request-reply-with-cancellation/orders/repository.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ type RepositoryImpl struct {
1616
calls int
1717
}
1818

19-
func NewRepository(bus *bus.Topic[events.CreateOrderEvent]) RepositoryImpl {
19+
func NewRepository() RepositoryImpl {
2020
result := RepositoryImpl{}
2121
bus.Sub(result.OnCreateOrder)
2222
return result
@@ -30,7 +30,7 @@ func (r *RepositoryImpl) OnCreateOrder(event events.CreateOrderEvent) {
3030
for {
3131
select {
3232
case <-time.After(4 * time.Second):
33-
event.OrderID = r.calls
33+
event.NewOrder = &events.NewOrder{ID: r.calls}
3434
event.State.Close()
3535
return
3636
case <-event.State.Ctx.Done():

0 commit comments

Comments
 (0)