Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/app/backend/backend_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func synchronizeRecv(ctx context.Context, syncStream synchronizerStream, m *sync
err = createOrUpdateBackfill(ctx, backfill, ticketIds, store)
if err != nil {
e, ok := status.FromError(err)
if err == errBackfillGenerationMismatch || (ok && e.Code() == codes.NotFound) {
if err == errBackfillGenerationMismatch || (ok && (e.Code() == codes.NotFound || e.Code() == codes.FailedPrecondition)) {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the main change

err = doReleaseTickets(ctx, ticketIds, store)
if err != nil {
logger.WithError(err).Errorf("failed to remove match tickets from pending release: %v", ticketIds)
Expand Down
17 changes: 11 additions & 6 deletions internal/app/frontend/frontend_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func TestCreateBackfill(t *testing.T) {
defer closer()
ctx := utilTesting.NewContext(t)
fs := frontendService{cfg: cfg, store: store}
var testCases = []struct {
testCases := []struct {
description string
request *pb.CreateBackfillRequest
result *pb.Backfill
Expand Down Expand Up @@ -133,7 +133,10 @@ func TestCreateBackfill(t *testing.T) {
SearchFields: &pb.SearchFields{
StringArgs: map[string]string{
"search": "me",
}}}},
},
},
},
},
expectedCode: codes.OK,
expectedMessage: "",
},
Expand Down Expand Up @@ -191,7 +194,7 @@ func TestUpdateBackfill(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, res)

var testCases = []struct {
testCases := []struct {
description string
request *pb.UpdateBackfillRequest
result *pb.Backfill
Expand Down Expand Up @@ -224,7 +227,10 @@ func TestUpdateBackfill(t *testing.T) {
SearchFields: &pb.SearchFields{
StringArgs: map[string]string{
"search": "me",
}}}},
},
},
},
},
expectedCode: codes.OK,
expectedMessage: "",
},
Expand Down Expand Up @@ -421,9 +427,8 @@ func TestAcknowledgeBackfill(t *testing.T) {
resp, err = fs.AcknowledgeBackfill(ctx, &pb.AcknowledgeBackfillRequest{BackfillId: fakeBackfill.Id, Assignment: &pb.Assignment{Connection: "10.0.0.1"}})
require.Nil(t, resp)
require.Error(t, err)
require.Equal(t, codes.Unavailable.String(), status.Convert(err).Code().String())
require.Equal(t, codes.FailedPrecondition.String(), status.Convert(err).Code().String())
require.Contains(t, status.Convert(err).Message(), "can not acknowledge an expired backfill, id: 1")

}

