Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions api/query.proto
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,30 @@ message QueryTicketsResponse {
repeated Ticket tickets = 1;
}

// BatchQueryTicketsRequest asks the query service to evaluate several pools in
// a single call.
message BatchQueryTicketsRequest {
// Pools to query tickets for.
repeated Pool pools = 1;

// Maximum number of tickets fetched from storage for the request. The same
// sample of tickets is then filtered against every pool. Values <= 0 select
// the default of 10000.
int32 query_limit = 2;

// Maximum number of ticket ids returned per pool. Values <= 0 select the
// default of 1000.
int32 result_limit = 3;
}

// BatchQueryTicketsResponse carries the per-pool results of a
// BatchQueryTickets call.
message BatchQueryTicketsResponse {
// Ticket ids selected for a single pool.
message PoolTickets {
repeated string ticket_ids = 1;
}

// Pool name to ticket ids mapping. The key is the `name` of the Pool as sent
// in the request.
map<string, PoolTickets> pool_tickets = 1;

// Ticket id to ticket mapping. Holds every ticket fetched while serving the
// request, so it may contain tickets that are not listed under any pool.
map<string, Ticket> tickets = 2;
}

message QueryTicketIdsRequest {
// The Pool representing the set of Filters to be queried.
Pool pool = 1;
Expand Down Expand Up @@ -102,6 +126,14 @@ service QueryService {
};
}

// BatchQueryTickets evaluates all pools of the request against one randomly
// sampled set of indexed tickets and answers with a single (non-streaming)
// response holding the ticket ids per pool and the sampled tickets themselves.
// See BatchQueryTicketsRequest for the limits that bound the sample size and
// the number of ids returned per pool.
rpc BatchQueryTickets(BatchQueryTicketsRequest) returns (BatchQueryTicketsResponse) {
option (google.api.http) = {
post: "/v1/queryservice/batchtickets:query"
body: "*"
};
}

// QueryTicketIds gets the list of TicketIDs that meet all the filtering criteria requested by the pool.
// - If the Pool contains no Filters, QueryTicketIds will return all TicketIDs in the state storage.
// QueryTicketIds pages the TicketIDs by `queryPageSize` and stream back responses.
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ require (
)

require (
github.com/hashicorp/golang-lru v0.6.0
google.golang.org/genproto/googleapis/api v0.0.0-20250818200422-3122310a409c
google.golang.org/genproto/googleapis/rpc v0.0.0-20250818200422-3122310a409c
)
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,8 @@ github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+l
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.6.0 h1:uL2shRDx7RTrOrTCUZEGP/wJUFiUI8QT6E7z5o8jga4=
github.com/hashicorp/golang-lru v0.6.0/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
Expand Down
24 changes: 21 additions & 3 deletions internal/app/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
package query

import (
"fmt"

lru "github.com/hashicorp/golang-lru"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"google.golang.org/grpc"
Expand Down Expand Up @@ -97,13 +100,28 @@ var (
}
)

// defaultBatchCacheSize is the capacity (number of entries) of the LRU cache
// backing BatchQueryTickets. It is used when the "queryBatchCacheSize" config
// value is unset or not positive.
const defaultBatchCacheSize = 100_000

