Skip to content

Commit ac4f652

Browse files
feat: add atomic room allocation API with TTL-based reservations (#684)
1 parent 1f6778f commit ac4f652

File tree

28 files changed

+1323
-88
lines changed

28 files changed

+1323
-88
lines changed

cmd/roomsapi/rooms_api.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,8 @@ func buildMuxWithMetricsMdlw(mdlw middleware.Middleware, mux *runtime.ServeMux)
161161
handler = std.Handler("/scheduler/:schedulerName/rooms/:roomID/status", mdlw, mux)
162162
case commom.MatchPath(path, fmt.Sprintf("^/scheduler/%s/rooms/%s/address", anyWordRegex, anyWordRegex)):
163163
handler = std.Handler("/scheduler/:schedulerName/rooms/:roomID/address", mdlw, mux)
164+
case commom.MatchPath(path, fmt.Sprintf("^/scheduler/%s/rooms/allocate$", anyWordRegex)):
165+
handler = std.Handler("/scheduler/:schedulerName/rooms/allocate", mdlw, mux)
164166
default:
165167
handler = std.Handler("", mdlw, mux)
166168
}
@@ -190,6 +192,8 @@ func buildMuxWithTracing(hndl http.Handler) http.Handler {
190192
handler = otelhttp.NewHandler(hndl, "/scheduler/:schedulerName/rooms/:roomID/status")
191193
case commom.MatchPath(path, fmt.Sprintf("^/scheduler/%s/rooms/%s/address", anyWordRegex, anyWordRegex)):
192194
handler = otelhttp.NewHandler(hndl, "/scheduler/:schedulerName/rooms/:roomID/address")
195+
case commom.MatchPath(path, fmt.Sprintf("^/scheduler/%s/rooms/allocate$", anyWordRegex)):
196+
handler = otelhttp.NewHandler(hndl, "/scheduler/:schedulerName/rooms/allocate")
193197
default:
194198
handler = otelhttp.NewHandler(hndl, "")
195199
}

config/config.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ services:
9191
roomPingTimeoutMillis: 240000
9292
roomInitializationTimeoutMillis: 120000
9393
roomDeletionTimeoutMillis: 120000
94+
roomAllocationTTLMillis: 300000
9495
roomValidationAttempts: 3
9596
operationManager:
9697
operationLeaseTTLMillis: 5000

internal/adapters/storage/redis/room/redis.go

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,38 @@ const (
5050
versionKey = "version"
5151
isValidationRoomKey = "is_validation_room"
5252
createdAtKey = "created_at"
53+
allocatedAtKey = "allocated_at"
5354
)
5455

56+
// allocateRoomScript is a Lua script that atomically selects one ready room
// and marks it as allocated (not occupied).
//
// Inputs:
//   - KEYS[1]: the sorted set that maps room IDs to their status score.
//   - KEYS[2]: the prefix that, concatenated with a room ID, names the room hash.
//   - ARGV[1]: the value stored in the room hash's allocated_at field.
//
// The script picks at most one member whose score is 2 (ready), rewrites its
// score to 8 (allocated) with ZADD XX CH and, when that reports a change,
// writes allocated_at on the room hash.
//
// It returns the room ID on success, "NO_ROOMS_AVAILABLE" when no member has
// the ready score, or "ALLOCATION_FAILED" when ZADD reports no change.
57+
var allocateRoomScript = redis.NewScript(`
58+
local status_key = KEYS[1]
59+
local room_key_prefix = KEYS[2]
60+
local ready_status = 2 -- GameStatusReady
61+
local allocated_status = 8 -- GameStatusAllocated
62+
local current_time = ARGV[1]
63+
64+
-- Get one ready room (lowest score first, which is actually by room ID since all ready rooms have same score)
65+
local ready_rooms = redis.call('ZRANGEBYSCORE', status_key, ready_status, ready_status, 'LIMIT', 0, 1)
66+
if #ready_rooms == 0 then
67+
return "NO_ROOMS_AVAILABLE"
68+
end
69+
70+
local room_id = ready_rooms[1]
71+
72+
-- Atomically change the room status from ready to allocated using ZAddXXCh
73+
-- This only updates if the member already exists and returns 1 if score was changed
74+
local changed = redis.call('ZADD', status_key, 'XX', 'CH', allocated_status, room_id)
75+
if changed == 1 then
76+
-- Set the AllocatedAt timestamp
77+
local room_key = room_key_prefix .. room_id
78+
redis.call('HSET', room_key, 'allocated_at', current_time)
79+
return room_id
80+
else
81+
return "ALLOCATION_FAILED"
82+
end
83+
`)
84+
5585
type redisStateStorage struct {
5686
client *redis.Client
5787
}
@@ -112,6 +142,17 @@ func (r redisStateStorage) GetRoom(ctx context.Context, scheduler, roomID string
112142
}
113143

