From 1a60368f826b630e55630bd86987f0a8b889739b Mon Sep 17 00:00:00 2001 From: Neil Date: Fri, 19 Apr 2019 10:48:21 -0400 Subject: [PATCH 1/4] updated .gitignore --- .gitignore | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index a725465..518fae9 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,10 @@ -vendor/ \ No newline at end of file +vendor/ + +# GoLand/IntelliJ +.idea + +# VSCode workspace settings +.vscode + +# macOS +*.DS_Store \ No newline at end of file From 4481ada198a199abd13030dd7f0a7e3a45b4b01f Mon Sep 17 00:00:00 2001 From: Neil Date: Fri, 19 Apr 2019 11:09:05 -0400 Subject: [PATCH 2/4] Unique Jobs --- fetcher.go | 7 ++++++- middleware_retry.go | 9 +++++++++ producer.go | 22 +++++++++++++++++++++- 3 files changed, 36 insertions(+), 2 deletions(-) diff --git a/fetcher.go b/fetcher.go index 72adcc8..774785b 100644 --- a/fetcher.go +++ b/fetcher.go @@ -108,7 +108,12 @@ func (f *simpleFetcher) sendMessage(message string) { } func (f *simpleFetcher) Acknowledge(message *Msg) { - f.client.LRem(f.inprogressQueue(), -1, message.OriginalJson()).Result() + count := int64(-1) + val, err := message.Get("unique").Bool() + if err == nil && val { + count = 0 + } + f.client.LRem(f.inprogressQueue(), count, message.OriginalJson()).Result() } func (f *simpleFetcher) Messages() chan *Msg { diff --git a/middleware_retry.go b/middleware_retry.go index 0e45081..8428da9 100644 --- a/middleware_retry.go +++ b/middleware_retry.go @@ -1,6 +1,8 @@ package workers import ( + "crypto/sha1" + "encoding/hex" "fmt" "math" "math/rand" @@ -61,6 +63,13 @@ func RetryMiddleware(queue string, mgr *Manager, next JobFunc) JobFunc { err = next(message) if err != nil { err = retryProcessError(queue, mgr, message, err) + } else { + val, err := message.Get("unique").Bool() + if err == nil && val { + rc := mgr.opts.client + sum := sha1.Sum([]byte(message.Args().ToJson())) + rc.Del(hex.EncodeToString(sum[:])).Result() + } } return diff --git a/producer.go b/producer.go index 3cb7bbf..06d8580 100644 --- a/producer.go +++ b/producer.go @@ -2,6 +2,8 @@ package workers import ( "crypto/rand" + "crypto/sha1" + "encoding/hex" "encoding/json" "fmt" "io" @@ -31,6 +33,7 @@ type EnqueueOptions struct { RetryCount int `json:"retry_count,omitempty"` Retry bool `json:"retry,omitempty"` At float64 `json:"at,omitempty"` + Unique bool `json:"unique,omitempty"` } func NewProducer(options Options) (*Producer, error) { @@ -69,7 +72,25 @@ func (p *Producer) EnqueueAt(queue, class string, at time.Time, args interface{} return p.EnqueueWithOptions(queue, class, args, EnqueueOptions{At: timeToSecondsWithNanoPrecision(at)}) } +func (p *Producer) EnqueueUnique(queue, class string, at time.Time, args interface{}) (string, error) { + return p.EnqueueWithOptions(queue, class, args, EnqueueOptions{At: nowToSecondsWithNanoPrecision(), Unique: true}) +} + func (p *Producer) EnqueueWithOptions(queue, class string, args interface{}, opts EnqueueOptions) (string, error) { + rc := p.opts.client + + if opts.Unique { + bytes, err := json.Marshal(args) + if err != nil { + return "", err + } + sum := sha1.Sum(bytes) + if !rc.SetNX(hex.EncodeToString(sum[:]), "unique", 0).Val() { + // already in the list + return "", nil + } + } + now := nowToSecondsWithNanoPrecision() data := EnqueueData{ Queue: queue, @@ -90,7 +111,6 @@ func (p *Producer) EnqueueWithOptions(queue, class string, args interface{}, opt return data.Jid, err } - rc := p.opts.client _, err = rc.SAdd(p.opts.Namespace+"queues", queue).Result() if err != nil { From f3a5593309c035250480cd89a2471dfa6d488f67 Mon Sep 17 00:00:00 2001 From: Neil Date: Fri, 19 Apr 2019 11:50:09 -0400 Subject: [PATCH 3/4] prefer switch over if/else --- fetcher.go | 19 +++++++++---------- middleware_retry.go | 8 ++++---- 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/fetcher.go b/fetcher.go index 774785b..cc681db 100644 --- a/fetcher.go +++ b/fetcher.go @@ -82,17 +82,16 @@ func (f *simpleFetcher) Fetch() { func (f *simpleFetcher) tryFetchMessage() { message, err := f.client.BRPopLPush(f.queue, f.inprogressQueue(), 1*time.Second).Result() - - if err != nil { - // If redis returns null, the queue is empty. - // Just ignore empty queue errors; print all other errors. - if err != redis.Nil { - Logger.Println("ERR: ", f.queue, err) - } - - time.Sleep(1 * time.Second) - } else { + switch err { + case nil: f.sendMessage(message) + case redis.Nil: + // If redis returns null, the queue is empty. + // Just ignore empty queue errors. + time.Sleep(1*time.Second) + default: + // Print all other errors. + Logger.Println("ERR: ", f.queue, err) } } diff --git a/middleware_retry.go b/middleware_retry.go index 8428da9..7d375f2 100644 --- a/middleware_retry.go +++ b/middleware_retry.go @@ -60,16 +60,16 @@ func RetryMiddleware(queue string, mgr *Manager, next JobFunc) JobFunc { }() - err = next(message) - if err != nil { - err = retryProcessError(queue, mgr, message, err) - } else { + switch next(message) { + case nil: val, err := message.Get("unique").Bool() if err == nil && val { rc := mgr.opts.client sum := sha1.Sum([]byte(message.Args().ToJson())) rc.Del(hex.EncodeToString(sum[:])).Result() } + default: + err = retryProcessError(queue, mgr, message, err) } return From c21a8dc91c35f123521d442e4a22f1d50836adfd Mon Sep 17 00:00:00 2001 From: Neil Date: Sat, 27 Apr 2019 10:12:54 -0400 Subject: [PATCH 4/4] preserve if/else behavior with switch --- fetcher.go | 1 + 1 file changed, 1 insertion(+) diff --git a/fetcher.go b/fetcher.go index cc681db..8695a8c 100644 --- a/fetcher.go +++ b/fetcher.go @@ -92,6 +92,7 @@ func (f *simpleFetcher) tryFetchMessage() { default: // Print all other errors. Logger.Println("ERR: ", f.queue, err) + time.Sleep(1*time.Second) } }