[feature] Add NATS async consumer #882

Closed
@matsuev

Description

Many projects use NATS as a distributed message bus between microservices.

A NATS async consumer would allow services to interact with Centrifugo efficiently.

You can try the fully working NATS consumer code here:

https://github.com/matsuev/centrifugo/tree/nats-consumer

It requires NATS server version 2.10.0 or later and uses JetStream for guaranteed message delivery.

The WorkQueue policy distributes message processing across multiple Centrifugo instances (in a cluster architecture).

Automatic reconnection to the NATS server lets you start Centrifugo before the NATS server is available, and restores the connection after NATS or Centrifugo restarts.

Current limitations:

  • TLS connections are not implemented yet (planned feature)
  • Distributed delivery (in a Centrifugo cluster) does not guarantee the order in which messages are processed
  • The maximum message size is limited by NATS server settings (see Configuring NATS Server)
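Because of the message size limit, a producer may want to check the encoded command before publishing it. The sketch below assumes the NATS server default `max_payload` of 1 MiB. `encodeWithinLimit` and `defaultMaxPayload` are hypothetical names for illustration. With a live connection, nats.go reports the server's actual limit via `nc.MaxPayload()`.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// defaultMaxPayload is the NATS server default for max_payload (1 MiB).
// Assumption: the server in use has not changed this setting.
const defaultMaxPayload = 1 << 20

// encodeWithinLimit marshals cmd to JSON and rejects it if the encoded
// size exceeds maxPayload bytes.
func encodeWithinLimit(cmd any, maxPayload int) ([]byte, error) {
	data, err := json.Marshal(cmd)
	if err != nil {
		return nil, fmt.Errorf("encode command: %w", err)
	}
	if len(data) > maxPayload {
		return nil, fmt.Errorf("command is %d bytes, limit is %d", len(data), maxPayload)
	}
	return data, nil
}

func main() {
	cmd := map[string]any{
		"method":  "publish",
		"payload": map[string]any{"channel": "news", "data": map[string]any{"message": "test message"}},
	}
	data, err := encodeWithinLimit(cmd, defaultMaxPayload)
	fmt.Println(len(data), err)
}
```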

Example of usage:

Define a NATS consumer in the Centrifugo server config:

...
"consumers": [
	...
	{
		"name": "my_nats_consumer",
		"type": "nats",
		"nats": {
			"brokers": ["nats://token@hostname:4222","nats://user:password@hostname:4222", ...],
			"subjects": ["commands"],
			"stream_name": "CENTRIFUGO",
			"consumer_group": "group_name",
			"max_poll_records": 25,			// optional, default 100
			"disable_reconnect": false,		// optional, default false
			"heartbeat_interval": "10s",		// optional, default "5s"
			"create_stream_if_not_exist": true	// !!!EXPERIMENTAL!!!, default false
		}
	},
	... 
],
...

Important configuration options:

  • The "subjects" option must be identical for all consumers with the same "consumer_group"
  • The "create_stream_if_not_exist" option is for development only and will be removed in the future. It creates the stream in JetStream if the stream does not exist. I think this is a bad idea: you should initialize all streams in JetStream before connecting to them.
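The rule about identical "subjects" within a "consumer_group" is easy to break when several consumers are configured by hand, so a startup check along these lines could help. This is a hypothetical sketch (the `consumer` type and `checkGroups` are illustrative names), and it assumes that "identical" means the same set of subjects regardless of order.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// consumer holds the fields relevant to the check. Hypothetical type.
type consumer struct {
	Name     string
	Group    string
	Subjects []string
}

// subjectsKey builds an order-independent key for a subject list.
// Subjects are joined with a newline, which cannot appear in a NATS subject.
func subjectsKey(subjects []string) string {
	sorted := append([]string(nil), subjects...)
	sort.Strings(sorted)
	return strings.Join(sorted, "\n")
}

// checkGroups returns an error if two consumers share a group but
// declare different subjects.
func checkGroups(consumers []consumer) error {
	seen := map[string]string{} // group -> subjects key
	for _, c := range consumers {
		key := subjectsKey(c.Subjects)
		if prev, ok := seen[c.Group]; ok && prev != key {
			return fmt.Errorf("consumer %q: subjects differ from other consumers in group %q", c.Name, c.Group)
		}
		seen[c.Group] = key
	}
	return nil
}

func main() {
	err := checkGroups([]consumer{
		{Name: "a", Group: "group_name", Subjects: []string{"commands"}},
		{Name: "b", Group: "group_name", Subjects: []string{"commands", "events"}},
	})
	fmt.Println(err)
}
```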

JetStream must be enabled and configured on the NATS server (see Configuring JetStream).

In application code you can send commands:

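package main

import (
	"context"
	"encoding/json"
	"log"
	"time"
	// ...
	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/jetstream"
	// ...
)

func main() {
	// ...
	// Connect to the NATS server.
	nc, err := nats.Connect("nats://hostname:4222", nats.UserInfo("user", "password"))
	if err != nil {
		log.Fatalln("NATS connect error:", err)
	}
	// Flush pending messages and close the connection on exit.
	defer func() { _ = nc.Drain() }()

	// Open a JetStream handle on top of the connection.
	js, err := jetstream.New(nc)
	if err != nil {
		log.Fatalln("NATS create JetStream error:", err)
	}

	// Prepare the command message (see Centrifugo API documentation).
	msg := map[string]any{
		"method": "publish",
		"payload": map[string]any{
			"channel": "news",
			"data": map[string]any{
				"message": "test message",
			},
		},
	}

	// Encode the command as JSON.
	data, err := json.Marshal(msg)
	if err != nil {
		log.Println("encode command error:", err)
		return
	}

	// Bound the publish call with a timeout (the 5s value is an arbitrary choice).
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Publish to the subject listed in the consumer's "subjects" option.
	if _, err := js.Publish(ctx, "commands", data); err != nil {
		log.Println("NATS publish error:", err)
	}
	// ...
}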
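If many commands are sent, the nested maps above can be replaced with small typed structs. The following is a sketch only: `publishCommand`, `publishPayload` and `newPublishCommand` are hypothetical names, and the JSON shape is simply the one used in the example message, not a full description of the Centrifugo API.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// publishPayload is the body of a "publish" command as used in the example.
type publishPayload struct {
	Channel string `json:"channel"`
	Data    any    `json:"data"`
}

// publishCommand is the command envelope: a method name plus its payload.
type publishCommand struct {
	Method  string         `json:"method"`
	Payload publishPayload `json:"payload"`
}

// newPublishCommand returns the JSON encoding of a publish command
// for the given channel and data.
func newPublishCommand(channel string, data any) ([]byte, error) {
	if channel == "" {
		return nil, errors.New("channel must not be empty")
	}
	return json.Marshal(publishCommand{
		Method:  "publish",
		Payload: publishPayload{Channel: channel, Data: data},
	})
}

func main() {
	data, err := newPublishCommand("news", map[string]string{"message": "test message"})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(string(data))
}
```

The resulting bytes can be passed to `js.Publish` in place of the marshalled map.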
