Skip to content

Commit e87634d

Browse files
author
Tim Middleton
committed
initial subscriber events
1 parent 56693fc commit e87634d

File tree

2 files changed

+111
-1
lines changed

2 files changed

+111
-1
lines changed

coherence/topics.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,12 @@ type Publisher[V any] interface {
8787
// Close closes a publisher and releases all resources associated with it in the client
8888
// and on the server.
8989
Close(ctx context.Context) error
90+
91+
// AddLifecycleListener adds a [PublisherLifecycleListener] to this topic.
92+
AddLifecycleListener(listener PublisherLifecycleListener[V]) error
93+
94+
// RemoveLifecycleListener removes a [PublisherLifecycleListener] from this topic.
95+
RemoveLifecycleListener(listener PublisherLifecycleListener[V]) error
9096
}
9197

9298
// Subscriber subscribes directly to a [NamedTopic], or to a subscriber group of a [NamedTopic].
@@ -167,7 +173,7 @@ type topicPublisher[V any] struct {
167173
channelCount int32
168174
options *publisher.Options
169175
valueSerializer Serializer[V]
170-
mutex *sync.RWMutex
176+
mutex sync.RWMutex
171177
lifecycleListenersV1 []*PublisherLifecycleListener[V]
172178
}
173179

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Copyright (c) 2025 Oracle and/or its affiliates.
3+
* Licensed under the Universal Permissive License v 1.0 as shown at
4+
* https://oss.oracle.com/licenses/upl.
5+
*/
6+
7+
package topics
8+
9+
import (
10+
"context"
11+
"github.com/onsi/gomega"
12+
"github.com/oracle/coherence-go-client/v2/coherence"
13+
"github.com/oracle/coherence-go-client/v2/test/utils"
14+
"sync/atomic"
15+
"testing"
16+
"time"
17+
)
18+
19+
func TestPublisherEventsDestroyOnServer(t *testing.T) {
20+
var (
21+
g = gomega.NewWithT(t)
22+
err error
23+
)
24+
25+
const topicName = "publisher-events"
26+
27+
t.Setenv("COHERENCE_LOG_LEVEL", "ALL")
28+
29+
session1, topic1 := getSessionAndTopic[string](g, topicName)
30+
defer session1.Close()
31+
32+
pub, err := topic1.CreatePublisher(context.Background())
33+
g.Expect(err).Should(gomega.Not(gomega.HaveOccurred()))
34+
35+
utils.Sleep(5)
36+
37+
listener := NewCountingPublisherListener[string](topicName)
38+
39+
g.Expect(pub.AddLifecycleListener(listener.listener)).ShouldNot(gomega.HaveOccurred())
40+
41+
_, err = utils.IssueGetRequest(utils.GetTestContext().RestURL + "/destroyTopic/" + topicName)
42+
g.Expect(err).Should(gomega.Not(gomega.HaveOccurred()))
43+
44+
utils.Sleep(5)
45+
46+
g.Eventually(func() int32 { return listener.destroyCount }).
47+
WithTimeout(10 * time.Second).Should(gomega.Equal(int32(1)))
48+
49+
g.Expect(topic1.Close(context.Background())).Should(gomega.Equal(coherence.ErrTopicDestroyedOrReleased))
50+
}
51+
52+
func TestPublisherEventsDestroyByClient(t *testing.T) {
53+
var (
54+
g = gomega.NewWithT(t)
55+
ctx = context.Background()
56+
)
57+
58+
const topicName = "publisher-events-client-destroy"
59+
60+
t.Setenv("COHERENCE_LOG_LEVEL", "ALL")
61+
62+
session1, topic1 := getSessionAndTopic[string](g, topicName)
63+
defer session1.Close()
64+
65+
pub, err := topic1.CreatePublisher(context.Background())
66+
g.Expect(err).Should(gomega.Not(gomega.HaveOccurred()))
67+
68+
utils.Sleep(5)
69+
70+
listener := NewCountingPublisherListener[string](topicName)
71+
72+
g.Expect(pub.AddLifecycleListener(listener.listener)).ShouldNot(gomega.HaveOccurred())
73+
74+
g.Expect(pub.Close(ctx)).ShouldNot(gomega.HaveOccurred())
75+
utils.Sleep(1)
76+
77+
g.Eventually(func() int32 { return listener.releaseCount }).
78+
WithTimeout(10 * time.Second).Should(gomega.Equal(int32(1)))
79+
}
80+
81+
func NewCountingPublisherListener[V any](name string) *CountingPublisherListener[V] {
82+
countingListener := CountingPublisherListener[V]{
83+
name: name,
84+
listener: coherence.NewPublisherLifecycleListener[V](),
85+
}
86+
87+
countingListener.listener.OnDestroyed(func(_ coherence.PublisherLifecycleEvent[V]) {
88+
atomic.AddInt32(&countingListener.destroyCount, 1)
89+
}).OnReleased(func(_ coherence.PublisherLifecycleEvent[V]) {
90+
atomic.AddInt32(&countingListener.releaseCount, 1)
91+
}).OnAny(func(_ coherence.PublisherLifecycleEvent[V]) {
92+
atomic.AddInt32(&countingListener.allCount, 1)
93+
})
94+
95+
return &countingListener
96+
}
97+
98+
type CountingPublisherListener[V any] struct {
99+
listener coherence.PublisherLifecycleListener[V]
100+
name string
101+
releaseCount int32
102+
destroyCount int32
103+
allCount int32
104+
}

0 commit comments

Comments
 (0)