diff --git a/.gitignore b/.gitignore index a725465..518fae9 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,10 @@ -vendor/ \ No newline at end of file +vendor/ + +# GoLand/IntelliJ +.idea + +# VSCode workspace settings +.vscode + +# macOS +*.DS_Store \ No newline at end of file diff --git a/fetcher.go b/fetcher.go index 3e9a720..f8931e3 100644 --- a/fetcher.go +++ b/fetcher.go @@ -91,7 +91,7 @@ func (f *simpleFetcher) tryFetchMessage() { } } else { f.sendMessage(message) - } + } } func (f *simpleFetcher) sendMessage(message string) { diff --git a/middleware_retry.go b/middleware_retry.go index e131c83..988d2e0 100644 --- a/middleware_retry.go +++ b/middleware_retry.go @@ -1,6 +1,8 @@ package workers import ( + "crypto/sha1" + "encoding/hex" "fmt" "math" "math/rand" @@ -56,8 +58,15 @@ func RetryMiddleware(queue string, mgr *Manager, next JobFunc) JobFunc { }() - err = next(message) - if err != nil { + switch next(message) { + case nil: + val, err := message.Get("unique").Bool() + if err == nil && val { + rc := mgr.opts.client + sum := sha1.Sum([]byte(message.Args().ToJson())) + rc.Del(hex.EncodeToString(sum[:])).Result() + } + default: err = retryProcessError(queue, mgr, message, err) } diff --git a/producer.go b/producer.go index 848b01e..9788073 100644 --- a/producer.go +++ b/producer.go @@ -2,6 +2,8 @@ package workers import ( "crypto/rand" + "crypto/sha1" + "encoding/hex" "encoding/json" "fmt" "io" @@ -31,6 +33,7 @@ type EnqueueOptions struct { RetryCount int `json:"retry_count,omitempty"` Retry bool `json:"retry,omitempty"` At float64 `json:"at,omitempty"` + Unique bool `json:"unique,omitempty"` } func NewProducer(options Options) (*Producer, error) { @@ -71,7 +74,25 @@ func (p *Producer) EnqueueAt(queue, class string, at time.Time, args interface{} return p.EnqueueWithOptions(queue, class, args, EnqueueOptions{At: timeToSecondsWithNanoPrecision(at)}) } +func (p *Producer) EnqueueUnique(queue, class string, at time.Time, args interface{}) (string, error) { + return p.EnqueueWithOptions(queue, class, args, EnqueueOptions{At: nowToSecondsWithNanoPrecision(), Unique: true}) +} + func (p *Producer) EnqueueWithOptions(queue, class string, args interface{}, opts EnqueueOptions) (string, error) { + rc := p.opts.client + + if opts.Unique { + bytes, err := json.Marshal(args) + if err != nil { + return "", err + } + sum := sha1.Sum(bytes) + if !rc.SetNX(hex.EncodeToString(sum[:]), "unique", 0).Val() { + // already in the list + return "", nil + } + } + now := nowToSecondsWithNanoPrecision() data := EnqueueData{ Queue: queue,