// BindService creates the query service and binds it to the serving harness.
func BindService(p *appmain.Params, b *appmain.Bindings) error {
store := statestore.New(p.Config())

batchCacheSize := p.Config().GetInt("queryBatchCacheSize")
if batchCacheSize <= 0 {
batchCacheSize = defaultBatchCacheSize
}

batchCache, err := lru.New(batchCacheSize)
if err != nil {
return fmt.Errorf("failed to create lru cache: %w", err)
}

service := &queryService{
cfg: p.Config(),
tc: newTicketCache(b, store),
bc: newBackfillCache(b, store),
store: store,
batchCache: batchCache,
cfg: p.Config(),
tc: newTicketCache(b, store),
bc: newBackfillCache(b, store),
}

b.AddHandleFunc(func(s *grpc.Server) {
Expand Down
121 changes: 118 additions & 3 deletions internal/app/query/query_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,17 @@
package query

import (
"context"
"fmt"
"hash/crc32"
"runtime/trace"
"sync"

lru "github.com/hashicorp/golang-lru"
"go.opencensus.io/stats"
"golang.org/x/sync/singleflight"
"google.golang.org/protobuf/proto"
"open-match.dev/open-match/internal/statestore"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
Expand All @@ -36,9 +46,12 @@ var (
// queryService API provides utility functions for common MMF functionality such
// as retrieving Tickets from state storage.
type queryService struct {
cfg config.View
tc *cache
bc *cache
cfg config.View
// tc is the cache built by newTicketCache.
tc *cache
// bc is the cache built by newBackfillCache.
bc *cache
// store gives BatchQueryTickets direct access to the state store
// (random id sampling and ticket lookups).
store statestore.Service
// batchCache is an LRU cache mapping ticket id to *pb.Ticket. It is filled
// and read only by BatchQueryTickets.
batchCache *lru.Cache
// batchSingleFlight collapses concurrent BatchQueryTickets calls that share
// the same request key into one execution.
batchSingleFlight singleflight.Group
}

func (s *queryService) QueryTickets(req *pb.QueryTicketsRequest, responseServer pb.QueryService_QueryTicketsServer) error {
Expand Down Expand Up @@ -91,6 +104,108 @@ func (s *queryService) QueryTickets(req *pb.QueryTicketsRequest, responseServer
return nil
}

// Fallback limits for BatchQueryTickets, applied when the corresponding
// request field is zero or negative.
const (
// defaultBatchQueryTicketsQueryLimit bounds how many ticket ids are sampled
// from the state store per request.
defaultBatchQueryTicketsQueryLimit = 10_000
// defaultBatchQueryTicketsResultLimit bounds how many ticket ids are
// returned for each pool.
defaultBatchQueryTicketsResultLimit = 1_000
)

// BatchQueryTickets evaluates every pool of the request against one random
// sample of indexed tickets.
//
// The sample holds at most req.QueryLimit ids (default
// defaultBatchQueryTicketsQueryLimit). Tickets are resolved through the LRU
// batch cache first and fetched from the state store only when missing. Each
// pool then receives at most req.ResultLimit matching ticket ids (default
// defaultBatchQueryTicketsResultLimit).
//
// The response maps pool name to the matching ticket ids and ticket id to
// ticket for every sampled ticket. Pools sharing a name are collapsed into one
// entry; the filter of the last such pool wins.
//
// Concurrent calls with the same request key are collapsed into a single
// execution and share one response, which must therefore be treated as
// read-only. The shared execution runs with the context of the first caller.
//
// An error is returned when the request cannot be serialized, a pool filter is
// invalid, or the state store fails.
func (s *queryService) BatchQueryTickets(ctx context.Context, req *pb.BatchQueryTicketsRequest) (*pb.BatchQueryTicketsResponse, error) {
	queryLimit := defaultBatchQueryTicketsQueryLimit
	if req.QueryLimit > 0 {
		queryLimit = int(req.QueryLimit)
	}

	resultLimit := defaultBatchQueryTicketsResultLimit
	if req.ResultLimit > 0 {
		resultLimit = int(req.ResultLimit)
	}

	// TODO: If the properties of the pools is not stable in order when we create them, we can just go by the pool
	// names here instead of the full request.
	// Deterministic marshalling makes equal requests serialize to equal bytes,
	// which is what the single-flight key relies on.
	data, err := proto.MarshalOptions{Deterministic: true}.Marshal(req)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal request: %w", err)
	}
	// NOTE(review): a 32-bit checksum can collide. Two different requests in
	// flight at the same time with the same CRC would share one response.
	// Consider keying on the serialized request itself.
	h := crc32.NewIEEE()
	_, _ = h.Write(data) // hash.Hash.Write never returns an error.
	singleFlightKey := string(h.Sum(nil))

	v, err, _ := s.batchSingleFlight.Do(singleFlightKey, func() (interface{}, error) {
		// Build the filters up front so an invalid pool is rejected before any
		// round trip to the state store. Keying by name also guarantees exactly
		// one result slot (and later one goroutine) per pool name.
		poolFilters := make(map[string]*filter.PoolFilter, len(req.Pools))
		poolTickets := make(map[string]*pb.BatchQueryTicketsResponse_PoolTickets, len(req.Pools))
		for _, pool := range req.Pools {
			pf, err := filter.NewPoolFilter(pool)
			if err != nil {
				return nil, err
			}
			poolFilters[pool.Name] = pf
			poolTickets[pool.Name] = &pb.BatchQueryTicketsResponse_PoolTickets{}
		}

		r := trace.StartRegion(ctx, "getRandomIndexIDSet")
		ticketIDs, err := s.store.GetRandomIndexedIDSet(ctx, queryLimit)
		r.End() // End before the error check so the region is closed on every path.
		if err != nil {
			return nil, fmt.Errorf("failed to get random indexed ids: %w", err)
		}

		r = trace.StartRegion(ctx, "fetchCached")
		tickets := make(map[string]*pb.Ticket, len(ticketIDs))

		var missingIDs []string
		for ticketID := range ticketIDs {
			if cached, ok := s.batchCache.Get(ticketID); ok {
				// A value of an unexpected type is treated like a cache miss
				// instead of panicking the handler.
				if t, ok := cached.(*pb.Ticket); ok {
					tickets[ticketID] = t
					continue
				}
			}
			missingIDs = append(missingIDs, ticketID)
		}
		r.End()

		r = trace.StartRegion(ctx, "fetchMissing")
		if len(missingIDs) > 0 {
			missingTickets, err := s.store.GetTickets(ctx, missingIDs)
			if err != nil {
				r.End()
				return nil, fmt.Errorf("failed to get missing tickets: %w", err)
			}

			for _, ticket := range missingTickets {
				tickets[ticket.Id] = ticket
				s.batchCache.Add(ticket.Id, ticket)
			}
		}
		r.End()

		r = trace.StartRegion(ctx, "filterTickets")
		var wg sync.WaitGroup
		for name, pf := range poolFilters {
			wg.Add(1)
			// The filter and the result slot are passed as arguments so that
			// the goroutine never depends on the loop variables, and each
			// goroutine is the only writer of its own slot.
			go func(pf *filter.PoolFilter, out *pb.BatchQueryTicketsResponse_PoolTickets) {
				defer wg.Done()
				for _, ticket := range tickets {
					if len(out.TicketIds) >= resultLimit {
						break
					}

					if pf.In(ticket) {
						out.TicketIds = append(out.TicketIds, ticket.Id)
					}
				}
			}(pf, poolTickets[name])
		}
		wg.Wait()
		r.End()

		return &pb.BatchQueryTicketsResponse{
			PoolTickets: poolTickets,
			Tickets:     tickets,
		}, nil
	})
	if err != nil {
		return nil, err
	}

	return v.(*pb.BatchQueryTicketsResponse), nil
}

func (s *queryService) QueryTicketIds(req *pb.QueryTicketIdsRequest, responseServer pb.QueryService_QueryTicketIdsServer) error {
ctx := responseServer.Context()
pool := req.GetPool()
Expand Down
6 changes: 6 additions & 0 deletions internal/statestore/instrumented.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ func (is *instrumentedService) GetIndexedIDSet(ctx context.Context) (map[string]
return is.s.GetIndexedIDSet(ctx)
}

// GetRandomIndexedIDSet wraps the underlying service call in a trace span named
// "statestore/instrumented.GetRandomIndexedIDSet" and forwards the result
// unchanged.
func (is *instrumentedService) GetRandomIndexedIDSet(ctx context.Context, limit int) (map[string]struct{}, error) {
	spanCtx, sp := trace.StartSpan(ctx, "statestore/instrumented.GetRandomIndexedIDSet")
	defer sp.End()

	ids, err := is.s.GetRandomIndexedIDSet(spanCtx, limit)
	return ids, err
}

func (is *instrumentedService) UpdateAssignments(ctx context.Context, req *pb.AssignTicketsRequest) (*pb.AssignTicketsResponse, []*pb.Ticket, error) {
ctx, span := trace.StartSpan(ctx, "statestore/instrumented.UpdateAssignments")
defer span.End()
Expand Down
3 changes: 3 additions & 0 deletions internal/statestore/public.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ type Service interface {
// GetIndexedIDSet returns the ids of all tickets currently indexed.
GetIndexedIDSet(ctx context.Context) (map[string]struct{}, error)

// GetRandomIndexedIDSet returns a random selection of the currently indexed ticket ids, bounded by limit.
GetRandomIndexedIDSet(ctx context.Context, limit int) (map[string]struct{}, error)

// GetTickets returns multiple tickets from storage.
// Missing tickets are silently ignored.
GetTickets(ctx context.Context, ids []string) ([]*pb.Ticket, error)
Expand Down
35 changes: 35 additions & 0 deletions internal/statestore/ticket.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,41 @@ func (rb *redisBackend) GetIndexedIDSet(ctx context.Context) (map[string]struct{
return r, nil
}

// GetRandomIndexedIDSet asks Redis for limit random members of the indexed
// ticket set and returns them as a set, leaving out every id that is currently
// listed as a pending release.
//
// An id counts as pending when its score in the proposed-tickets sorted set
// lies between (now - backfill release timeout) and (now + 1 hour), both taken
// as Unix nanoseconds.
//
// Connection failures are reported with codes.Unavailable, failed Redis
// commands with codes.Internal.
func (rb *redisBackend) GetRandomIndexedIDSet(ctx context.Context, limit int) (map[string]struct{}, error) {
	conn, err := rb.redisPool.GetContext(ctx)
	if err != nil {
		return nil, status.Errorf(codes.Unavailable, "GetRandomIndexedIDSet, failed to connect to redis: %v", err)
	}
	defer handleConnectionClose(&conn)

	// Score window of the proposed-tickets set that marks a ticket as pending.
	releaseTimeout := getBackfillReleaseTimeout(rb.cfg)
	now := time.Now()
	windowStart := now.Add(-releaseTimeout).UnixNano()
	windowEnd := now.Add(time.Hour).UnixNano()

	pending, err := redis.Strings(conn.Do("ZRANGEBYSCORE", proposedTicketIDs, windowStart, windowEnd))
	if err != nil {
		return nil, status.Errorf(codes.Internal, "error getting pending release %v", err)
	}

	sampled, err := redis.Strings(conn.Do("SRANDMEMBER", allTickets, limit))
	if err != nil {
		return nil, status.Errorf(codes.Internal, "error getting random indexed ticket ids %v", err)
	}

	// Collect the pending ids first, then keep only the sampled ids that are
	// not among them.
	excluded := make(map[string]struct{}, len(pending))
	for _, id := range pending {
		excluded[id] = struct{}{}
	}

	result := make(map[string]struct{}, len(sampled))
	for _, id := range sampled {
		if _, skip := excluded[id]; skip {
			continue
		}
		result[id] = struct{}{}
	}

	return result, nil
}

// GetIndexedTicketCount retrieves the current ticket count
func (rb *redisBackend) GetIndexedTicketCount(ctx context.Context) (int, error) {
redisConn, err := rb.redisPool.GetContext(ctx)
Expand Down
Loading