func TestDoDeleteTicket(t *testing.T) {
Expand Down
16 changes: 6 additions & 10 deletions internal/statestore/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,10 @@ import (
"open-match.dev/open-match/pkg/pb"
)

var (
logger = logrus.WithFields(logrus.Fields{
"app": "openmatch",
"component": "statestore.redis",
})
)
var logger = logrus.WithFields(logrus.Fields{
"app": "openmatch",
"component": "statestore.redis",
})

const (
backfillLastAckTime = "backfill_last_ack_time"
Expand Down Expand Up @@ -199,7 +197,7 @@ func (rb *redisBackend) UpdateBackfill(ctx context.Context, backfill *pb.Backfil
}

if expired {
return status.Errorf(codes.Unavailable, "can not update an expired backfill, id: %s", backfill.Id)
return status.Errorf(codes.FailedPrecondition, "can not update an expired backfill, id: %s", backfill.Id)
}

bf := ipb.BackfillInternal{
Expand Down Expand Up @@ -338,7 +336,7 @@ func (rb *redisBackend) UpdateAcknowledgmentTimestamp(ctx context.Context, id st
}

if expired {
return status.Errorf(codes.Unavailable, "can not acknowledge an expired backfill, id: %s", id)
return status.Errorf(codes.FailedPrecondition, "can not acknowledge an expired backfill, id: %s", id)
}

return doUpdateAcknowledgmentTimestamp(redisConn, id)
Expand Down Expand Up @@ -380,7 +378,6 @@ func (rb *redisBackend) GetExpiredBackfillIDs(ctx context.Context) ([]string, er

// deleteExpiredBackfillID deletes expired BackfillID from a sorted set
func (rb *redisBackend) deleteExpiredBackfillID(conn redis.Conn, backfillID string) error {

_, err := conn.Do("ZREM", backfillLastAckTime, backfillID)
if err != nil {
return status.Errorf(codes.Internal, "failed to delete expired backfill ID %s from Sorted Set %s",
Expand Down Expand Up @@ -459,7 +456,6 @@ func (rb *redisBackend) GetIndexedBackfills(ctx context.Context) (map[string]int
}

return r, nil

}

func getBackfillReleaseTimeout(cfg config.View) time.Duration {
Expand Down
17 changes: 8 additions & 9 deletions internal/statestore/backfill_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestCreateBackfill(t *testing.T) {
Generation: 1,
}

var testCases = []struct {
testCases := []struct {
description string
backfill *pb.Backfill
ticketIDs []string
Expand Down Expand Up @@ -231,7 +231,7 @@ func TestUpdateBackfillExpiredBackfillErrExpected(t *testing.T) {

err = service.UpdateBackfill(ctx, &bf, nil)
require.Error(t, err)
require.Equal(t, codes.Unavailable.String(), status.Convert(err).Code().String())
require.Equal(t, codes.FailedPrecondition.String(), status.Convert(err).Code().String())
require.Contains(t, status.Convert(err).Message(), fmt.Sprintf("can not update an expired backfill, id: %s", bfID))
}

Expand Down Expand Up @@ -274,7 +274,7 @@ func TestGetBackfill(t *testing.T) {
_, err = c.Do("SET", "wrong-type-key", "wrong-type-value")
require.NoError(t, err)

var testCases = []struct {
testCases := []struct {
description string
backfillID string
expectedCode codes.Code
Expand Down Expand Up @@ -348,7 +348,7 @@ func TestDeleteBackfill(t *testing.T) {
defer service.Close()
ctx := utilTesting.NewContext(t)

//Last Acknowledge timestamp is updated on Frontend CreateBackfill
// Last Acknowledge timestamp is updated on Frontend CreateBackfill
bfID := "mockBackfillID"
err := service.CreateBackfill(ctx, &pb.Backfill{
Id: bfID,
Expand All @@ -363,7 +363,7 @@ func TestDeleteBackfill(t *testing.T) {
require.NoError(t, err)
require.True(t, ts > 0, "timestamp is not valid")

var testCases = []struct {
testCases := []struct {
description string
backfillID string
expectedCode codes.Code
Expand Down Expand Up @@ -419,7 +419,6 @@ func TestDeleteBackfill(t *testing.T) {
require.Error(t, err)
require.Equal(t, codes.Unavailable.String(), status.Convert(err).Code().String())
require.Contains(t, status.Convert(err).Message(), "DeleteBackfill, id: 12345, failed to connect to redis:")

}

// TestUpdateAcknowledgmentTimestampLifecycle test statestore functions - UpdateAcknowledgmentTimestamp, GetExpiredBackfillIDs
Expand Down Expand Up @@ -464,12 +463,12 @@ func TestUpdateAcknowledgmentTimestampLifecycle(t *testing.T) {

err = service.UpdateAcknowledgmentTimestamp(ctx, bf1)
require.Error(t, err)
require.Equal(t, codes.Unavailable.String(), status.Convert(err).Code().String())
require.Equal(t, codes.FailedPrecondition.String(), status.Convert(err).Code().String())
require.Contains(t, status.Convert(err).Message(), fmt.Sprintf("can not acknowledge an expired backfill, id: %s", bf1))

err = service.UpdateAcknowledgmentTimestamp(ctx, bf2)
require.Error(t, err)
require.Equal(t, codes.Unavailable.String(), status.Convert(err).Code().String())
require.Equal(t, codes.FailedPrecondition.String(), status.Convert(err).Code().String())
require.Contains(t, status.Convert(err).Message(), fmt.Sprintf("can not acknowledge an expired backfill, id: %s", bf2))

err = service.DeleteBackfill(ctx, bfIDs[0])
Expand Down Expand Up @@ -531,7 +530,7 @@ func TestUpdateAcknowledgmentTimestamptExpiredBackfillErrExpected(t *testing.T)

err = service.UpdateAcknowledgmentTimestamp(ctx, bfID)
require.Error(t, err)
require.Equal(t, codes.Unavailable.String(), status.Convert(err).Code().String())
require.Equal(t, codes.FailedPrecondition.String(), status.Convert(err).Code().String())
require.Contains(t, status.Convert(err).Message(), fmt.Sprintf("can not acknowledge an expired backfill, id: %s", bfID))
}

Expand Down
71 changes: 65 additions & 6 deletions testing/e2e/backfill_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"io"
"regexp"
"testing"
"time"

"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -186,11 +187,12 @@ func TestAcknowledgeBackfillDeletedTicket(t *testing.T) {
om := newOM(t)
ctx := context.Background()

bf := &pb.Backfill{SearchFields: &pb.SearchFields{
StringArgs: map[string]string{
"search": "me",
bf := &pb.Backfill{
SearchFields: &pb.SearchFields{
StringArgs: map[string]string{
"search": "me",
},
},
},
}
createdBf, err := om.Frontend().CreateBackfill(ctx, &pb.CreateBackfillRequest{Backfill: bf})
require.NoError(t, err)
Expand Down Expand Up @@ -412,7 +414,7 @@ func TestProposedBackfillUpdate(t *testing.T) {
"field1": "value1",
},
}
//using DefaultEvaluationCriteria just for testing purposes only
// using DefaultEvaluationCriteria just for testing purposes only
b.Extensions = map[string]*anypb.Any{
"evaluation_input": mustAny(&pb.DefaultEvaluationCriteria{
Score: 10,
Expand Down Expand Up @@ -571,7 +573,8 @@ func TestBackfillSkipNotfoundError(t *testing.T) {
StringArgs: map[string]string{
"search": "me",
},
}}})
},
}})
require.NoError(t, err)
require.NotNil(t, b1)

Expand Down Expand Up @@ -613,6 +616,62 @@ func TestBackfillSkipNotfoundError(t *testing.T) {
require.Equal(t, fmt.Sprintf("rpc error: code = NotFound desc = Backfill id: %s not found", b1.Id), err.Error())
}

func TestBackfillSkipExpiredError(t *testing.T) {
ctx := context.Background()
om := newOM(t)

t1, err := om.Frontend().CreateTicket(ctx, &pb.CreateTicketRequest{Ticket: &pb.Ticket{}})
require.NoError(t, err)
require.NotNil(t, t1)

b1, err := om.Frontend().CreateBackfill(ctx, &pb.CreateBackfillRequest{Backfill: &pb.Backfill{
SearchFields: &pb.SearchFields{
StringArgs: map[string]string{
"search": "me",
},
},
}})
require.NoError(t, err)
require.NotNil(t, b1)

m := &pb.Match{
MatchId: "1",
Tickets: []*pb.Ticket{t1},
Backfill: b1,
}

om.SetMMF(func(ctx context.Context, profile *pb.MatchProfile, out chan<- *pb.Match) error {
out <- m
return nil
})

om.SetEvaluator(func(ctx context.Context, in <-chan *pb.Match, out chan<- string) error {
p, ok := <-in
require.True(t, ok)

out <- p.MatchId
return nil
})

// Depends on pendingReleaseTimeout
time.Sleep(1 * time.Second)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit ugly, would be nice if we could have something like k8s.io/utils/clock/testing to pass the time


// Make sure backfill has expired
_, err = om.Frontend().AcknowledgeBackfill(ctx, &pb.AcknowledgeBackfillRequest{BackfillId: b1.Id, Assignment: &pb.Assignment{}})
require.Error(t, err)
require.Equal(t, fmt.Sprintf("rpc error: code = FailedPrecondition desc = can not acknowledge an expired backfill, id: %s", b1.Id), err.Error())

stream, err := om.Backend().FetchMatches(ctx, &pb.FetchMatchesRequest{
Config: om.MMFConfigGRPC(),
Profile: &pb.MatchProfile{},
})
require.NoError(t, err)
resp, err := stream.Recv()
require.Nil(t, resp)
require.Error(t, err)
require.Equal(t, io.EOF.Error(), err.Error())
}

func mustAny(m proto.Message) *anypb.Any {
result, err := anypb.New(m)
if err != nil {
Expand Down