Skip to content

redisqueue provides a producer and consumer of a queue that uses Redis streams

License

Notifications You must be signed in to change notification settings

go-admin-team/redisqueue

This branch is 3 commits ahead of wenjianzhang/redisqueue:master.

Folders and files

NameName
Last commit message
Last commit date
Jul 13, 2019
Jul 13, 2019
Jul 13, 2019
Jul 13, 2019
May 25, 2020
May 27, 2020
May 25, 2020
Oct 15, 2020
Jul 13, 2019
May 27, 2020
Oct 15, 2020
Nov 2, 2023
Nov 2, 2023
May 27, 2020
Nov 2, 2023
Jul 13, 2019
Nov 2, 2023
Nov 2, 2023
Nov 2, 2023
Nov 19, 2022
Jul 13, 2019
Jul 13, 2019
Jul 13, 2019

Repository files navigation

redisqueue

Version GoDoc Build Status Coverage Status Go Report Card License

redisqueue provides a producer and consumer of a queue that uses Redis streams.

Features

  • A Producer struct to make enqueuing messages easy.
  • A Consumer struct to make processing messages concurrenly.
  • Claiming and acknowledging messages if there's no error, so that if a consumer dies while processing, the message it was working on isn't lost. This guarantees at least once delivery.
  • A "visibility timeout" so that if a message isn't processed in a designated time frame, it will be be processed by another consumer.
  • A max length on the stream so that it doesn't store the messages indefinitely and run out of memory.
  • Graceful handling of Unix signals (SIGINT and SIGTERM) to let in-flight messages complete.
  • A channel that will surface any errors so you can handle them centrally.
  • Graceful handling of panics to avoid crashing the whole process.
  • A concurrency setting to control how many goroutines are spawned to process messages.
  • A batch size setting to limit the total messages in flight.
  • Support for multiple streams.

Installation

redisqueue requires a Go version with Modules support and uses import versioning. So please make sure to initialize a Go module before installing redisqueue:

go mod init github.com/my/repo
go get github.com/robinjoseph08/redisqueue/v2

Import:

import "github.com/robinjoseph08/redisqueue/v2"

Example

Here's an example of a producer that inserts 1000 messages into a queue:

package main

import (
	"fmt"

	"github.com/robinjoseph08/redisqueue/v2"
)

func main() {
	p, err := redisqueue.NewProducerWithOptions(&redisqueue.ProducerOptions{
		StreamMaxLength:      10000,
		ApproximateMaxLength: true,
	})
	if err != nil {
		panic(err)
	}

	for i := 0; i < 1000; i++ {
		err := p.Enqueue(&redisqueue.Message{
			Stream: "redisqueue:test",
			Values: map[string]interface{}{
				"index": i,
			},
		})
		if err != nil {
			panic(err)
		}

		if i%100 == 0 {
			fmt.Printf("enqueued %d\n", i)
		}
	}
}

And here's an example of a consumer that reads the messages off of that queue:

package main

import (
	"fmt"
	"time"

	"github.com/robinjoseph08/redisqueue/v2"
)

func main() {
	c, err := redisqueue.NewConsumerWithOptions(&redisqueue.ConsumerOptions{
		VisibilityTimeout: 60 * time.Second,
		BlockingTimeout:   5 * time.Second,
		ReclaimInterval:   1 * time.Second,
		BufferSize:        100,
		Concurrency:       10,
	})
	if err != nil {
		panic(err)
	}

	c.Register("redisqueue:test", process)

	go func() {
		for err := range c.Errors {
			// handle errors accordingly
			fmt.Printf("err: %+v\n", err)
		}
	}()

	fmt.Println("starting")
	c.Run()
	fmt.Println("stopped")
}

func process(msg *redisqueue.Message) error {
	fmt.Printf("processing message: %v\n", msg.Values["index"])
	return nil
}

About

redisqueue provides a producer and consumer of a queue that uses Redis streams

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages

  • Go 94.6%
  • Makefile 4.2%
  • Shell 1.2%