Skip to content

Commit 908ae03

Browse files
committed
chore: add optional ttl to ticket age
1 parent f942aaa commit 908ae03

File tree

4 files changed

+162
-40
lines changed

4 files changed

+162
-40
lines changed

internal/statestore/backfill_test.go

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ import (
3434
)
3535

3636
func TestCreateBackfillLastAckTime(t *testing.T) {
37-
cfg, closer := createRedis(t, false, "")
37+
cfg, closer := createRedis(t, false, "", 0)
3838
defer closer()
3939
service := New(cfg)
4040
require.NotNil(t, service)
@@ -56,7 +56,7 @@ func TestCreateBackfillLastAckTime(t *testing.T) {
5656
}
5757

5858
func TestCreateBackfill(t *testing.T) {
59-
cfg, closer := createRedis(t, false, "")
59+
cfg, closer := createRedis(t, false, "", 0)
6060
defer closer()
6161
service := New(cfg)
6262
require.NotNil(t, service)
@@ -118,7 +118,7 @@ func TestCreateBackfill(t *testing.T) {
118118
}
119119

120120
func TestUpdateExistingBackfillNoError(t *testing.T) {
121-
cfg, closer := createRedis(t, false, "")
121+
cfg, closer := createRedis(t, false, "", 0)
122122
defer closer()
123123
service := New(cfg)
124124
require.NotNil(t, service)
@@ -178,7 +178,7 @@ func TestUpdateExistingBackfillNoError(t *testing.T) {
178178
}
179179

180180
func TestUpdateBackfillDoNotExistCanNotUpdate(t *testing.T) {
181-
cfg, closer := createRedis(t, false, "")
181+
cfg, closer := createRedis(t, false, "", 0)
182182
defer closer()
183183
service := New(cfg)
184184
require.NotNil(t, service)
@@ -208,7 +208,7 @@ func TestUpdateBackfillDoNotExistCanNotUpdate(t *testing.T) {
208208
}
209209

210210
func TestUpdateBackfillExpiredBackfillErrExpected(t *testing.T) {
211-
cfg, closer := createRedis(t, false, "")
211+
cfg, closer := createRedis(t, false, "", 0)
212212
defer closer()
213213
service := New(cfg)
214214
require.NotNil(t, service)
@@ -236,7 +236,7 @@ func TestUpdateBackfillExpiredBackfillErrExpected(t *testing.T) {
236236
}
237237

238238
func TestUpdateBackfillExpiredContextErrExpected(t *testing.T) {
239-
cfg, closer := createRedis(t, false, "")
239+
cfg, closer := createRedis(t, false, "", 0)
240240
defer closer()
241241
service := New(cfg)
242242
require.NotNil(t, service)
@@ -254,7 +254,7 @@ func TestUpdateBackfillExpiredContextErrExpected(t *testing.T) {
254254
}
255255

256256
func TestGetBackfill(t *testing.T) {
257-
cfg, closer := createRedis(t, false, "")
257+
cfg, closer := createRedis(t, false, "", 0)
258258
defer closer()
259259
service := New(cfg)
260260
require.NotNil(t, service)
@@ -341,7 +341,7 @@ func TestGetBackfill(t *testing.T) {
341341
}
342342

343343
func TestDeleteBackfill(t *testing.T) {
344-
cfg, closer := createRedis(t, false, "")
344+
cfg, closer := createRedis(t, false, "", 0)
345345
defer closer()
346346
service := New(cfg)
347347
require.NotNil(t, service)
@@ -424,7 +424,7 @@ func TestDeleteBackfill(t *testing.T) {
424424
// TestUpdateAcknowledgmentTimestampLifecycle test statestore functions - UpdateAcknowledgmentTimestamp, GetExpiredBackfillIDs
425425
// and deleteExpiredBackfillID
426426
func TestUpdateAcknowledgmentTimestampLifecycle(t *testing.T) {
427-
cfg, closer := createRedis(t, false, "")
427+
cfg, closer := createRedis(t, false, "", 0)
428428
defer closer()
429429

430430
service := New(cfg)
@@ -480,7 +480,7 @@ func TestUpdateAcknowledgmentTimestampLifecycle(t *testing.T) {
480480
}
481481

482482
func TestUpdateAcknowledgmentTimestamp(t *testing.T) {
483-
cfg, closer := createRedis(t, false, "")
483+
cfg, closer := createRedis(t, false, "", 0)
484484
defer closer()
485485

486486
startTime := time.Now()
@@ -511,7 +511,7 @@ func TestUpdateAcknowledgmentTimestamp(t *testing.T) {
511511
}
512512

513513
func TestUpdateAcknowledgmentTimestamptExpiredBackfillErrExpected(t *testing.T) {
514-
cfg, closer := createRedis(t, false, "")
514+
cfg, closer := createRedis(t, false, "", 0)
515515
defer closer()
516516
service := New(cfg)
517517
require.NotNil(t, service)
@@ -535,7 +535,7 @@ func TestUpdateAcknowledgmentTimestamptExpiredBackfillErrExpected(t *testing.T)
535535
}
536536

537537
func TestUpdateAcknowledgmentTimestampConnectionError(t *testing.T) {
538-
cfg, closer := createRedis(t, false, "")
538+
cfg, closer := createRedis(t, false, "", 0)
539539
defer closer()
540540
service := New(cfg)
541541
require.NotNil(t, service)
@@ -560,7 +560,7 @@ func createInvalidRedisConfig() config.View {
560560
// TestGetExpiredBackfillIDs test statestore function GetExpiredBackfillIDs
561561
func TestGetExpiredBackfillIDs(t *testing.T) {
562562
// Prepare expired and normal BackfillIds in a Redis Sorted Set
563-
cfg, closer := createRedis(t, false, "")
563+
cfg, closer := createRedis(t, false, "", 0)
564564
defer closer()
565565

566566
expID := "expired"
@@ -584,7 +584,7 @@ func TestGetExpiredBackfillIDs(t *testing.T) {
584584
}
585585

586586
func TestIndexBackfill(t *testing.T) {
587-
cfg, closer := createRedis(t, false, "")
587+
cfg, closer := createRedis(t, false, "", 0)
588588
defer closer()
589589
service := New(cfg)
590590
require.NotNil(t, service)
@@ -617,7 +617,7 @@ func TestIndexBackfill(t *testing.T) {
617617
}
618618

619619
func TestDeindexBackfill(t *testing.T) {
620-
cfg, closer := createRedis(t, false, "")
620+
cfg, closer := createRedis(t, false, "", 0)
621621
defer closer()
622622
service := New(cfg)
623623
require.NotNil(t, service)
@@ -654,7 +654,7 @@ func TestDeindexBackfill(t *testing.T) {
654654
}
655655

656656
func TestGetIndexedBackfills(t *testing.T) {
657-
cfg, closer := createRedis(t, false, "")
657+
cfg, closer := createRedis(t, false, "", 0)
658658
defer closer()
659659
service := New(cfg)
660660
require.NotNil(t, service)
@@ -705,7 +705,7 @@ func generateBackfills(ctx context.Context, t *testing.T, service Service, amoun
705705

706706
func BenchmarkCleanupBackfills(b *testing.B) {
707707
t := &testing.T{}
708-
cfg, closer := createRedis(t, false, "")
708+
cfg, closer := createRedis(t, false, "", 0)
709709
defer closer()
710710
service := New(cfg)
711711
require.NotNil(t, service)
@@ -743,7 +743,7 @@ func BenchmarkCleanupBackfills(b *testing.B) {
743743
}
744744

745745
func TestCleanupBackfills(t *testing.T) {
746-
cfg, closer := createRedis(t, false, "")
746+
cfg, closer := createRedis(t, false, "", 0)
747747
defer closer()
748748
service := New(cfg)
749749
require.NotNil(t, service)

internal/statestore/redis_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99
)
1010

1111
func TestNewMutex(t *testing.T) {
12-
cfg, closer := createRedis(t, false, "")
12+
cfg, closer := createRedis(t, false, "", 0)
1313
defer closer()
1414
service := New(cfg)
1515
require.NotNil(t, service)

internal/statestore/ticket.go

Lines changed: 67 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,14 @@ func (rb *redisBackend) CreateTicket(ctx context.Context, ticket *pb.Ticket) err
5353
return status.Errorf(codes.Internal, "failed to marshal the ticket proto, id: %s: proto: Marshal called with nil", ticket.GetId())
5454
}
5555

56-
_, err = redisConn.Do("SET", ticket.GetId(), value)
56+
timeout, isSet := getTicketReleaseTimeout(rb.cfg)
57+
if isSet {
58+
ticketTimeout := timeout / time.Millisecond
59+
// set a TTL on the ticket if configured
60+
_, err = redisConn.Do("SET", ticket.GetId(), value, "PX", int64(ticketTimeout), "XX")
61+
} else {
62+
_, err = redisConn.Do("SET", ticket.GetId(), value)
63+
}
5764
if err != nil {
5865
err = errors.Wrapf(err, "failed to set the value for ticket, id: %s", ticket.GetId())
5966
return status.Errorf(codes.Internal, "%v", err)
@@ -94,6 +101,24 @@ func (rb *redisBackend) GetTicket(ctx context.Context, id string) (*pb.Ticket, e
94101
return nil, status.Errorf(codes.Internal, "%v", err)
95102
}
96103

104+
timeout, enabled := getTicketReleaseTimeout(rb.cfg)
105+
if enabled {
106+
ticketTimeout := int64(timeout / time.Millisecond)
107+
// refresh the ticket TTL
108+
err = redisConn.Send("SET", id, value, "PX", ticketTimeout, "XX")
109+
} else {
110+
err = redisConn.Send("SET", id, value)
111+
}
112+
if err != nil {
113+
err = errors.Wrapf(err, "failed to set the value for ticket, id: %s", id)
114+
return nil, status.Errorf(codes.Internal, "%v", err)
115+
}
116+
err = redisConn.Flush()
117+
if err != nil {
118+
err = errors.Wrapf(err, "failed to flush the redis connection for ticket id: %s", id)
119+
return nil, status.Errorf(codes.Internal, "%v", err)
120+
}
121+
97122
return ticket, nil
98123
}
99124

@@ -277,6 +302,17 @@ func (rb *redisBackend) GetTickets(ctx context.Context, ids []string) ([]*pb.Tic
277302
return nil, status.Errorf(codes.Internal, "%v", err)
278303
}
279304

305+
timeout, enabled := getTicketReleaseTimeout(rb.cfg)
306+
ticketTimeout := int64(timeout / time.Millisecond)
307+
308+
if enabled {
309+
err = redisConn.Send("MULTI")
310+
if err != nil {
311+
err = errors.Wrapf(err, "failed to send multi command: %v", ids)
312+
return nil, status.Errorf(codes.Internal, "%v", err)
313+
}
314+
}
315+
280316
r := make([]*pb.Ticket, 0, len(ids))
281317

282318
for i, b := range ticketBytes {
@@ -289,6 +325,23 @@ func (rb *redisBackend) GetTickets(ctx context.Context, ids []string) ([]*pb.Tic
289325
return nil, status.Errorf(codes.Internal, "%v", err)
290326
}
291327
r = append(r, t)
328+
329+
if enabled {
330+
// also update the ticket TTL
331+
err = redisConn.Send("SET", ids[i], b, "PX", ticketTimeout, "XX")
332+
if err != nil {
333+
err = errors.Wrapf(err, "failed to set ticket expiry time %v", ticketTimeout)
334+
return nil, status.Errorf(codes.Internal, "%v", err)
335+
}
336+
}
337+
}
338+
}
339+
340+
if enabled {
341+
err = redisConn.Flush()
342+
if err != nil {
343+
err = errors.Wrapf(err, "failed to flush ticket TTL update %v", ids)
344+
return nil, status.Errorf(codes.Internal, "%v", err)
292345
}
293346
}
294347

@@ -533,3 +586,16 @@ func getAssignedDeleteTimeout(cfg config.View) time.Duration {
533586

534587
return cfg.GetDuration(name)
535588
}
589+
func getTicketReleaseTimeout(cfg config.View) (time.Duration, bool) {
590+
const (
591+
name = "ticketDeleteTimeout"
592+
// Default timeout to delete tickets after assignment.
593+
defaultTicketDeleteTimeout = 10 * time.Minute
594+
)
595+
596+
if !cfg.IsSet(name) {
597+
return defaultTicketDeleteTimeout, false
598+
}
599+
600+
return cfg.GetDuration(name), true
601+
}

0 commit comments

Comments
 (0)