Skip to content

Commit 3772566

Browse files
committed
chore: move validation
1 parent 908ae03 commit 3772566

File tree

6 files changed

+106
-73
lines changed

6 files changed

+106
-73
lines changed

internal/statestore/backfill_test.go

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ import (
3434
)
3535

3636
func TestCreateBackfillLastAckTime(t *testing.T) {
37-
cfg, closer := createRedis(t, false, "", 0)
37+
cfg, closer, _ := createRedis(t, false, "", 0)
3838
defer closer()
3939
service := New(cfg)
4040
require.NotNil(t, service)
@@ -56,7 +56,7 @@ func TestCreateBackfillLastAckTime(t *testing.T) {
5656
}
5757

5858
func TestCreateBackfill(t *testing.T) {
59-
cfg, closer := createRedis(t, false, "", 0)
59+
cfg, closer, _ := createRedis(t, false, "", 0)
6060
defer closer()
6161
service := New(cfg)
6262
require.NotNil(t, service)
@@ -118,7 +118,7 @@ func TestCreateBackfill(t *testing.T) {
118118
}
119119

120120
func TestUpdateExistingBackfillNoError(t *testing.T) {
121-
cfg, closer := createRedis(t, false, "", 0)
121+
cfg, closer, _ := createRedis(t, false, "", 0)
122122
defer closer()
123123
service := New(cfg)
124124
require.NotNil(t, service)
@@ -178,7 +178,7 @@ func TestUpdateExistingBackfillNoError(t *testing.T) {
178178
}
179179

180180
func TestUpdateBackfillDoNotExistCanNotUpdate(t *testing.T) {
181-
cfg, closer := createRedis(t, false, "", 0)
181+
cfg, closer, _ := createRedis(t, false, "", 0)
182182
defer closer()
183183
service := New(cfg)
184184
require.NotNil(t, service)
@@ -208,7 +208,7 @@ func TestUpdateBackfillDoNotExistCanNotUpdate(t *testing.T) {
208208
}
209209

210210
func TestUpdateBackfillExpiredBackfillErrExpected(t *testing.T) {
211-
cfg, closer := createRedis(t, false, "", 0)
211+
cfg, closer, _ := createRedis(t, false, "", 0)
212212
defer closer()
213213
service := New(cfg)
214214
require.NotNil(t, service)
@@ -236,7 +236,7 @@ func TestUpdateBackfillExpiredBackfillErrExpected(t *testing.T) {
236236
}
237237

238238
func TestUpdateBackfillExpiredContextErrExpected(t *testing.T) {
239-
cfg, closer := createRedis(t, false, "", 0)
239+
cfg, closer, _ := createRedis(t, false, "", 0)
240240
defer closer()
241241
service := New(cfg)
242242
require.NotNil(t, service)
@@ -254,7 +254,7 @@ func TestUpdateBackfillExpiredContextErrExpected(t *testing.T) {
254254
}
255255

256256
func TestGetBackfill(t *testing.T) {
257-
cfg, closer := createRedis(t, false, "", 0)
257+
cfg, closer, _ := createRedis(t, false, "", 0)
258258
defer closer()
259259
service := New(cfg)
260260
require.NotNil(t, service)
@@ -341,7 +341,7 @@ func TestGetBackfill(t *testing.T) {
341341
}
342342

343343
func TestDeleteBackfill(t *testing.T) {
344-
cfg, closer := createRedis(t, false, "", 0)
344+
cfg, closer, _ := createRedis(t, false, "", 0)
345345
defer closer()
346346
service := New(cfg)
347347
require.NotNil(t, service)
@@ -424,7 +424,7 @@ func TestDeleteBackfill(t *testing.T) {
424424
// TestUpdateAcknowledgmentTimestampLifecycle test statestore functions - UpdateAcknowledgmentTimestamp, GetExpiredBackfillIDs
425425
// and deleteExpiredBackfillID
426426
func TestUpdateAcknowledgmentTimestampLifecycle(t *testing.T) {
427-
cfg, closer := createRedis(t, false, "", 0)
427+
cfg, closer, _ := createRedis(t, false, "", 0)
428428
defer closer()
429429

430430
service := New(cfg)
@@ -480,7 +480,7 @@ func TestUpdateAcknowledgmentTimestampLifecycle(t *testing.T) {
480480
}
481481

482482
func TestUpdateAcknowledgmentTimestamp(t *testing.T) {
483-
cfg, closer := createRedis(t, false, "", 0)
483+
cfg, closer, _ := createRedis(t, false, "", 0)
484484
defer closer()
485485

486486
startTime := time.Now()
@@ -511,7 +511,7 @@ func TestUpdateAcknowledgmentTimestamp(t *testing.T) {
511511
}
512512

513513
func TestUpdateAcknowledgmentTimestamptExpiredBackfillErrExpected(t *testing.T) {
514-
cfg, closer := createRedis(t, false, "", 0)
514+
cfg, closer, _ := createRedis(t, false, "", 0)
515515
defer closer()
516516
service := New(cfg)
517517
require.NotNil(t, service)
@@ -535,7 +535,7 @@ func TestUpdateAcknowledgmentTimestamptExpiredBackfillErrExpected(t *testing.T)
535535
}
536536

537537
func TestUpdateAcknowledgmentTimestampConnectionError(t *testing.T) {
538-
cfg, closer := createRedis(t, false, "", 0)
538+
cfg, closer, _ := createRedis(t, false, "", 0)
539539
defer closer()
540540
service := New(cfg)
541541
require.NotNil(t, service)
@@ -560,7 +560,7 @@ func createInvalidRedisConfig() config.View {
560560
// TestGetExpiredBackfillIDs test statestore function GetExpiredBackfillIDs
561561
func TestGetExpiredBackfillIDs(t *testing.T) {
562562
// Prepare expired and normal BackfillIds in a Redis Sorted Set
563-
cfg, closer := createRedis(t, false, "", 0)
563+
cfg, closer, _ := createRedis(t, false, "", 0)
564564
defer closer()
565565

566566
expID := "expired"
@@ -584,7 +584,7 @@ func TestGetExpiredBackfillIDs(t *testing.T) {
584584
}
585585

586586
func TestIndexBackfill(t *testing.T) {
587-
cfg, closer := createRedis(t, false, "", 0)
587+
cfg, closer, _ := createRedis(t, false, "", 0)
588588
defer closer()
589589
service := New(cfg)
590590
require.NotNil(t, service)
@@ -617,7 +617,7 @@ func TestIndexBackfill(t *testing.T) {
617617
}
618618

619619
func TestDeindexBackfill(t *testing.T) {
620-
cfg, closer := createRedis(t, false, "", 0)
620+
cfg, closer, _ := createRedis(t, false, "", 0)
621621
defer closer()
622622
service := New(cfg)
623623
require.NotNil(t, service)
@@ -654,7 +654,7 @@ func TestDeindexBackfill(t *testing.T) {
654654
}
655655

656656
func TestGetIndexedBackfills(t *testing.T) {
657-
cfg, closer := createRedis(t, false, "", 0)
657+
cfg, closer, _ := createRedis(t, false, "", 0)
658658
defer closer()
659659
service := New(cfg)
660660
require.NotNil(t, service)
@@ -705,7 +705,7 @@ func generateBackfills(ctx context.Context, t *testing.T, service Service, amoun
705705

706706
func BenchmarkCleanupBackfills(b *testing.B) {
707707
t := &testing.T{}
708-
cfg, closer := createRedis(t, false, "", 0)
708+
cfg, closer, _ := createRedis(t, false, "", 0)
709709
defer closer()
710710
service := New(cfg)
711711
require.NotNil(t, service)
@@ -743,7 +743,7 @@ func BenchmarkCleanupBackfills(b *testing.B) {
743743
}
744744

745745
func TestCleanupBackfills(t *testing.T) {
746-
cfg, closer := createRedis(t, false, "", 0)
746+
cfg, closer, _ := createRedis(t, false, "", 0)
747747
defer closer()
748748
service := New(cfg)
749749
require.NotNil(t, service)

internal/statestore/redis_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99
)
1010

1111
func TestNewMutex(t *testing.T) {
12-
cfg, closer := createRedis(t, false, "", 0)
12+
cfg, closer, _ := createRedis(t, false, "", 0)
1313
defer closer()
1414
service := New(cfg)
1515
require.NotNil(t, service)

internal/statestore/ticket.go

Lines changed: 19 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -53,14 +53,14 @@ func (rb *redisBackend) CreateTicket(ctx context.Context, ticket *pb.Ticket) err
5353
return status.Errorf(codes.Internal, "failed to marshal the ticket proto, id: %s: proto: Marshal called with nil", ticket.GetId())
5454
}
5555

56-
timeout, isSet := getTicketReleaseTimeout(rb.cfg)
57-
if isSet {
56+
args := redis.Args{ticket.GetId(), value}
57+
timeout, ttlEnabled := getTicketReleaseTimeout(rb.cfg)
58+
if ttlEnabled {
5859
ticketTimeout := timeout / time.Millisecond
59-
// set a TTL on the ticket if configured
60-
_, err = redisConn.Do("SET", ticket.GetId(), value, "PX", int64(ticketTimeout), "XX")
61-
} else {
62-
_, err = redisConn.Do("SET", ticket.GetId(), value)
60+
args = append(args, "PX", int64(ticketTimeout))
6361
}
62+
63+
_, err = redisConn.Do("SET", args...)
6464
if err != nil {
6565
err = errors.Wrapf(err, "failed to set the value for ticket, id: %s", ticket.GetId())
6666
return status.Errorf(codes.Internal, "%v", err)
@@ -101,14 +101,15 @@ func (rb *redisBackend) GetTicket(ctx context.Context, id string) (*pb.Ticket, e
101101
return nil, status.Errorf(codes.Internal, "%v", err)
102102
}
103103

104-
timeout, enabled := getTicketReleaseTimeout(rb.cfg)
105-
if enabled {
104+
args := redis.Args{id, value}
105+
timeout, ttlEnabled := getTicketReleaseTimeout(rb.cfg)
106+
if ttlEnabled && ticket.Assignment == nil {
107+
// make sure not to overwrite the assigned ticket timeout
106108
ticketTimeout := int64(timeout / time.Millisecond)
107-
// refresh the ticket TTL
108-
err = redisConn.Send("SET", id, value, "PX", ticketTimeout, "XX")
109-
} else {
110-
err = redisConn.Send("SET", id, value)
109+
args = append(args, "PX", ticketTimeout, "XX")
111110
}
111+
112+
err = redisConn.Send("SET", args...)
112113
if err != nil {
113114
err = errors.Wrapf(err, "failed to set the value for ticket, id: %s", id)
114115
return nil, status.Errorf(codes.Internal, "%v", err)
@@ -302,17 +303,9 @@ func (rb *redisBackend) GetTickets(ctx context.Context, ids []string) ([]*pb.Tic
302303
return nil, status.Errorf(codes.Internal, "%v", err)
303304
}
304305

305-
timeout, enabled := getTicketReleaseTimeout(rb.cfg)
306+
timeout, ttlEnabled := getTicketReleaseTimeout(rb.cfg)
306307
ticketTimeout := int64(timeout / time.Millisecond)
307308

308-
if enabled {
309-
err = redisConn.Send("MULTI")
310-
if err != nil {
311-
err = errors.Wrapf(err, "failed to send multi command: %v", ids)
312-
return nil, status.Errorf(codes.Internal, "%v", err)
313-
}
314-
}
315-
316309
r := make([]*pb.Ticket, 0, len(ids))
317310

318311
for i, b := range ticketBytes {
@@ -326,8 +319,8 @@ func (rb *redisBackend) GetTickets(ctx context.Context, ids []string) ([]*pb.Tic
326319
}
327320
r = append(r, t)
328321

329-
if enabled {
330-
// also update the ticket TTL
322+
if ttlEnabled && t.Assignment == nil {
323+
// only update TTL if ticket is not assigned
331324
err = redisConn.Send("SET", ids[i], b, "PX", ticketTimeout, "XX")
332325
if err != nil {
333326
err = errors.Wrapf(err, "failed to set ticket expiry time %v", ticketTimeout)
@@ -337,7 +330,7 @@ func (rb *redisBackend) GetTickets(ctx context.Context, ids []string) ([]*pb.Tic
337330
}
338331
}
339332

340-
if enabled {
333+
if ttlEnabled {
341334
err = redisConn.Flush()
342335
if err != nil {
343336
err = errors.Wrapf(err, "failed to flush ticket TTL update %v", ids)
@@ -597,5 +590,6 @@ func getTicketReleaseTimeout(cfg config.View) (time.Duration, bool) {
597590
return defaultTicketDeleteTimeout, false
598591
}
599592

600-
return cfg.GetDuration(name), true
593+
ttlTimeout := cfg.GetDuration(name)
594+
return ttlTimeout, ttlTimeout > 0
601595
}

0 commit comments

Comments
 (0)