@@ -18,10 +18,11 @@ import (
1818 "context"
1919
2020 "fmt"
21- "github.com/sirupsen/logrus"
2221 "sync"
2322 "time"
2423
24+ "github.com/sirupsen/logrus"
25+
2526 "github.com/cenkalti/backoff"
2627 "github.com/gomodule/redigo/redis"
2728 "github.com/pkg/errors"
@@ -359,7 +360,7 @@ func (rb *redisBackend) GetIndexedIDSet(ctx context.Context) (map[string]struct{
359360}
360361
361362// GetIndexedIDSetWithTTL returns the ids of all tickets currently indexed but within a given TTL.
362- func (rb * redisBackend ) GetIndexedIDSetWithTTL (ctx context.Context ) (map [string ]struct {}, error ) {
363+ func (rb * redisBackend ) GetIndexedIDSetWithTTL (ctx context.Context , limit int ) (map [string ]struct {}, error ) {
363364 redisConn , err := rb .redisPool .GetContext (ctx )
364365 if err != nil {
365366 return nil , status .Errorf (codes .Unavailable , "GetIndexedIDSetWithTTL, failed to connect to redis: %v" , err )
@@ -378,10 +379,16 @@ func (rb *redisBackend) GetIndexedIDSetWithTTL(ctx context.Context) (map[string]
378379 }
379380
380381 curTimeUnix := curTime .UnixNano ()
382+ args := redis.Args {
383+ allTicketsWithTTL , curTimeUnix , "+inf" ,
384+ }
385+ if limit > 0 {
386+ args = args .Add ("LIMIT" , 0 , limit )
387+ }
381388 // fetch only tickets with a score or ttl ahead of or equal to current time
382- idsIndexed , err := redis .Strings (redisConn .Do ("ZRANGEBYSCORE" , allTicketsWithTTL , curTimeUnix , "+inf" ))
389+ idsIndexed , err := redis .Strings (redisConn .Do ("ZRANGEBYSCORE" , args ... ))
383390 if err != nil {
384- return nil , status .Errorf (codes .Internal , "error getting all indexed ticket ids %v" , err )
391+ return nil , status .Errorf (codes .Internal , "error getting indexed ticket ids %v" , err )
385392 }
386393
387394 r := make (map [string ]struct {}, len (idsIndexed ))
@@ -696,7 +703,7 @@ func (rb *redisBackend) newConstantBackoffStrategy() backoff.BackOff {
696703}
697704
698705// GetExpiredTicketIDs gets all ticket IDs which are expired
699- func (rb * redisBackend ) GetExpiredTicketIDs (ctx context.Context ) ([]string , error ) {
706+ func (rb * redisBackend ) GetExpiredTicketIDs (ctx context.Context , limit int ) ([]string , error ) {
700707 redisConn , err := rb .redisPool .GetContext (ctx )
701708 if err != nil {
702709 return nil , status .Errorf (codes .Unavailable , "GetExpiredBackfillIDs, failed to connect to redis: %v" , err )
@@ -708,8 +715,15 @@ func (rb *redisBackend) GetExpiredTicketIDs(ctx context.Context) ([]string, erro
708715 endTimeInt := curTime .Add (- ticketTTL ).UnixNano () // anything before the now - ttl
709716 startTimeInt := 0 // unix epoc start time
710717
718+ args := redis.Args {
719+ allTicketsWithTTL , startTimeInt , endTimeInt ,
720+ }
721+ if limit > 0 {
722+ args = append (args , "LIMIT" , 0 , limit )
723+ }
724+
711725 // Filter out ticket IDs that are fetched but not assigned within TTL time (ms).
712- expiredTicketIds , err := redis .Strings (redisConn .Do ("ZRANGEBYSCORE" , allTicketsWithTTL , startTimeInt , endTimeInt ))
726+ expiredTicketIds , err := redis .Strings (redisConn .Do ("ZRANGEBYSCORE" , args ... ))
713727 if err != nil {
714728 return nil , status .Errorf (codes .Internal , "error getting expired tickets %v" , err )
715729 }
@@ -778,7 +792,7 @@ func (rb *redisBackend) cleanupTicketsWorker(ctx context.Context, ticketIDsCh <-
778792}
779793
780794func (rb * redisBackend ) CleanupTickets (ctx context.Context ) error {
781- expiredTicketIDs , err := rb .GetExpiredTicketIDs (ctx )
795+ expiredTicketIDs , err := rb .GetExpiredTicketIDs (ctx , 0 )
782796 if err != nil {
783797 return err
784798 }
0 commit comments