Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NK-588 Add storage index listing pagination support #1276

Merged
merged 5 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ The format is based on [keep a changelog](http://keepachangelog.com) and this pr

### Changed
- Increased limit on runtimes group users list functions.
- Added pagination support to storage index listing.

### Fixed
- Ensure matchmaker stats behave correctly if matchmaker becomes fully empty and idle.
Expand Down
4 changes: 1 addition & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/gorilla/mux v1.8.1
github.com/gorilla/websocket v1.5.1
github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0
github.com/heroiclabs/nakama-common v1.33.1-0.20240920140332-3cdf52bdf781
github.com/heroiclabs/nakama-common v1.33.1-0.20241021170115-c7b5486ef0aa
github.com/heroiclabs/sql-migrate v0.0.0-20240528102547-233afc8cf05a
github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438
github.com/jackc/pgx/v5 v5.6.0
Expand Down Expand Up @@ -74,5 +74,3 @@ require (
golang.org/x/text v0.16.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240604185151-ef581f913117 // indirect
)

replace github.com/heroiclabs/nakama-common => ../nakama-common
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZH
github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 h1:bkypFPDjIYGfCYD5mRBvpqxfYX1YCS1PXdKYWi8FsN0=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0/go.mod h1:P+Lt/0by1T8bfcF3z737NnSbmxQAppXMRziHUxPOC8k=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/heroiclabs/nakama-common v1.33.1-0.20240920140332-3cdf52bdf781 h1:mVjenNkPCNqQscldkvoBmmeCVS6MXzoN1rDd2u/+BbE=
github.com/heroiclabs/nakama-common v1.33.1-0.20240920140332-3cdf52bdf781/go.mod h1:lPG64MVCs0/tEkh311Cd6oHX9NLx2vAPx7WW7QCJHQ0=
github.com/heroiclabs/nakama-common v1.33.1-0.20241021170115-c7b5486ef0aa h1:AWkD2AwYSMexj1mPArQvs2xkZTkvXvzqdvHlN4UeHOs=
github.com/heroiclabs/nakama-common v1.33.1-0.20241021170115-c7b5486ef0aa/go.mod h1:lPG64MVCs0/tEkh311Cd6oHX9NLx2vAPx7WW7QCJHQ0=
github.com/heroiclabs/sql-migrate v0.0.0-20240528102547-233afc8cf05a h1:tuL2ZPaeCbNw8rXmV9ywd00nXRv95V4/FmbIGKLQJAE=
github.com/heroiclabs/sql-migrate v0.0.0-20240528102547-233afc8cf05a/go.mod h1:hzCTPoEi/oml2BllVydJcNP63S7b56e5DzeQeLGvw1U=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
Expand Down
12 changes: 7 additions & 5 deletions server/runtime_go_nakama.go
Original file line number Diff line number Diff line change
Expand Up @@ -2151,27 +2151,29 @@ func (n *RuntimeGoNakamaModule) StorageDelete(ctx context.Context, deletes []*ru
// @param queryString(type=string) Query to filter index entries.
// @param limit(type=int) Maximum number of results to be returned.
// @param order(type=[]string, optional=true) The storage object fields to sort the query results by. The prefix '-' before a field name indicates descending order. All specified fields must be indexed and sortable.
// @param cursor(type=string) A cursor to fetch the next page of results.
// @return objects(*api.StorageObjectList) A list of storage objects.
// @return cursor(type=string) An optional cursor that can be used to retrieve the next page of records (if any).
// @return error(error) An optional error value if an error occurred.
func (n *RuntimeGoNakamaModule) StorageIndexList(ctx context.Context, callerID, indexName, query string, limit int, order []string) (*api.StorageObjects, error) {
func (n *RuntimeGoNakamaModule) StorageIndexList(ctx context.Context, callerID, indexName, query string, limit int, order []string, cursor string) (*api.StorageObjects, string, error) {
cid := uuid.Nil
if callerID != "" {
id, err := uuid.FromString(callerID)
if err != nil {
return nil, errors.New("expects caller id to be empty or a valid user id")
return nil, "", errors.New("expects caller id to be empty or a valid user id")
}
cid = id
}

if indexName == "" {
return nil, errors.New("expects a non-empty indexName")
return nil, "", errors.New("expects a non-empty indexName")
}

if limit < 1 || limit > 10_000 {
return nil, errors.New("limit must be 1-10000")
return nil, "", errors.New("limit must be 1-10000")
}

return n.storageIndex.List(ctx, cid, indexName, query, limit, order)
return n.storageIndex.List(ctx, cid, indexName, query, limit, order, cursor)
}

// @group users
Expand Down
45 changes: 29 additions & 16 deletions server/runtime_javascript_nakama.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ func (n *runtimeJavascriptNakamaModule) stringToBinary(r *goja.Runtime) func(goj
// @param limit(type=int) Maximum number of results to be returned.
// @param order(type=[]string, optional=true) The storage object fields to sort the query results by. The prefix '-' before a field name indicates descending order. All specified fields must be indexed and sortable.
// @param callerId(type=string, optional=true) User ID of the caller, will apply permissions checks of the user. If empty defaults to system user and permission checks are bypassed.
// @return objects(nkruntime.StorageObjectList) A list of storage objects.
// @return objects(nkruntime.StorageIndexResult) A list of storage objects.
// @return error(error) An optional error value if an error occurred.
func (n *runtimeJavascriptNakamaModule) storageIndexList(r *goja.Runtime) func(goja.FunctionCall) goja.Value {
return func(f goja.FunctionCall) goja.Value {
Expand Down Expand Up @@ -391,39 +391,52 @@ func (n *runtimeJavascriptNakamaModule) storageIndexList(r *goja.Runtime) func(g
callerID = cid
}

objectList, err := n.storageIndex.List(n.ctx, callerID, idxName, queryString, int(limit), order)
var cursor string
if !goja.IsUndefined(f.Argument(5)) && !goja.IsNull(f.Argument(5)) {
cursor = getJsString(r, f.Argument(5))
}

objectList, newCursor, err := n.storageIndex.List(n.ctx, callerID, idxName, queryString, int(limit), order, cursor)
if err != nil {
panic(r.NewGoError(fmt.Errorf("failed to lookup storage index: %s", err.Error())))
}

objects := make([]interface{}, 0, len(objectList.Objects))
objects := make([]any, 0, len(objectList.Objects))
for _, o := range objectList.Objects {
objectMap := make(map[string]interface{}, 9)
objectMap["key"] = o.Key
objectMap["collection"] = o.Collection
obj := r.NewObject()
_ = obj.Set("key", o.Key)
_ = obj.Set("collection", o.Collection)
if o.UserId != "" {
objectMap["userId"] = o.UserId
_ = obj.Set("userId", o.UserId)
} else {
objectMap["userId"] = nil
_ = obj.Set("userId", nil)
}
objectMap["version"] = o.Version
objectMap["permissionRead"] = o.PermissionRead
objectMap["permissionWrite"] = o.PermissionWrite
objectMap["createTime"] = o.CreateTime.Seconds
objectMap["updateTime"] = o.UpdateTime.Seconds
_ = obj.Set("version", o.Version)
_ = obj.Set("permissionRead", o.PermissionRead)
_ = obj.Set("permissionWrite", o.PermissionWrite)
_ = obj.Set("createTime", o.CreateTime.Seconds)
_ = obj.Set("updateTime", o.UpdateTime.Seconds)

valueMap := make(map[string]interface{})
err = json.Unmarshal([]byte(o.Value), &valueMap)
if err != nil {
panic(r.NewGoError(fmt.Errorf("failed to convert value to json: %s", err.Error())))
}
pointerizeSlices(valueMap)
objectMap["value"] = valueMap
_ = obj.Set("value", valueMap)

objects = append(objects, objectMap)
objects = append(objects, obj)
}

outObj := r.NewObject()
_ = outObj.Set("objects", r.NewArray(objects...))
if newCursor != "" {
_ = outObj.Set("cursor", newCursor)
} else {
_ = outObj.Set("cursor", goja.Null())
}

return r.ToValue(objects)
return r.ToValue(outObj)
}
}

Expand Down
13 changes: 11 additions & 2 deletions server/runtime_lua_nakama.go
Original file line number Diff line number Diff line change
Expand Up @@ -10391,6 +10391,7 @@ func (n *RuntimeLuaNakamaModule) channelIdBuild(l *lua.LState) int {
// @param order(type=[]string, optional=true) The storage object fields to sort the query results by. The prefix '-' before a field name indicates descending order. All specified fields must be indexed and sortable.
// @param callerId(type=string, optional=true) User ID of the caller, will apply permissions checks of the user. If empty defaults to system user and permission checks are bypassed.
// @return objects(table) A list of storage objects.
// @return objects(string) A cursor, if there's a next page of results, nil otherwise.
// @return error(error) An optional error value if an error occurred.
func (n *RuntimeLuaNakamaModule) storageIndexList(l *lua.LState) int {
idxName := l.CheckString(1)
Expand Down Expand Up @@ -10421,7 +10422,9 @@ func (n *RuntimeLuaNakamaModule) storageIndexList(l *lua.LState) int {
callerID = cid
}

objectList, err := n.storageIndex.List(l.Context(), callerID, idxName, queryString, limit, order)
cursor := l.OptString(6, "")

objectList, newCursor, err := n.storageIndex.List(l.Context(), callerID, idxName, queryString, limit, order, cursor)
if err != nil {
l.RaiseError(err.Error())
return 0
Expand Down Expand Up @@ -10456,7 +10459,13 @@ func (n *RuntimeLuaNakamaModule) storageIndexList(l *lua.LState) int {
}
l.Push(lv)

return 1
if newCursor != "" {
l.Push(lua.LString(newCursor))
} else {
l.Push(lua.LNil)
}

return 2
}

// @group satori
Expand Down
84 changes: 72 additions & 12 deletions server/storage_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,15 @@
package server

import (
"bytes"
"context"
"database/sql"
"encoding/base64"
"encoding/gob"
"encoding/json"
"errors"
"fmt"
"slices"
"time"

"github.com/blugelabs/bluge"
Expand All @@ -35,7 +39,7 @@ import (
type StorageIndex interface {
Write(ctx context.Context, objects []*api.StorageObject) (creates int, deletes int)
Delete(ctx context.Context, objects StorageOpDeletes) (deletes int)
List(ctx context.Context, callerID uuid.UUID, indexName, query string, limit int, order []string) (*api.StorageObjects, error)
List(ctx context.Context, callerID uuid.UUID, indexName, query string, limit int, order []string, cursor string) (*api.StorageObjects, string, error)
Load(ctx context.Context) error
CreateIndex(ctx context.Context, name, collection, key string, fields []string, sortFields []string, maxEntries int, indexOnly bool) error
RegisterFilters(runtime *Runtime)
Expand Down Expand Up @@ -218,10 +222,17 @@ func (si *LocalStorageIndex) Delete(ctx context.Context, objects StorageOpDelete
return deletes
}

func (si *LocalStorageIndex) List(ctx context.Context, callerID uuid.UUID, indexName, query string, limit int, order []string) (*api.StorageObjects, error) {
type indexListCursor struct {
Query string
Offset int
Limit int
Order []string
}

func (si *LocalStorageIndex) List(ctx context.Context, callerID uuid.UUID, indexName, query string, limit int, order []string, cursor string) (*api.StorageObjects, string, error) {
idx, found := si.indexByName[indexName]
if !found {
return nil, fmt.Errorf("index %q not found", indexName)
return nil, "", fmt.Errorf("index %q not found", indexName)
}

if limit > idx.MaxEntries {
Expand All @@ -232,34 +243,83 @@ func (si *LocalStorageIndex) List(ctx context.Context, callerID uuid.UUID, index
query = "*"
}

var idxCursor *indexListCursor
if cursor != "" {
idxCursor = &indexListCursor{}
cb, err := base64.RawURLEncoding.DecodeString(cursor)
if err != nil {
si.logger.Error("Could not base64 decode notification cursor.", zap.String("cursor", cursor))
return nil, "", errors.New("invalid cursor")
}
if err := gob.NewDecoder(bytes.NewReader(cb)).Decode(idxCursor); err != nil {
si.logger.Error("Could not decode notification cursor.", zap.String("cursor", cursor))
return nil, "", errors.New("invalid cursor")
}

if query != idxCursor.Query {
return nil, "", fmt.Errorf("invalid cursor: query mismatch")
}
if limit != idxCursor.Limit {
return nil, "", fmt.Errorf("invalid cursor: limit mismatch")
}
if !slices.Equal(order, idxCursor.Order) {
return nil, "", fmt.Errorf("invalid cursor: order mismatch")
}
}

parsedQuery, err := ParseQueryString(query)
if err != nil {
return nil, err
return nil, "", err
}

searchReq := bluge.NewTopNSearch(limit, parsedQuery)
searchReq := bluge.NewTopNSearch(limit+1, parsedQuery)

if len(order) != 0 {
searchReq.SortBy(order)
}

if idxCursor != nil {
searchReq.SetFrom(idxCursor.Offset)
}

indexReader, err := idx.Index.Reader()
if err != nil {
return nil, err
return nil, "", err
}

results, err := indexReader.Search(ctx, searchReq)
if err != nil {
return nil, err
return nil, "", err
}

indexResults, err := si.queryMatchesToStorageIndexResults(results)
if err != nil {
return nil, err
return nil, "", err
}

var newCursor string
if len(indexResults) > limit {
indexResults = indexResults[:len(indexResults)-1]
offset := 0
if idxCursor != nil {
offset = idxCursor.Offset
}
newIdxCursor := &indexListCursor{
Query: query,
Offset: offset + limit,
Limit: limit,
Order: order,
}
cursorBuf := new(bytes.Buffer)
if err := gob.NewEncoder(cursorBuf).Encode(newIdxCursor); err != nil {
si.logger.Error("Failed to create new cursor.", zap.Error(err))
return nil, "", err
}
newCursor = base64.RawURLEncoding.EncodeToString(cursorBuf.Bytes())
}

if len(indexResults) == 0 {
return &api.StorageObjects{Objects: []*api.StorageObject{}}, nil
return &api.StorageObjects{Objects: []*api.StorageObject{}}, "", nil
}

if !si.config.DisableIndexOnly && idx.IndexOnly {
Expand All @@ -282,7 +342,7 @@ func (si *LocalStorageIndex) List(ctx context.Context, callerID uuid.UUID, index
})
}

return &api.StorageObjects{Objects: objects}, nil
return &api.StorageObjects{Objects: objects}, newCursor, nil
}

storageReads := make([]*api.ReadStorageObjectId, 0, len(indexResults))
Expand All @@ -296,7 +356,7 @@ func (si *LocalStorageIndex) List(ctx context.Context, callerID uuid.UUID, index

objects, err := StorageReadObjects(ctx, si.logger, si.db, callerID, storageReads)
if err != nil {
return nil, err
return nil, "", err
}

// Sort the objects read from the db according to the results from the index as StorageReadObjects does not guarantee ordering of the results
Expand All @@ -315,7 +375,7 @@ func (si *LocalStorageIndex) List(ctx context.Context, callerID uuid.UUID, index

objects.Objects = sortedObjects

return objects, nil
return objects, newCursor, nil
}

func (si *LocalStorageIndex) Load(ctx context.Context) error {
Expand Down
Loading
Loading