Skip to content

Commit c6b9175

Browse files
committed
add topic filter to handle incoming messages for specific topic
1 parent f6f5e59 commit c6b9175

File tree

7 files changed

+16
-22
lines changed

7 files changed

+16
-22
lines changed

README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ Make use of the client by importing it in your Go client source code. For exampl
1616

1717
import "github.com/unit-io/unitdb-go"
1818

19-
Samples are available in the cmd directory for reference. To build unitdb server from latest source code use "replace" in go.mod to point to your local module.
19+
Samples are available in the examples directory for reference. To build unitdb server from latest source code use "replace" in go.mod to point to your local module.
2020

2121
```golang
2222
go mod edit -replace github.com/unit-io/unitdb=$GOPATH/src/github.com/unit-io/unitdb

client.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -327,7 +327,7 @@ func (c *client) TopicFilter(subscriptionTopic string) (*TopicFilter, error) {
327327
validateTopicParts); err != nil {
328328
return nil, err
329329
}
330-
t := &TopicFilter{subscriptionTopic: topic, updates: make(chan []PubMessage)}
330+
t := &TopicFilter{subscriptionTopic: topic, updates: make(chan []*PubMessage)}
331331
c.notifier.addFilter(t.filter)
332332

333333
return t, nil

go.mod

+2-2
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ go 1.16
44

55
require (
66
github.com/golang/protobuf v1.5.2
7-
github.com/unit-io/unitdb v0.1.1
7+
github.com/unit-io/unitdb v0.2.0
88
google.golang.org/grpc v1.39.0
99
)
1010

11-
// replace github.com/unit-io/unitdb => /src/github.com/unit-io/unitdb
11+
// replace github.com/unit-io/unitdb => github.com/unit-io/unitdb

go.sum

+2-5
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymF
1414
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
1515
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
1616
github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
17-
github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
1817
github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ=
1918
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
2019
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
@@ -55,9 +54,8 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
5554
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
5655
github.com/unit-io/bpool v0.0.0-20200906005724-1643bbf59264 h1:31MK/k8NNVPI3UEUCJ8+9aEzlFxJRhawKk2gqxSbJeA=
5756
github.com/unit-io/bpool v0.0.0-20200906005724-1643bbf59264/go.mod h1:jLqAtkF257MDiAc5K8svPVUGjfig2qdIhnWs3OCDwKg=
58-
github.com/unit-io/unitdb v0.1.1 h1:KzTFWpsDfyo2qV1AYpoHmoI2FGMJY74KncvZM8CAgHU=
59-
github.com/unit-io/unitdb v0.1.1/go.mod h1:B8CPnLiFt1OcRxWIj+69kNxMRuSoS7o3muBzgqLZbkk=
60-
github.com/unit-io/unitdb-go v0.0.0-20210407101657-d9db9270f78d/go.mod h1:HU0IfRu0nXM2O9vMCRhDhvBUBc604AMmCDwWEG5C7zk=
57+
github.com/unit-io/unitdb v0.2.0 h1:EWjq1JfAtrLZU83ka+wpS+aam39GN47bc1oxlvKETRI=
58+
github.com/unit-io/unitdb v0.2.0/go.mod h1:E2yJHO6xjTwqRmOwKO0BcbKpTj6oHM5xldD3nL4NP5Q=
6159
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
6260
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
6361
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
@@ -126,7 +124,6 @@ google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQ
126124
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
127125
google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0=
128126
google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
129-
google.golang.org/grpc v1.37.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM=
130127
google.golang.org/grpc v1.39.0 h1:Klz8I9kdtkIN6EpHHUOMLCYhTn/2WAe5a0s1hcBkdTI=
131128
google.golang.org/grpc v1.39.0/go.mod h1:PImNr+rS9TWYb2O4/emRugxiyHZ5JyHW5F+RPnDzfrE=
132129
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=

message.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import (
1313
type Message interface {
1414
DeliveryMode() uint8
1515
MessageID() uint16
16-
Messages() []PubMessage
16+
Messages() []*PubMessage
1717
Ack()
1818
}
1919

@@ -25,7 +25,7 @@ type PubMessage struct {
2525
type message struct {
2626
deliveryMode uint8
2727
messageID uint16
28-
messages []PubMessage
28+
messages []*PubMessage
2929
once sync.Once
3030
ack func()
3131
}
@@ -38,7 +38,7 @@ func (m *message) MessageID() uint16 {
3838
return m.messageID
3939
}
4040

41-
func (m *message) Messages() []PubMessage {
41+
func (m *message) Messages() []*PubMessage {
4242
return m.messages
4343
}
4444

@@ -47,9 +47,9 @@ func (m *message) Ack() {
4747
}
4848

4949
func messageFromPublish(p *utp.Publish, ack func()) *message {
50-
var messages []PubMessage
50+
var messages []*PubMessage
5151
for _, pubMsg := range p.Messages {
52-
msg := PubMessage{
52+
msg := &PubMessage{
5353
Topic: pubMsg.Topic,
5454
Payload: pubMsg.Payload,
5555
}

notifier.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ const (
3030
type filter func(notice *Notice) error
3131

3232
type Notice struct {
33-
messages []PubMessage
33+
messages []*PubMessage
3434
}
3535

3636
// func (n *Notice) Messages() []PubMessage {
@@ -85,7 +85,7 @@ func (n *notifier) hasObservers() bool {
8585
return len(n.filters) > 0
8686
}
8787

88-
func (n *notifier) notify(messages []PubMessage) {
88+
func (n *notifier) notify(messages []*PubMessage) {
8989
if !n.hasObservers() {
9090
return
9191
}

topic.go

+3-6
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,11 @@ type (
5252
}
5353
TopicFilter struct {
5454
subscriptionTopic *topic
55-
updates chan []PubMessage
55+
updates chan []*PubMessage
5656
}
5757
)
5858

59-
func (t *TopicFilter) Updates() <-chan []PubMessage {
59+
func (t *TopicFilter) Updates() <-chan []*PubMessage {
6060
return t.updates
6161
}
6262

@@ -226,8 +226,7 @@ func validateTopicParts(t *topic) error {
226226
}
227227

228228
func (t *TopicFilter) filter(notice *Notice) error {
229-
fmt.Println("filter: message count", len(notice.messages))
230-
messages := make([]PubMessage, 0)
229+
messages := make([]*PubMessage, 0)
231230

232231
for _, pubMsg := range notice.messages {
233232
pubTopic := new(topic)
@@ -246,13 +245,11 @@ func (t *TopicFilter) filter(notice *Notice) error {
246245
}
247246

248247
if t.subscriptionTopic.matches(pubTopic) {
249-
fmt.Println("matches: subTopic, pubTopic ", t.subscriptionTopic.topic, pubTopic.topic)
250248
messages = append(messages, notice.messages...)
251249
}
252250
}
253251

254252
if len(messages) > 0 {
255-
fmt.Println("filtered: message count", len(messages))
256253
t.updates <- messages
257254
}
258255

0 commit comments

Comments
 (0)