Skip to content

Commit ecf5448

Browse files
committed
Interface implementations not required anymore
1 parent cbaf5ac commit ecf5448

File tree

10 files changed

+85
-109
lines changed

10 files changed

+85
-109
lines changed

README.md

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,21 +10,19 @@ and the handler is having the following signature:
1010

1111
`func OnMyEventOccurred(event InterestingEvent)`
1212

13-
where `InterestingEvent` has to be a struct which has to implement two methods:
14-
15-
`type InterestingEvent struct{}`
13+
The event producer will simply do:
1614

17-
`func (e InterestingEvent) EventID() string{ return "MyUniqueName" }`
15+
`bus.Pub(InterestingEvent{})`
1816

19-
where the string which represents the name of the event has to be unique across the event system.
17+
Optional, to allow the bus to spin a goroutine for dispatching events, implement the following interface:
2018

2119
`func (e InterestingEvent) Async() bool{ return true }`
2220

23-
where we signal that the event will be passed to the listeners by spinning up a goroutine.
21+
or
2422

25-
The event producer will simply do:
23+
`func (e *InterestingEvent) Async() bool{ return true }`
2624

27-
`bus.Pub(InterestingEvent{})`
25+
By default, the bus is using sync events : waits for listeners to complete their jobs before calling the next listener.
2826

2927
Usage : `go get github.com/badu/bus`
3028

@@ -100,10 +98,6 @@ Inside the `test_scenarios` folder, you can find the following scenarios:
10098

10199
I am sure that you will find this technique interesting and having a large number of applications.
102100