114144
room.CreatedAt = time.Unix(createdAt, 0)
145+
146+
// Parse AllocatedAt if it exists
147+
if allocatedAtStr := roomHashCmd.Val()[allocatedAtKey]; allocatedAtStr != "" {
148+
allocatedAtUnix, err := strconv.ParseInt(allocatedAtStr, 10, 64)
149+
if err != nil {
150+
return nil, errors.NewErrEncoding("error parsing room %s allocatedAtKey", roomID).WithError(err)
151+
}
152+
allocatedAt := time.Unix(allocatedAtUnix, 0)
153+
room.AllocatedAt = &allocatedAt
154+
}
155+
115156
return room, nil
116157
}
117158

@@ -440,3 +481,35 @@ func getRoomStatusUpdateChannel(scheduler, roomID string) string {
440481
// getRoomOccupancyRedisKey builds the Redis key under which the occupancy
// data of the given scheduler is stored: "scheduler:<name>:occupancy".
func getRoomOccupancyRedisKey(scheduler string) string {
	const occupancyKeyFormat = "scheduler:%s:occupancy"

	key := fmt.Sprintf(occupancyKeyFormat, scheduler)
	return key
}
484+
485+
func (r *redisStateStorage) AllocateRoom(ctx context.Context, schedulerName string) (string, error) {
486+
statusKey := getRoomStatusSetRedisKey(schedulerName)
487+
roomKeyPrefix := fmt.Sprintf("scheduler:%s:rooms:", schedulerName)
488+
currentTime := time.Now().Unix()
489+
490+
var result interface{}
491+
var err error
492+
metrics.RunWithMetrics(roomStorageMetricLabel, func() error {
493+
result, err = allocateRoomScript.Run(ctx, r.client, []string{statusKey, roomKeyPrefix}, currentTime).Result()
494+
return err
495+
})
496+
497+
if err != nil {
498+
return "", errors.NewErrUnexpected("error executing room allocation script for scheduler %s", schedulerName).WithError(err)
499+
}
500+
501+
roomID, ok := result.(string)
502+
if !ok || roomID == "" {
503+
return "", errors.NewErrUnexpected("unexpected result %v from room allocation script for scheduler %s", result, schedulerName)
504+
}
505+
506+
// Return errors for descriptive error strings from Lua script
507+
if roomID == "NO_ROOMS_AVAILABLE" {
508+
return "", errors.NewErrUnexpected("no ready rooms available for scheduler %s", schedulerName)
509+
}
510+
if roomID == "ALLOCATION_FAILED" {
511+
return "", errors.NewErrUnexpected("room allocation failed for scheduler %s", schedulerName)
512+
}
513+
514+
return roomID, nil
515+
}

internal/adapters/storage/redis/room/redis_test.go

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -782,3 +782,151 @@ func TestRedisStateStorage_GetRunningMatchesCount(t *testing.T) {
782782
require.Equal(t, 6, count)
783783
})
784784
}
785+
786+
func TestRedisStateStorage_AllocateRoom(t *testing.T) {
787+
ctx := context.Background()
788+
789+
t.Run("successfully allocates a ready room", func(t *testing.T) {
790+
client := test.GetRedisConnection(t, redisAddress)
791+
storage := NewRedisStateStorage(client)
792+
793+
room := &game_room.GameRoom{
794+
ID: "room-1",
795+
SchedulerID: "game",
796+
Version: "1.0",
797+
Status: game_room.GameStatusReady,
798+
LastPingAt: time.Unix(1, 0),
799+
Metadata: map[string]interface{}{"host": "localhost"},
800+
}
801+
802+
require.NoError(t, storage.CreateRoom(ctx, room))
803+
804+
allocatedRoomID, err := storage.AllocateRoom(ctx, "game")
805+
require.NoError(t, err)
806+
require.Equal(t, "room-1", allocatedRoomID)
807+
808+
// Verify the room status was changed to allocated
809+
updatedRoom, err := storage.GetRoom(ctx, "game", "room-1")
810+
require.NoError(t, err)
811+
require.Equal(t, game_room.GameStatusAllocated, updatedRoom.Status)
812+
813+
// Verify the AllocatedAt timestamp was set
814+
require.NotNil(t, updatedRoom.AllocatedAt, "AllocatedAt should be set")
815+
require.True(t, time.Since(*updatedRoom.AllocatedAt) < 5*time.Second, "AllocatedAt should be recent")
816+
})
817+
818+
t.Run("returns error when no ready rooms available", func(t *testing.T) {
819+
client := test.GetRedisConnection(t, redisAddress)
820+
storage := NewRedisStateStorage(client)
821+
822+
room := &game_room.GameRoom{
823+
ID: "room-1",
824+
SchedulerID: "game",
825+
Version: "1.0",
826+
Status: game_room.GameStatusOccupied,
827+
LastPingAt: time.Unix(1, 0),
828+
Metadata: map[string]interface{}{"host": "localhost"},
829+
}
830+
831+
require.NoError(t, storage.CreateRoom(ctx, room))
832+
833+
allocatedRoomID, err := storage.AllocateRoom(ctx, "game")
834+
require.Error(t, err)
835+
require.Equal(t, "", allocatedRoomID)
836+
require.Contains(t, err.Error(), "no ready rooms available")
837+
})
838+
839+
t.Run("returns error when scheduler has no rooms", func(t *testing.T) {
840+
client := test.GetRedisConnection(t, redisAddress)
841+
storage := NewRedisStateStorage(client)
842+
843+
allocatedRoomID, err := storage.AllocateRoom(ctx, "nonexistent")
844+
require.Error(t, err)
845+
require.Equal(t, "", allocatedRoomID)
846+
require.Contains(t, err.Error(), "no ready rooms available")
847+
})
848+
849+
t.Run("allocates only one room when multiple ready rooms exist", func(t *testing.T) {
850+
client := test.GetRedisConnection(t, redisAddress)
851+
storage := NewRedisStateStorage(client)
852+
853+
rooms := []*game_room.GameRoom{
854+
{
855+
ID: "room-1",
856+
SchedulerID: "game",
857+
Version: "1.0",
858+
Status: game_room.GameStatusReady,
859+
LastPingAt: time.Unix(1, 0),
860+
Metadata: map[string]interface{}{"host": "localhost"},
861+
},
862+
{
863+
ID: "room-2",
864+
SchedulerID: "game",
865+
Version: "1.0",
866+
Status: game_room.GameStatusReady,
867+
LastPingAt: time.Unix(2, 0),
868+
Metadata: map[string]interface{}{"host": "localhost"},
869+
},
870+
}
871+
872+
for _, room := range rooms {
873+
require.NoError(t, storage.CreateRoom(ctx, room))
874+
}
875+
876+
allocatedRoomID, err := storage.AllocateRoom(ctx, "game")
877+
require.NoError(t, err)
878+
require.NotEmpty(t, allocatedRoomID)
879+
require.Contains(t, []string{"room-1", "room-2"}, allocatedRoomID)
880+
881+
// Verify only the allocated room was changed to allocated
882+
readyRooms, err := storage.GetRoomIDsByStatus(ctx, "game", game_room.GameStatusReady)
883+
require.NoError(t, err)
884+
require.Len(t, readyRooms, 1)
885+
886+
allocatedRooms, err := storage.GetRoomIDsByStatus(ctx, "game", game_room.GameStatusAllocated)
887+
require.NoError(t, err)
888+
require.Len(t, allocatedRooms, 1)
889+
require.Equal(t, allocatedRoomID, allocatedRooms[0])
890+
})
891+
892+
t.Run("ignores non-ready rooms", func(t *testing.T) {
893+
client := test.GetRedisConnection(t, redisAddress)
894+
storage := NewRedisStateStorage(client)
895+
896+
rooms := []*game_room.GameRoom{
897+
{
898+
ID: "room-pending",
899+
SchedulerID: "game",
900+
Version: "1.0",
901+
Status: game_room.GameStatusPending,
902+
LastPingAt: time.Unix(1, 0),
903+
Metadata: map[string]interface{}{"host": "localhost"},
904+
},
905+
{
906+
ID: "room-occupied",
907+
SchedulerID: "game",
908+
Version: "1.0",
909+
Status: game_room.GameStatusOccupied,
910+
LastPingAt: time.Unix(2, 0),
911+
Metadata: map[string]interface{}{"host": "localhost"},
912+
},
913+
{
914+
ID: "room-terminating",
915+
SchedulerID: "game",
916+
Version: "1.0",
917+
Status: game_room.GameStatusTerminating,
918+
LastPingAt: time.Unix(3, 0),
919+
Metadata: map[string]interface{}{"host": "localhost"},
920+
},
921+
}
922+
923+
for _, room := range rooms {
924+
require.NoError(t, storage.CreateRoom(ctx, room))
925+
}
926+
927+
allocatedRoomID, err := storage.AllocateRoom(ctx, "game")
928+
require.Error(t, err)
929+
require.Equal(t, "", allocatedRoomID)
930+
require.Contains(t, err.Error(), "no ready rooms available")
931+
})
932+
}

