Skip to content

Commit 418cd9d

Browse files
committed
feat(cosmos): support InstallBundle and SendChunk
1 parent 56f5f98 commit 418cd9d

File tree

6 files changed

+509
-17
lines changed

6 files changed

+509
-17
lines changed

golang/cosmos/x/swingset/abci.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ type afterCommitBlockAction struct {
3535
func BeginBlock(ctx sdk.Context, req abci.RequestBeginBlock, keeper Keeper) error {
3636
defer telemetry.ModuleMeasureSince(types.ModuleName, time.Now(), telemetry.MetricKeyBeginBlocker)
3737

38+
keeper.PruneExpiredBundleInstalls(ctx)
39+
3840
action := beginBlockAction{
3941
ChainID: ctx.ChainID(),
4042
Params: keeper.GetParams(ctx),

golang/cosmos/x/swingset/keeper/grpc_query.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,3 +61,20 @@ func (k Querier) Mailbox(c context.Context, req *types.QueryMailboxRequest) (*ty
6161
Value: value,
6262
}, nil
6363
}
64+
65+
func (k Querier) PendingInstall(c context.Context, req *types.QueryPendingInstallRequest) (*types.QueryPendingInstallResponse, error) {
66+
if req == nil {
67+
return nil, status.Error(codes.InvalidArgument, "empty request")
68+
}
69+
ctx := sdk.UnwrapSDKContext(c)
70+
71+
msg := k.GetPendingBundleInstall(ctx, req.PendingId)
72+
if msg == nil {
73+
return nil, status.Error(codes.NotFound, "pending install not found")
74+
}
75+
76+
return &types.QueryPendingInstallResponse{
77+
PendingId: req.PendingId,
78+
BundleChunks: msg.BundleChunks,
79+
}, nil
80+
}

golang/cosmos/x/swingset/keeper/keeper.go

Lines changed: 157 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,11 @@ const (
4747
)
4848

4949
const (
50-
stateKey = "state"
51-
swingStoreKeyPrefix = "swingStore."
50+
stateKey = "state"
51+
pendingChunkDataKeyPrefix = "pendingChunkData."
52+
pendingBundleInstallKeyPrefix = "pendingBundleInstall."
53+
pendingNodeKeyPrefix = "pending."
54+
swingStoreKeyPrefix = "swingStore."
5255
)
5356

5457
// Keeper maintains the link to data vstorage and exposes getter/setter methods for the various parts of the state machine
@@ -485,6 +488,158 @@ func (k Keeper) SetMailbox(ctx sdk.Context, peer string, mailbox string) {
485488
k.vstorageKeeper.LegacySetStorageAndNotify(ctx, agoric.NewKVEntry(path, mailbox))
486489
}
487490

491+
func (k Keeper) GetPendingChunkData(ctx sdk.Context, pendingId uint64, chunkIndex uint64) []byte {
492+
store := ctx.KVStore(k.storeKey)
493+
pendingChunkData := prefix.NewStore(store, []byte(pendingChunkDataKeyPrefix))
494+
495+
key := append(sdk.Uint64ToBigEndian(pendingId), sdk.Uint64ToBigEndian(chunkIndex)...)
496+
if !pendingChunkData.Has(key) {
497+
return nil
498+
}
499+
return pendingChunkData.Get(key)
500+
}
501+
502+
func (k Keeper) SetPendingChunkData(ctx sdk.Context, pendingId uint64, chunkIndex uint64, data []byte) {
503+
store := ctx.KVStore(k.storeKey)
504+
pendingChunkData := prefix.NewStore(store, []byte(pendingChunkDataKeyPrefix))
505+
506+
key := append(sdk.Uint64ToBigEndian(pendingId), sdk.Uint64ToBigEndian(chunkIndex)...)
507+
if len(data) == 0 {
508+
pendingChunkData.Delete(key)
509+
return
510+
}
511+
pendingChunkData.Set(key, data)
512+
}
513+
514+
func (k Keeper) GetPendingBundleInstall(ctx sdk.Context, pendingId uint64) *types.MsgInstallBundle {
515+
store := ctx.KVStore(k.storeKey)
516+
pendingStore := prefix.NewStore(store, []byte(pendingBundleInstallKeyPrefix))
517+
key := sdk.Uint64ToBigEndian(pendingId)
518+
if !pendingStore.Has(key) {
519+
return nil
520+
}
521+
bz := pendingStore.Get(key)
522+
msg := &types.MsgInstallBundle{}
523+
k.cdc.MustUnmarshal(bz, msg)
524+
return msg
525+
}
526+
527+
func (k Keeper) AddPendingBundleInstall(ctx sdk.Context, msg *types.MsgInstallBundle) uint64 {
528+
state := k.GetState(ctx)
529+
state.LastPendingId++
530+
k.SetState(ctx, state)
531+
532+
pendingId := state.LastPendingId
533+
k.SetPendingBundleInstall(ctx, pendingId, msg)
534+
535+
// Attach the pending install node to the linked list.
536+
node := &types.PendingInstallNode{
537+
PendingId: pendingId,
538+
StartTime: ctx.BlockTime().Unix(),
539+
StartBlock: ctx.BlockHeight(),
540+
}
541+
store := ctx.KVStore(k.storeKey)
542+
key := sdk.Uint64ToBigEndian(pendingId)
543+
startStore := prefix.NewStore(store, []byte(pendingNodeKeyPrefix))
544+
bz := k.cdc.MustMarshal(node)
545+
startStore.Set(key, bz)
546+
547+
return pendingId
548+
}
549+
550+
// PruneExpiredBundleInstalls removes pending bundle installs that have passed
551+
// their deadline, as set by the keeper parameters.
552+
func (k Keeper) PruneExpiredBundleInstalls(ctx sdk.Context) {
553+
params := k.GetParams(ctx)
554+
currentSeconds := ctx.BlockTime().Unix()
555+
currentBlocks := ctx.BlockHeight()
556+
deadlineSeconds := params.PendingInstallationDeadlineSeconds
557+
deadlineBlocks := params.PendingInstallationDeadlineBlocks
558+
559+
store := ctx.KVStore(k.storeKey)
560+
startStore := prefix.NewStore(store, []byte(pendingNodeKeyPrefix))
561+
562+
state := k.GetState(ctx)
563+
for state.FirstPendingId != 0 {
564+
pendingId := state.FirstPendingId
565+
key := sdk.Uint64ToBigEndian(pendingId)
566+
bz := startStore.Get(key)
567+
node := &types.PendingInstallNode{}
568+
k.cdc.MustUnmarshal(bz, node)
569+
570+
if deadlineSeconds < 0 || currentSeconds-node.StartTime < deadlineSeconds {
571+
if deadlineBlocks < 0 || currentBlocks-node.StartBlock < deadlineBlocks {
572+
// Still alive. Stop the search.
573+
break
574+
}
575+
}
576+
577+
// This pending bundle install is dead. Remove it.
578+
k.SetPendingBundleInstall(ctx, pendingId, nil)
579+
580+
// Advance to the next node.
581+
state.FirstPendingId = node.NextId
582+
}
583+
584+
k.SetState(ctx, state)
585+
}
586+
587+
func (k Keeper) makeListTools(ctx sdk.Context) *types.ListTools {
588+
store := ctx.KVStore(k.storeKey)
589+
listStore := prefix.NewStore(store, []byte(pendingNodeKeyPrefix))
590+
return types.NewListTools(ctx, listStore, k.cdc)
591+
}
592+
593+
func (k Keeper) SetPendingBundleInstall(ctx sdk.Context, pendingId uint64, newMsg *types.MsgInstallBundle) {
594+
store := ctx.KVStore(k.storeKey)
595+
pendingStore := prefix.NewStore(store, []byte(pendingBundleInstallKeyPrefix))
596+
597+
key := sdk.Uint64ToBigEndian(pendingId)
598+
if newMsg != nil {
599+
bz := k.cdc.MustMarshal(newMsg)
600+
pendingStore.Set(key, bz)
601+
return
602+
}
603+
604+
var msg types.MsgInstallBundle
605+
k.cdc.MustUnmarshal(pendingStore.Get(key), &msg)
606+
if msg.BundleChunks != nil && len(msg.BundleChunks.Chunks) > 0 {
607+
// Remove the chunks.
608+
for i := range msg.BundleChunks.Chunks {
609+
k.SetPendingChunkData(ctx, pendingId, uint64(i), nil)
610+
}
611+
}
612+
pendingStore.Delete(key)
613+
614+
// Remove auxilliary data structure entries.
615+
k.RemovePendingInstallNode(ctx, pendingId)
616+
}
617+
618+
// RemovePendingInstallNode removes this pending install from the keeper's
619+
// ordered linked list structures, and deletes it from the store.
620+
func (k Keeper) RemovePendingInstallNode(ctx sdk.Context, pendingId uint64) types.PendingInstallNode {
621+
lt := k.makeListTools(ctx)
622+
623+
victimKey := lt.Key(pendingId)
624+
victimNode := lt.Fetch(victimKey)
625+
626+
// Remove the victim from the linked list, keeping the structure intact.
627+
lt.Unlink(victimNode, func(firstp, lastp *uint64) {
628+
state := k.GetState(ctx)
629+
if firstp != nil {
630+
state.FirstPendingId = *firstp
631+
}
632+
if lastp != nil {
633+
state.LastPendingId = *lastp
634+
}
635+
k.SetState(ctx, state)
636+
})
637+
638+
// Finally, delete the victim node's storage.
639+
lt.Delete(victimKey)
640+
return victimNode
641+
}
642+
488643
func (k Keeper) GetSwingStore(ctx sdk.Context) sdk.KVStore {
489644
store := ctx.KVStore(k.storeKey)
490645
return prefix.NewStore(store, []byte(swingStoreKeyPrefix))

golang/cosmos/x/swingset/keeper/msg_server.go

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
package keeper
22

33
import (
4+
"bytes"
45
"context"
6+
"crypto/sha256"
7+
"encoding/hex"
8+
"fmt"
59

610
"github.com/Agoric/agoric-sdk/golang/cosmos/vm"
711
"github.com/Agoric/agoric-sdk/golang/cosmos/x/swingset/types"
@@ -190,6 +194,26 @@ type installBundleAction struct {
190194

191195
func (keeper msgServer) InstallBundle(goCtx context.Context, msg *types.MsgInstallBundle) (*types.MsgInstallBundleResponse, error) {
192196
ctx := sdk.UnwrapSDKContext(goCtx)
197+
if msg.BundleChunks == nil || len(msg.BundleChunks.Chunks) == 0 {
198+
return keeper.InstallFinishedBundle(goCtx, msg)
199+
}
200+
201+
// Mark all the chunks as in-flight.
202+
bc := *msg.BundleChunks
203+
bc.Chunks = make([]*types.ChunkInfo, len(bc.Chunks))
204+
for i, chunk := range bc.Chunks {
205+
ci := *chunk
206+
ci.State = types.ChunkState_IN_FLIGHT
207+
bc.Chunks[i] = &ci
208+
}
209+
msg.BundleChunks = &bc
210+
211+
pendingId := keeper.AddPendingBundleInstall(ctx, msg)
212+
return &types.MsgInstallBundleResponse{PendingId: pendingId}, nil
213+
}
214+
215+
func (keeper msgServer) InstallFinishedBundle(goCtx context.Context, msg *types.MsgInstallBundle) (*types.MsgInstallBundleResponse, error) {
216+
ctx := sdk.UnwrapSDKContext(goCtx)
193217

194218
err := msg.Uncompress()
195219
if err != nil {
@@ -207,3 +231,102 @@ func (keeper msgServer) InstallBundle(goCtx context.Context, msg *types.MsgInsta
207231

208232
return &types.MsgInstallBundleResponse{}, nil
209233
}
234+
235+
func (keeper msgServer) SendChunk(goCtx context.Context, msg *types.MsgSendChunk) (*types.MsgSendChunkResponse, error) {
236+
ctx := sdk.UnwrapSDKContext(goCtx)
237+
238+
inst := keeper.GetPendingBundleInstall(ctx, msg.PendingId)
239+
if inst == nil {
240+
return nil, fmt.Errorf("no upload in progress for pending Id %d", msg.PendingId)
241+
}
242+
243+
bc := inst.BundleChunks
244+
245+
if msg.ChunkIndex < 0 || msg.ChunkIndex >= uint64(len(bc.Chunks)) {
246+
return nil, fmt.Errorf("chunk index %d out of range for pending Id %d", msg.ChunkIndex, msg.PendingId)
247+
}
248+
249+
if bc.Chunks[msg.ChunkIndex].State != types.ChunkState_IN_FLIGHT {
250+
return nil, fmt.Errorf("chunk %d is not in flight for pending Id %d", msg.ChunkIndex, msg.PendingId)
251+
}
252+
253+
// Verify the chunk data.
254+
ci := bc.Chunks[msg.ChunkIndex]
255+
if ci.ChunkSize != uint64(len(msg.ChunkData)) {
256+
return nil, fmt.Errorf("chunk %d size mismatch for pending Id %d", msg.ChunkIndex, msg.PendingId)
257+
}
258+
259+
sha256Hash, err := hex.DecodeString(ci.Hash)
260+
if err != nil {
261+
return nil, fmt.Errorf("chunk %d cannot decode hash %s: %w", msg.ChunkIndex, ci.Hash, err)
262+
}
263+
264+
hasher := sha256.New()
265+
sum := hasher.Sum(msg.ChunkData)
266+
if !bytes.Equal(sum, sha256Hash) {
267+
return nil, fmt.Errorf("chunk %d hash mismatch; expected %x, got %x", msg.ChunkIndex, sha256Hash, sum)
268+
}
269+
270+
// Data is valid, so store it.
271+
keeper.SetPendingChunkData(ctx, msg.PendingId, msg.ChunkIndex, msg.ChunkData)
272+
273+
// Mark the chunk as received, and store the pending installation.
274+
ci.State = types.ChunkState_RECEIVED
275+
keeper.SetPendingBundleInstall(ctx, msg.PendingId, inst)
276+
277+
err = keeper.MaybeFinalizeBundle(ctx, msg.PendingId)
278+
279+
return &types.MsgSendChunkResponse{
280+
PendingId: msg.PendingId,
281+
Chunk: ci,
282+
}, err
283+
}
284+
285+
func (keeper msgServer) MaybeFinalizeBundle(ctx sdk.Context, pendingId uint64) error {
286+
msg := keeper.GetPendingBundleInstall(ctx, pendingId)
287+
if msg == nil {
288+
return nil
289+
}
290+
291+
// If any chunks are not received, then bail (without error).
292+
bc := msg.BundleChunks
293+
totalChunkSize := uint64(0)
294+
for _, chunk := range bc.Chunks {
295+
if chunk.State != types.ChunkState_RECEIVED {
296+
return nil
297+
}
298+
totalChunkSize += chunk.ChunkSize
299+
}
300+
301+
chunkData := make([]byte, 0, totalChunkSize)
302+
for i := range bc.Chunks {
303+
bz := keeper.GetPendingChunkData(ctx, pendingId, uint64(i))
304+
chunkData = append(chunkData, bz...)
305+
}
306+
307+
// Verify the hash of the concatenated chunks.
308+
hasher := sha256.New()
309+
sum := hasher.Sum(chunkData)
310+
sha256Hash, err := hex.DecodeString(bc.BundleHash)
311+
if err != nil {
312+
return fmt.Errorf("cannot decode hash %s: %w", bc.BundleHash, err)
313+
}
314+
if !bytes.Equal(sum, sha256Hash) {
315+
return fmt.Errorf("bundle hash mismatch; expected %x, got %x", sha256Hash, sum)
316+
}
317+
318+
// Is it compressed or not?
319+
if msg.UncompressedSize > 0 {
320+
msg.CompressedBundle = chunkData
321+
} else {
322+
msg.Bundle = string(chunkData)
323+
}
324+
325+
// Clean up the pending installation state.
326+
msg.BundleChunks = nil
327+
keeper.SetPendingBundleInstall(ctx, pendingId, nil)
328+
329+
// Install the bundle now that all the chunks are processed.
330+
_, err = keeper.InstallFinishedBundle(ctx, msg)
331+
return err
332+
}

0 commit comments

Comments
 (0)