Skip to content

Commit 4d2a9c3

Browse files
committed
Release 1.0.2
1 parent d62fd6d commit 4d2a9c3

File tree

4 files changed

+76
-17
lines changed

4 files changed

+76
-17
lines changed

README.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ or
2424

2525
By default, the bus is using sync events : waits for listeners to complete their jobs before calling the next listener.
2626

27+
Or, even easier, just call `PubAsync` methods.
28+
2729
Usage : `go get github.com/badu/bus`
2830

2931
## What Problem Does It Solve?
@@ -53,9 +55,9 @@ Inside the `test_scenarios` folder, you can find the following scenarios:
5355
SMS) and `audit`. When a user registers, we want to send welcoming messages via SMS and email, but we also want to
5456
audit that registration for reporting purposes.
5557

56-
The [UserRegisteredEvent](https://github.com/badu/bus/blob/main/test_scenarios/fire-and-forget/events/main.go#L10)
58+
The [UserRegisteredEvent](https://github.com/badu/bus/blob/main/test_scenarios/fire-and-forget/events/main.go#L3)
5759
will carry the freshly registered username (which is also the email) and phone to the email and sms services. The
58-
event is [triggered](https://github.com/badu/bus/blob/main/test_scenarios/fire-and-forget/users/service.go#L21) by
60+
event is [triggered](https://github.com/badu/bus/blob/main/test_scenarios/fire-and-forget/users/service.go#L23) by
5961
the user service, which performs the creation of the user account. We're using the `fire and forget` technique here,
6062
because the operation of registration should not depend on the fact that we've been able to
6163
send an welcoming email or a sms, or the audit system malfunctions.

bench_test.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,18 +11,10 @@ type Uint32SyncEvent struct {
1111
u uint32
1212
}
1313

14-
func (u Uint32SyncEvent) EventID() string {
15-
return "Uint32SyncEvent"
16-
}
17-
1814
type Uint32AsyncEvent struct {
1915
u uint32
2016
}
2117

22-
func (u Uint32AsyncEvent) EventID() string {
23-
return "Uint32AsyncEvent"
24-
}
25-
2618
func BenchmarkBroadcast_0008Sync(b *testing.B) {
2719
topic := bus.NewTopic[Uint32SyncEvent]()
2820
c := uint32(0)

main.go

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,11 @@ type iEventName interface {
1313
EventID() string //
1414
}
1515

16+
// if developers implement this interface, we're spinning a goroutine if the event says it is async
17+
type iAsync interface {
18+
Async() bool
19+
}
20+
1621
// Listener is being returned when you subscribe to a topic, so you can unsubscribe or access the parent topic
1722
type Listener[T any] struct {
1823
parent *Topic[T] // so we can call unsubscribe from parent
@@ -90,17 +95,32 @@ func (s *Listener[T]) Topic() *Topic[T] {
9095
// Pub allows you to publish an event in that topic
9196
func (b *Topic[T]) Pub(event T) {
9297
b.rwMu.RLock()
93-
for topic := range b.subs {
94-
b.subs[topic].callback(event)
98+
99+
isAsync := false
100+
switch m := any(event).(type) {
101+
case iAsync:
102+
isAsync = m.Async()
95103
}
104+
105+
for sub := range b.subs {
106+
if isAsync {
107+
go b.subs[sub].callback(event)
108+
continue
109+
}
110+
111+
b.subs[sub].callback(event)
112+
}
113+
96114
b.rwMu.RUnlock()
97115
}
98116

99117
func (b *Topic[T]) PubAsync(event T) {
100118
b.rwMu.RLock()
101-
for topic := range b.subs {
102-
go b.subs[topic].callback(event)
119+
120+
for sub := range b.subs {
121+
go b.subs[sub].callback(event)
103122
}
123+
104124
b.rwMu.RUnlock()
105125
}
106126

@@ -121,7 +141,7 @@ func (o *Bus[T]) Unsub() {
121141
func SubUnsub[T any](callback func(event T) bool) *Bus[T] {
122142
var (
123143
event T
124-
key string
144+
key string
125145
)
126146

127147
switch m := any(event).(type) {
@@ -157,7 +177,7 @@ func SubUnsub[T any](callback func(event T) bool) *Bus[T] {
157177
func Sub[T any](callback func(event T)) *Bus[T] {
158178
var (
159179
event T
160-
key string
180+
key string
161181
)
162182

163183
switch m := any(event).(type) {
@@ -206,6 +226,7 @@ func Pub[T any](event T) {
206226
// PubAsync publishes an event which will be dispatched to all listeners
207227
func PubAsync[T any](event T) {
208228
var key string
229+
209230
switch m := any(event).(type) {
210231
case iEventName:
211232
key = m.EventID()

main_test.go

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package bus_test
22

33
import (
4+
"sync"
45
"sync/atomic"
56
"testing"
67

@@ -24,7 +25,7 @@ func TestSubTopicWhilePub(t *testing.T) {
2425
go func() {
2526
<-start
2627
topic.PubAsync(&Uint32AsyncEvent{u: 1})
27-
close(finishPubWait)
28+
defer close(finishPubWait)
2829
}()
2930

3031
newSubCalled := false
@@ -42,6 +43,7 @@ func TestSubTopicWhilePub(t *testing.T) {
4243
<-finishPubWait // wait for pub to finish
4344

4445
<-finishSubWait // wait for sub to finish
46+
4547
if newSubCalled {
4648
t.Fatal("new subscriber should not be called")
4749
}
@@ -78,3 +80,45 @@ func TestReusePayloadPointerAsync(t *testing.T) {
7880

7981
t.Logf("altered payload %d for %d listeners", payload.u, c)
8082
}
83+
84+
func TestAsyncBus(t *testing.T) {
85+
c := uint32(0)
86+
87+
var wg sync.WaitGroup
88+
wg.Add(4096)
89+
bus.Sub(
90+
func(event Uint32AsyncEvent) {
91+
atomic.AddUint32(&c, 1)
92+
wg.Done()
93+
},
94+
)
95+
96+
go func() {
97+
for i := 0; i < 1024; i++ {
98+
bus.PubAsync(Uint32AsyncEvent{})
99+
}
100+
}()
101+
go func() {
102+
for i := 0; i < 1024; i++ {
103+
bus.PubAsync(Uint32AsyncEvent{})
104+
}
105+
}()
106+
go func() {
107+
for i := 0; i < 1024; i++ {
108+
bus.PubAsync(Uint32AsyncEvent{})
109+
}
110+
}()
111+
go func() {
112+
for i := 0; i < 1024; i++ {
113+
bus.PubAsync(Uint32AsyncEvent{})
114+
}
115+
}()
116+
117+
wg.Wait()
118+
119+
if c != 4096 {
120+
t.Fatalf("error : counter should be 4096 but is %d", c)
121+
}
122+
123+
t.Logf("%d", c)
124+
}

0 commit comments

Comments
 (0)