Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions api/query.proto
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,15 @@ message QueryBackfillsResponse {
repeated Backfill backfills = 1;
}

message QueryExpiredTicketsRequest {
int32 limit = 1;
}

message QueryExpiredTicketsResponse {
// Tickets that meet all the filtering criteria requested by the pool.
repeated Ticket tickets = 1;
}

// The QueryService service implements helper APIs for Match Function to query Tickets from state storage.
service QueryService {
// QueryTickets gets a list of Tickets that match all Filters of the input Pool.
Expand Down Expand Up @@ -122,4 +131,13 @@ service QueryService {
body: "*"
};
}


// QueryExpiredTickets get the expired tickets limited by the specified number
rpc QueryExpiredTickets(QueryExpiredTicketsRequest) returns (stream QueryExpiredTicketsResponse) {
option (google.api.http) = {
post: "/v1/queryservice/tickets:expired"
body: "*"
};
}
}
71 changes: 71 additions & 0 deletions api/query.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,55 @@
]
}
},
"/v1/queryservice/tickets:expired": {
"post": {
"summary": "QueryExpiredTickets get the expired tickets limited by the specified number",
"operationId": "QueryService_QueryExpiredTickets",
"responses": {
"200": {
"description": "A successful response.(streaming responses)",
"schema": {
"type": "object",
"properties": {
"result": {
"$ref": "#/definitions/openmatchQueryExpiredTicketsResponse"
},
"error": {
"$ref": "#/definitions/rpcStatus"
}
},
"title": "Stream result of openmatchQueryExpiredTicketsResponse"
}
},
"404": {
"description": "Returned when the resource does not exist.",
"schema": {
"type": "string",
"format": "string"
}
},
"default": {
"description": "An unexpected error response.",
"schema": {
"$ref": "#/definitions/rpcStatus"
}
}
},
"parameters": [
{
"name": "body",
"in": "body",
"required": true,
"schema": {
"$ref": "#/definitions/openmatchQueryExpiredTicketsRequest"
}
}
],
"tags": [
"QueryService"
]
}
},
"/v1/queryservice/tickets:query": {
"post": {
"summary": "QueryTickets gets a list of Tickets that match all Filters of the input Pool.\n - If the Pool contains no Filters, QueryTickets will return all Tickets in the state storage.\nQueryTickets pages the Tickets by `queryPageSize` and stream back responses.\n - queryPageSize is default to 1000 if not set, and has a minimum of 10 and maximum of 10000.",
Expand Down Expand Up @@ -335,6 +384,28 @@
},
"description": "BETA FEATURE WARNING: This Request messages are not finalized and \nstill subject to possible change or removal."
},
"openmatchQueryExpiredTicketsRequest": {
"type": "object",
"properties": {
"limit": {
"type": "integer",
"format": "int32"
}
}
},
"openmatchQueryExpiredTicketsResponse": {
"type": "object",
"properties": {
"tickets": {
"type": "array",
"items": {
"type": "object",
"$ref": "#/definitions/openmatchTicket"
},
"description": "Tickets that meet all the filtering criteria requested by the pool."
}
}
},
"openmatchQueryTicketIdsRequest": {
"type": "object",
"properties": {
Expand Down
3 changes: 2 additions & 1 deletion internal/app/query/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,8 @@ func updateTicketCache(store statestore.Service, value interface{}) error {

t := time.Now()
previousCount := len(tickets)
currentAll, err := store.GetIndexedIDSet(context.Background())
// get all indexed tickets within the valid time window
currentAll, err := store.GetIndexedIDSetWithTTL(context.Background(), 0)
if err != nil {
return err
}
Expand Down
28 changes: 28 additions & 0 deletions internal/app/query/query_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,34 @@ func (s *queryService) QueryBackfills(req *pb.QueryBackfillsRequest, responseSer
return nil
}

// QueryExpiredTickets get the expired tickets limited by the specified number
func (s *queryService) QueryExpiredTickets(req *pb.QueryExpiredTicketsRequest, responseServer pb.QueryService_QueryExpiredTicketsServer) error {
ctx := responseServer.Context()
limit := int(req.GetLimit())

tickets, err := s.bc.store.GetExpiredTickets(ctx, limit)
if err != nil {
return err
}

pSize := getPageSize(s.cfg)
for start := 0; start < len(tickets); start += pSize {
end := start + pSize
if end > len(tickets) {
end = len(tickets)
}

err := responseServer.Send(&pb.QueryExpiredTicketsResponse{
Tickets: tickets[start:end],
})
if err != nil {
return err
}
}

return nil
}

func getPageSize(cfg config.View) int {
const (
name = "queryPageSize"
Expand Down
36 changes: 18 additions & 18 deletions internal/statestore/backfill_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
)

func TestCreateBackfillLastAckTime(t *testing.T) {
cfg, closer := createRedis(t, false, "")
cfg, closer := createRedis(t, false, "", 0)
defer closer()
service := New(cfg)
require.NotNil(t, service)
Expand All @@ -56,7 +56,7 @@ func TestCreateBackfillLastAckTime(t *testing.T) {
}

func TestCreateBackfill(t *testing.T) {
cfg, closer := createRedis(t, false, "")
cfg, closer := createRedis(t, false, "", 0)
defer closer()
service := New(cfg)
require.NotNil(t, service)
Expand Down Expand Up @@ -118,7 +118,7 @@ func TestCreateBackfill(t *testing.T) {
}

func TestUpdateExistingBackfillNoError(t *testing.T) {
cfg, closer := createRedis(t, false, "")
cfg, closer := createRedis(t, false, "", 0)
defer closer()
service := New(cfg)
require.NotNil(t, service)
Expand Down Expand Up @@ -178,7 +178,7 @@ func TestUpdateExistingBackfillNoError(t *testing.T) {
}

func TestUpdateBackfillDoNotExistCanNotUpdate(t *testing.T) {
cfg, closer := createRedis(t, false, "")
cfg, closer := createRedis(t, false, "", 0)
defer closer()
service := New(cfg)
require.NotNil(t, service)
Expand Down Expand Up @@ -208,7 +208,7 @@ func TestUpdateBackfillDoNotExistCanNotUpdate(t *testing.T) {
}

func TestUpdateBackfillExpiredBackfillErrExpected(t *testing.T) {
cfg, closer := createRedis(t, false, "")
cfg, closer := createRedis(t, false, "", 0)
defer closer()
service := New(cfg)
require.NotNil(t, service)
Expand Down Expand Up @@ -236,7 +236,7 @@ func TestUpdateBackfillExpiredBackfillErrExpected(t *testing.T) {
}

func TestUpdateBackfillExpiredContextErrExpected(t *testing.T) {
cfg, closer := createRedis(t, false, "")
cfg, closer := createRedis(t, false, "", 0)
defer closer()
service := New(cfg)
require.NotNil(t, service)
Expand All @@ -254,7 +254,7 @@ func TestUpdateBackfillExpiredContextErrExpected(t *testing.T) {
}

func TestGetBackfill(t *testing.T) {
cfg, closer := createRedis(t, false, "")
cfg, closer := createRedis(t, false, "", 0)
defer closer()
service := New(cfg)
require.NotNil(t, service)
Expand Down Expand Up @@ -341,7 +341,7 @@ func TestGetBackfill(t *testing.T) {
}

func TestDeleteBackfill(t *testing.T) {
cfg, closer := createRedis(t, false, "")
cfg, closer := createRedis(t, false, "", 0)
defer closer()
service := New(cfg)
require.NotNil(t, service)
Expand Down Expand Up @@ -424,7 +424,7 @@ func TestDeleteBackfill(t *testing.T) {
// TestUpdateAcknowledgmentTimestampLifecycle test statestore functions - UpdateAcknowledgmentTimestamp, GetExpiredBackfillIDs
// and deleteExpiredBackfillID
func TestUpdateAcknowledgmentTimestampLifecycle(t *testing.T) {
cfg, closer := createRedis(t, false, "")
cfg, closer := createRedis(t, false, "", 0)
defer closer()

service := New(cfg)
Expand Down Expand Up @@ -480,7 +480,7 @@ func TestUpdateAcknowledgmentTimestampLifecycle(t *testing.T) {
}

func TestUpdateAcknowledgmentTimestamp(t *testing.T) {
cfg, closer := createRedis(t, false, "")
cfg, closer := createRedis(t, false, "", 0)
defer closer()

startTime := time.Now()
Expand Down Expand Up @@ -511,7 +511,7 @@ func TestUpdateAcknowledgmentTimestamp(t *testing.T) {
}

func TestUpdateAcknowledgmentTimestamptExpiredBackfillErrExpected(t *testing.T) {
cfg, closer := createRedis(t, false, "")
cfg, closer := createRedis(t, false, "", 0)
defer closer()
service := New(cfg)
require.NotNil(t, service)
Expand All @@ -535,7 +535,7 @@ func TestUpdateAcknowledgmentTimestamptExpiredBackfillErrExpected(t *testing.T)
}

func TestUpdateAcknowledgmentTimestampConnectionError(t *testing.T) {
cfg, closer := createRedis(t, false, "")
cfg, closer := createRedis(t, false, "", 0)
defer closer()
service := New(cfg)
require.NotNil(t, service)
Expand All @@ -560,7 +560,7 @@ func createInvalidRedisConfig() config.View {
// TestGetExpiredBackfillIDs test statestore function GetExpiredBackfillIDs
func TestGetExpiredBackfillIDs(t *testing.T) {
// Prepare expired and normal BackfillIds in a Redis Sorted Set
cfg, closer := createRedis(t, false, "")
cfg, closer := createRedis(t, false, "", 0)
defer closer()

expID := "expired"
Expand All @@ -584,7 +584,7 @@ func TestGetExpiredBackfillIDs(t *testing.T) {
}

func TestIndexBackfill(t *testing.T) {
cfg, closer := createRedis(t, false, "")
cfg, closer := createRedis(t, false, "", 0)
defer closer()
service := New(cfg)
require.NotNil(t, service)
Expand Down Expand Up @@ -617,7 +617,7 @@ func TestIndexBackfill(t *testing.T) {
}

func TestDeindexBackfill(t *testing.T) {
cfg, closer := createRedis(t, false, "")
cfg, closer := createRedis(t, false, "", 0)
defer closer()
service := New(cfg)
require.NotNil(t, service)
Expand Down Expand Up @@ -654,7 +654,7 @@ func TestDeindexBackfill(t *testing.T) {
}

func TestGetIndexedBackfills(t *testing.T) {
cfg, closer := createRedis(t, false, "")
cfg, closer := createRedis(t, false, "", 0)
defer closer()
service := New(cfg)
require.NotNil(t, service)
Expand Down Expand Up @@ -705,7 +705,7 @@ func generateBackfills(ctx context.Context, t *testing.T, service Service, amoun

func BenchmarkCleanupBackfills(b *testing.B) {
t := &testing.T{}
cfg, closer := createRedis(t, false, "")
cfg, closer := createRedis(t, false, "", 0)
defer closer()
service := New(cfg)
require.NotNil(t, service)
Expand Down Expand Up @@ -743,7 +743,7 @@ func BenchmarkCleanupBackfills(b *testing.B) {
}

func TestCleanupBackfills(t *testing.T) {
cfg, closer := createRedis(t, false, "")
cfg, closer := createRedis(t, false, "", 0)
defer closer()
service := New(cfg)
require.NotNil(t, service)
Expand Down
15 changes: 15 additions & 0 deletions internal/statestore/instrumented.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,23 @@ func (is *instrumentedService) DeleteBackfillCompletely(ctx context.Context, id
return is.s.DeleteBackfillCompletely(ctx, id)
}

// GetIndexedTicketCount returns the current count of indexed tickets
func (is *instrumentedService) GetIndexedTicketCount(ctx context.Context) (int, error) {
ctx, span := trace.StartSpan(context.Background(), "statestore/instrumented.GetIndexedTicketCount")
defer span.End()
return is.s.GetIndexedTicketCount(ctx)
}

// GetExpiredTickets gets all ticket IDs which are expired
func (is *instrumentedService) GetExpiredTickets(ctx context.Context, limit int) ([]*pb.Ticket, error) {
ctx, span := trace.StartSpan(ctx, "statestore/instrumented.GetExpiredTickets")
defer span.End()
return is.s.GetExpiredTickets(ctx, limit)
}

// GetIndexedIDSetWithTTL returns the ids of all tickets currently indexed but within a given TTL.
func (is *instrumentedService) GetIndexedIDSetWithTTL(ctx context.Context, limit int) (map[string]struct{}, error) {
ctx, span := trace.StartSpan(ctx, "statestore/instrumented.GetIndexedIDSetWithTTL")
defer span.End()
return is.s.GetIndexedIDSetWithTTL(ctx, 0)
}
6 changes: 6 additions & 0 deletions internal/statestore/public.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,12 @@ type Service interface {

// GetIndexedTicketCount returns the current count of indexed tickets
GetIndexedTicketCount(ctx context.Context) (int, error)

// GetExpiredTickets gets all tickets which are expired
GetExpiredTickets(ctx context.Context, limit int) ([]*pb.Ticket, error)

// GetIndexedIDSetWithTTL returns the ids of all tickets currently indexed but within a given TTL.
GetIndexedIDSetWithTTL(ctx context.Context, limit int) (map[string]struct{}, error)
}

// New creates a Service based on the configuration.
Expand Down
2 changes: 1 addition & 1 deletion internal/statestore/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

func TestNewMutex(t *testing.T) {
cfg, closer := createRedis(t, false, "")
cfg, closer := createRedis(t, false, "", 0)
defer closer()
service := New(cfg)
require.NotNil(t, service)
Expand Down
Loading