internal/api/handlers/rooms_handler.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,3 +122,33 @@ func (h *RoomsHandler) GetRoomAddress(ctx context.Context, message *api.GetRoomA
122122
}
123123
return requestadapters.FromInstanceEntityToGameRoomAddressResponse(instance), nil
124124
}
125+
126+
func (h *RoomsHandler) AllocateRoom(ctx context.Context, message *api.AllocateRoomRequest) (*api.AllocateRoomResponse, error) {
127+
handlerLogger := h.logger.With(zap.String(logs.LogFieldSchedulerName, message.SchedulerName))
128+
handlerLogger.Debug("handling room allocation request", zap.Any("message", message))
129+
130+
roomID, err := h.roomManager.AllocateRoom(ctx, message.SchedulerName)
131+
if err != nil {
132+
handlerLogger.Error("error allocating room", zap.Any("allocation_request", message), zap.Error(err))
133+
return nil, status.Error(codes.Unknown, err.Error())
134+
}
135+
136+
// Get room instance to retrieve address information
137+
instance, err := h.roomManager.GetRoomInstance(ctx, message.SchedulerName, roomID)
138+
if err != nil {
139+
handlerLogger.Error("error getting allocated room instance", zap.String(logs.LogFieldRoomID, roomID), zap.Error(err))
140+
return nil, status.Error(codes.Unknown, err.Error())
141+
}
142+
143+
// Convert instance to address response format and include in allocation response
144+
addressResponse := requestadapters.FromInstanceEntityToGameRoomAddressResponse(instance)
145+
146+
handlerLogger.Debug("room allocated successfully", zap.String(logs.LogFieldRoomID, roomID))
147+
return &api.AllocateRoomResponse{
148+
RoomId: roomID,
149+
Success: true,
150+
Message: "",
151+
Ports: addressResponse.Ports,
152+
Host: addressResponse.Host,
153+
}, nil
154+
}

0 commit comments

Comments
 (0)