103-
An important note is about not forgetting to implement the `EventID() string` correctly, as incorrect naming triggers
104-
panic (expecting one type of event, but receiving another). To exemplify this, just alter the return of
105-
this [function](https://github.com/badu/bus/blob/main/test_scenarios/request-reply-callback/events/main.go#L24).
106-
107101
4. Request Reply with Cancellation
108102

109103
Last but, not least, this is an example about providing `context.Context` along the publisher subscriber chain.
@@ -120,7 +114,9 @@ Inside the `test_scenarios` folder, you can find the following scenarios:
120114
because changing properties that represents the `reply` would not be reflected. Also, when using `sync.WaitGroup`
121115
inside your event struct, always use method receivers and pass the event as pointer, otherwise you will be passing a
122116
lock by value (which is `sync.Locker`).
123-
2. be careful if you don't want to use pointers for events, but you still need to pass values from the listener to the
117+
3. be careful if you don't want to use pointers for events, but you still need to pass values from the listener to the
124118
dispatcher. You should still have at least one property of that event that is a pointer (see events
125119
in `request reply with cancellation` for example). Same technique can be applied when you need `sync.Waitgroup` to be
126120
passed around with an event that is being sent by value, not by pointer.
121+
4. you can override the event name (which is by default, built using `fmt.Sprintf("%T", yourEvent)`) you need to
122+
implement `EventID() string` interface.

bench_test.go

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,22 +11,10 @@ type Uint32SyncEvent struct {
1111
u uint32
1212
}
1313

14-
func (u Uint32SyncEvent) EventID() string {
15-
return "Uint32SyncEvent"
16-
}
17-
18-
func (u Uint32SyncEvent) Async() bool {
19-
return false
20-
}
21-
2214
type Uint32AsyncEvent struct {
2315
u uint32
2416
}
2517

26-
func (u Uint32AsyncEvent) EventID() string {
27-
return "Uint32AsyncEvent"
28-
}
29-
3018
func (u Uint32AsyncEvent) Async() bool {
3119
return true
3220
}

main.go

Lines changed: 60 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,38 @@
11
package bus
22

33
import (
4+
"fmt"
45
"sync"
56
"sync/atomic"
67
)
78

8-
var mapper sync.Map // holds key (event id, typed string) versus topic values
9+
var mapper sync.Map // holds key (event name - string) versus topic values
910

10-
// internal interface that all the events must implement
11-
type iEvent interface {
11+
// we allow developers to override event names. They should be careful about name collisions
12+
type iEventName interface {
1213
EventID() string //
13-
Async() bool // if returns true, this event will be triggered by spinning a goroutine
14+
}
15+
16+
// if developers implement this interface, we're spinning a goroutine if the event says it is async
17+
type iAsync interface {
18+
Async() bool
1419
}
1520

1621
// Listener is being returned when you subscribe to a topic, so you can unsubscribe or access the parent topic
17-
type Listener[T iEvent] struct {
22+
type Listener[T any] struct {
1823
parent *Topic[T] // so we can call unsubscribe from parent
1924
callback func(event T) // the function that we're going to call
2025
}
2126

2227
// Topic keeps the subscribers of one topic
23-
type Topic[T iEvent] struct {
28+
type Topic[T any] struct {
2429
subs []*Listener[T] // list of listeners
2530
rwMu sync.RWMutex // guards subs
2631
lisnsPool sync.Pool // a pool of listeners
2732
}
2833

2934
// NewTopic creates a new topic for a specie of events
30-
func NewTopic[T iEvent]() *Topic[T] {
35+
func NewTopic[T any]() *Topic[T] {
3136
result := &Topic[T]{}
3237
result.lisnsPool.New = func() any {
3338
return &Listener[T]{
@@ -90,19 +95,26 @@ func (s *Listener[T]) Topic() *Topic[T] {
9095
// Pub allows you to publish an event in that topic
9196
func (b *Topic[T]) Pub(event T) {
9297
b.rwMu.RLock()
93-
for topic := range b.subs {
94-
if event.Async() {
95-
go b.subs[topic].callback(event)
98+
99+
isAsync := false
100+
switch m := any(event).(type) {
101+
case iAsync:
102+
isAsync = m.Async()
103+
}
104+
105+
for sub := range b.subs {
106+
if isAsync {
107+
go b.subs[sub].callback(event)
96108
continue
97109
}
98110

99-
b.subs[topic].callback(event)
111+
b.subs[sub].callback(event)
100112
}
101113
b.rwMu.RUnlock()
102114
}
103115

104116
// Bus is being returned when you subscribe, so you can manually Unsub
105-
type Bus[T iEvent] struct {
117+
type Bus[T any] struct {
106118
listener *Listener[T]
107119
stop atomic.Uint32 // flag for unsubscribing after receiving one event
108120
}
@@ -115,11 +127,20 @@ func (o *Bus[T]) Unsub() {
115127
}
116128

117129
// SubUnsub can be used if you need to unsubscribe immediately after receiving an event, by making your function return true
118-
func SubUnsub[T iEvent](callback func(event T) bool) *Bus[T] {
130+
func SubUnsub[T any](callback func(event T) bool) *Bus[T] {
119131
var event T
120-
topic, ok := mapper.Load(event.EventID())
132+
133+
key := ""
134+
switch m := any(event).(type) {
135+
case iEventName:
136+
key = m.EventID()
137+
default:
138+
key = fmt.Sprintf("%T", event)
139+
}
140+
141+
topic, ok := mapper.Load(key)
121142
if !ok || topic == nil {
122-
topic, _ = mapper.LoadOrStore(event.EventID(), NewTopic[T]())
143+
topic, _ = mapper.LoadOrStore(key, NewTopic[T]())
123144
}
124145

125146
var result Bus[T]
@@ -140,11 +161,20 @@ func SubUnsub[T iEvent](callback func(event T) bool) *Bus[T] {
140161
}
141162

142163
// Sub subscribes a callback function to listen for a specie of events
143-
func Sub[T iEvent](callback func(event T)) *Bus[T] {
164+
func Sub[T any](callback func(event T)) *Bus[T] {
144165
var event T
145-
topic, ok := mapper.Load(event.EventID())
166+
167+
key := ""
168+
switch m := any(event).(type) {
169+
case iEventName:
170+
key = m.EventID()
171+
default:
172+
key = fmt.Sprintf("%T", event)
173+
}
174+
175+
topic, ok := mapper.Load(key)
146176
if !ok || topic == nil {
147-
topic, _ = mapper.LoadOrStore(event.EventID(), NewTopic[T]())
177+
topic, _ = mapper.LoadOrStore(key, NewTopic[T]())
148178
}
149179

150180
var result Bus[T]
@@ -160,10 +190,19 @@ func Sub[T iEvent](callback func(event T)) *Bus[T] {
160190
}
161191

162192
// Pub publishes an event which will be dispatched to all listeners
163-
func Pub[T iEvent](event T) {
164-
topic, ok := mapper.Load(event.EventID())
193+
func Pub[T any](event T) {
194+
key := ""
195+
switch m := any(event).(type) {
196+
case iEventName:
197+
key = m.EventID()
198+
default:
199+
key = fmt.Sprintf("%T", event)
200+
}
201+
202+
topic, ok := mapper.Load(key)
165203
if !ok || topic == nil { // create new topic, even if there are no listeners (otherwise we will have to panic)
166-
topic, _ = mapper.LoadOrStore(event.EventID(), NewTopic[T]())
204+
topic, _ = mapper.LoadOrStore(key, NewTopic[T]())
167205
}
206+
168207
topic.(*Topic[T]).Pub(event)
169208
}

test_scenarios/factory-request-reply/events/main.go

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,6 @@ import (
77
"github.com/badu/bus/test_scenarios/factory-request-reply/prices"
88
)
99

10-
const (
11-
InventoryGRPCClientRequestEventType = "InventoryGRPCClientRequestEvent"
12-
PricesGRPCClientRequestEventType = "PricesGRPCClientRequestEvent"
13-
)
14-
1510
type InventoryGRPCClientRequestEvent struct {
1611
wg sync.WaitGroup
1712
Conn Closer // should be *grpc.ClientConn, but we're avoiding the import
@@ -24,10 +19,6 @@ func NewInventoryGRPCClientRequestEvent() *InventoryGRPCClientRequestEvent {
2419
return &result
2520
}
2621

27-
func (i *InventoryGRPCClientRequestEvent) EventID() string {
28-
return InventoryGRPCClientRequestEventType
29-
}
30-
3122
func (i *InventoryGRPCClientRequestEvent) Async() bool {
3223
return true // this one is async
3324
}
@@ -52,14 +43,6 @@ func NewPricesGRPCClientRequestEvent() *PricesGRPCClientRequestEvent {
5243
return &result
5344
}
5445

55-
func (p *PricesGRPCClientRequestEvent) EventID() string {
56-
return PricesGRPCClientRequestEventType
57-
}
58-
59-
func (p *PricesGRPCClientRequestEvent) Async() bool {
60-
return false // this one is sync
61-
}
62-
6346
func (p *PricesGRPCClientRequestEvent) WaitReply() {
6447
p.wg.Wait()
6548
}
Lines changed: 2 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,10 @@
11
package events
22

3-
const (
4-
UserRegisteredEventType string = "UserRegisteredEvent"
5-
SMSRequestEventType string = "SMSRequestEvent"
6-
SMSSentEventType string = "SmsSentEvent"
7-
DummyEventType string = "DummyEvent"
8-
)
9-
103
type UserRegisteredEvent struct {
114
UserName string
125
Phone string
136
}
147

15-
func (e UserRegisteredEvent) EventID() string {
16-
return UserRegisteredEventType
17-
}
18-
198
func (e UserRegisteredEvent) Async() bool {
209
return true
2110
}
@@ -25,10 +14,6 @@ type SMSRequestEvent struct {
2514
Message string
2615
}
2716

28-
func (e SMSRequestEvent) EventID() string {
29-
return SMSRequestEventType
30-
}
31-
3217
func (e SMSRequestEvent) Async() bool {
3318
return true
3419
}
@@ -38,21 +23,14 @@ type SMSSentEvent struct {
3823
Status string
3924
}
4025

41-
func (e SMSSentEvent) EventID() string {
42-
return SMSSentEventType
43-
}
44-
4526
func (e SMSSentEvent) Async() bool {
4627
return true
4728
}
4829

4930
type DummyEvent struct {
50-
}
51-
52-
func (e *DummyEvent) EventID() string {
53-
return DummyEventType
31+
AlteredAsync bool
5432
}
5533

5634
func (e *DummyEvent) Async() bool {
57-
return true
35+
return e.AlteredAsync
5836
}

test_scenarios/fire-and-forget/main_test.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,22 @@ package fire_and_forget
22

33
import (
44
"context"
5+
"fmt"
56
"strings"
67
"testing"
78
"time"
89

10+
"github.com/badu/bus"
911
"github.com/badu/bus/test_scenarios/fire-and-forget/audit"
12+
"github.com/badu/bus/test_scenarios/fire-and-forget/events"
1013
"github.com/badu/bus/test_scenarios/fire-and-forget/notifications"
1114
"github.com/badu/bus/test_scenarios/fire-and-forget/users"
1215
)
1316

17+
func OnDummyEvent(event *events.DummyEvent) {
18+
fmt.Println("dummy event async ?", event.Async())
19+
}
20+
1421
func TestUserRegistration(t *testing.T) {
1522
var sb strings.Builder
1623

@@ -19,6 +26,8 @@ func TestUserRegistration(t *testing.T) {
1926
notifications.NewEmailService(&sb)
2027
audit.NewAuditService(&sb)
2128

29+
bus.Sub(OnDummyEvent)
30+
2231
userSvc.RegisterUser(context.Background(), "Badu", "+40742222222")
2332

2433
<-time.After(500 * time.Millisecond)

test_scenarios/fire-and-forget/users/service.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
type ServiceImpl struct {
1212
sb *strings.Builder
13+
c int
1314
}
1415

1516
func NewService(sb *strings.Builder) ServiceImpl {
@@ -18,6 +19,7 @@ func NewService(sb *strings.Builder) ServiceImpl {
1819
}
1920

2021
func (s *ServiceImpl) RegisterUser(ctx context.Context, name, phone string) {
22+
s.c++
2123
bus.Pub(events.UserRegisteredEvent{UserName: name, Phone: phone})
22-
bus.Pub(&events.DummyEvent{}) // nobody listens on this one
24+
bus.Pub(&events.DummyEvent{AlteredAsync: s.c%2 == 0}) // nobody listens on this one
2325
}
Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,5 @@
11
package events
22

3-
import (
4-
"fmt"
5-
)
6-
7-
const RequestEventType = "RequestEvent"
8-
93
type RequestEvent[T any] struct {
104
Payload T
115
Callback func() (*T, error)
@@ -19,11 +13,6 @@ func NewRequestEvent[T any](payload T) *RequestEvent[T] {
1913
}
2014
}
2115

22-
func (i *RequestEvent[T]) EventID() string {
23-
var t T
24-
return fmt.Sprintf("%s%T", RequestEventType, t)
25-
}
26-
2716
func (i *RequestEvent[T]) Async() bool {
2817
return true // this one is async
2918
}

0 commit comments

Comments
 (0)