Skip to content

Commit

Permalink
added subscriptionID as prefix for setup multiple consumers for same …
Browse files Browse the repository at this point in the history
…topic
  • Loading branch information
adrianriobo committed Jun 6, 2022
1 parent 14daf00 commit 645df09
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 9 deletions.
4 changes: 2 additions & 2 deletions pkg/manager/flows/inputs/umb/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ import (
"github.com/adrianriobo/qe-eventmanager/pkg/configuration/flows"
"github.com/adrianriobo/qe-eventmanager/pkg/manager/flows/actions"
"github.com/adrianriobo/qe-eventmanager/pkg/manager/flows/inputs/ack"
inputACK "github.com/adrianriobo/qe-eventmanager/pkg/manager/flows/inputs/ack"
"github.com/adrianriobo/qe-eventmanager/pkg/services/messaging/umb"
"github.com/adrianriobo/qe-eventmanager/pkg/services/messaging/umb/api"
"github.com/adrianriobo/qe-eventmanager/pkg/util/json"
"github.com/adrianriobo/qe-eventmanager/pkg/util/logging"
)

func Add(flowName string, input flows.UMBInput, ack inputACK.ACK, action actions.Runnable) error {
func Add(flowName string, input flows.UMBInput, ack ack.ACK, action actions.Runnable) error {
if err := umb.Subscribe(
flowName,
input.Topic,
[]api.MessageHandler{new(flowName, input.Filters, ack, action)}); err != nil {
return err
Expand Down
15 changes: 8 additions & 7 deletions pkg/services/messaging/umb/umb.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,11 @@ func SendBytes(destination string, message []byte) error {
return _umb.client.Send(destination, message)
}

func Subscribe(topic string, handlers []api.MessageHandler) error {
func Subscribe(subscriptionID, topic string, handlers []api.MessageHandler) error {
_umb.subscribe.Lock()
defer _umb.subscribe.Unlock()
logging.Infof("Adding a subscription to %s", topic)
internalSubscription, err := _umb.client.Subscribe(umbTopic(topic), handlers)
logging.Infof("Adding a subscription %s on topic %s", subscriptionID, topic)
internalSubscription, err := _umb.client.Subscribe(umbTopic(subscriptionID, topic), handlers)
if err != nil {
return err
}
Expand Down Expand Up @@ -137,10 +137,11 @@ func handle(msg []byte, handler api.MessageHandler) {
// Umb uses identified consumer queues acting as (virtual) topics
// for subscriptions, the full queue name is based on the pattern:
// "Consumer.$SERVICE_ACCOUNT_NAME.$SUBSCRIPTION_ID.VirtualTopic.>"
func umbTopic(topic string) string {
subscriptionId := strings.Split(topic, ".")
return fmt.Sprintf("Consumer.%s.%s.%s",
func umbTopic(subscriptionID, topic string) string {
topicCrumbs := strings.Split(topic, ".")
return fmt.Sprintf("Consumer.%s.%s-%s.%s",
_umb.consumerID,
subscriptionId[len(subscriptionId)-1],
subscriptionID,
topicCrumbs[len(topicCrumbs)-1],
topic)
}

0 comments on commit 645df09

Please sign in to comment.