Skip to content

Commit 6dcbe8c

Browse files
committed
feat(cosmos): support InstallBundle and SendChunk
1 parent 73d1391 commit 6dcbe8c

File tree

11 files changed

+576
-37
lines changed

11 files changed

+576
-37
lines changed

golang/cosmos/x/swingset/abci.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ type afterCommitBlockAction struct {
3535
func BeginBlock(ctx sdk.Context, req abci.RequestBeginBlock, keeper Keeper) error {
3636
defer telemetry.ModuleMeasureSince(types.ModuleName, time.Now(), telemetry.MetricKeyBeginBlocker)
3737

38+
keeper.PruneExpiredBundleInstalls(ctx)
39+
3840
action := beginBlockAction{
3941
ChainID: ctx.ChainID(),
4042
Params: keeper.GetParams(ctx),

golang/cosmos/x/swingset/keeper/grpc_query.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,3 +61,27 @@ func (k Querier) Mailbox(c context.Context, req *types.QueryMailboxRequest) (*ty
6161
Value: value,
6262
}, nil
6363
}
64+
65+
func (k Querier) PendingInstall(c context.Context, req *types.QueryPendingInstallRequest) (*types.QueryPendingInstallResponse, error) {
66+
if req == nil {
67+
return nil, status.Error(codes.InvalidArgument, "empty request")
68+
}
69+
ctx := sdk.UnwrapSDKContext(c)
70+
71+
msg := k.GetPendingBundleInstall(ctx, req.PendingId)
72+
if msg == nil {
73+
return nil, status.Error(codes.NotFound, "pending install not found")
74+
}
75+
76+
pin := k.GetPendingInstallNode(ctx, req.PendingId)
77+
if pin == nil {
78+
return nil, status.Error(codes.NotFound, "pending install node not found")
79+
}
80+
81+
return &types.QueryPendingInstallResponse{
82+
PendingId: req.PendingId,
83+
BundleChunks: msg.BundleChunks,
84+
StartTime: pin.StartTime,
85+
StartBlock: pin.StartBlock,
86+
}, nil
87+
}

golang/cosmos/x/swingset/keeper/keeper.go

Lines changed: 161 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,11 @@ const (
4747
)
4848

4949
const (
50-
stateKey = "state"
51-
swingStoreKeyPrefix = "swingStore."
50+
stateKey = "state"
51+
pendingChunkDataKeyPrefix = "pendingChunkData."
52+
pendingBundleInstallKeyPrefix = "pendingBundleInstall."
53+
pendingNodeKeyPrefix = "pending."
54+
swingStoreKeyPrefix = "swingStore."
5255
)
5356

5457
// Keeper maintains the link to data vstorage and exposes getter/setter methods for the various parts of the state machine
@@ -485,6 +488,162 @@ func (k Keeper) SetMailbox(ctx sdk.Context, peer string, mailbox string) {
485488
k.vstorageKeeper.LegacySetStorageAndNotify(ctx, agoric.NewKVEntry(path, mailbox))
486489
}
487490

491+
func (k Keeper) GetPendingChunkData(ctx sdk.Context, pendingId uint64, chunkIndex uint64) []byte {
492+
store := ctx.KVStore(k.storeKey)
493+
pendingChunkData := prefix.NewStore(store, []byte(pendingChunkDataKeyPrefix))
494+
495+
key := append(sdk.Uint64ToBigEndian(pendingId), sdk.Uint64ToBigEndian(chunkIndex)...)
496+
if !pendingChunkData.Has(key) {
497+
return nil
498+
}
499+
return pendingChunkData.Get(key)
500+
}
501+
502+
func (k Keeper) SetPendingChunkData(ctx sdk.Context, pendingId uint64, chunkIndex uint64, data []byte) {
503+
store := ctx.KVStore(k.storeKey)
504+
pendingChunkData := prefix.NewStore(store, []byte(pendingChunkDataKeyPrefix))
505+
506+
key := append(sdk.Uint64ToBigEndian(pendingId), sdk.Uint64ToBigEndian(chunkIndex)...)
507+
if len(data) == 0 {
508+
pendingChunkData.Delete(key)
509+
return
510+
}
511+
pendingChunkData.Set(key, data)
512+
}
513+
514+
func (k Keeper) GetPendingBundleInstall(ctx sdk.Context, pendingId uint64) *types.MsgInstallBundle {
515+
store := ctx.KVStore(k.storeKey)
516+
pendingStore := prefix.NewStore(store, []byte(pendingBundleInstallKeyPrefix))
517+
key := sdk.Uint64ToBigEndian(pendingId)
518+
if !pendingStore.Has(key) {
519+
return nil
520+
}
521+
bz := pendingStore.Get(key)
522+
msg := &types.MsgInstallBundle{}
523+
k.cdc.MustUnmarshal(bz, msg)
524+
return msg
525+
}
526+
527+
func (k Keeper) AddPendingBundleInstall(ctx sdk.Context, msg *types.MsgInstallBundle) uint64 {
528+
state := k.GetState(ctx)
529+
state.LastPendingId++
530+
k.SetState(ctx, state)
531+
532+
pendingId := state.LastPendingId
533+
k.SetPendingBundleInstall(ctx, pendingId, msg)
534+
535+
// Attach the pending install node to the linked list.
536+
node := &types.PendingInstallNode{
537+
PendingId: pendingId,
538+
StartTime: ctx.BlockTime().Unix(),
539+
StartBlock: ctx.BlockHeight(),
540+
}
541+
store := ctx.KVStore(k.storeKey)
542+
key := sdk.Uint64ToBigEndian(pendingId)
543+
startStore := prefix.NewStore(store, []byte(pendingNodeKeyPrefix))
544+
bz := k.cdc.MustMarshal(node)
545+
startStore.Set(key, bz)
546+
547+
return pendingId
548+
}
549+
550+
// PruneExpiredBundleInstalls removes pending bundle installs that have passed
551+
// their deadline, as set by the keeper parameters.
552+
func (k Keeper) PruneExpiredBundleInstalls(ctx sdk.Context) {
553+
params := k.GetParams(ctx)
554+
currentSeconds := ctx.BlockTime().Unix()
555+
currentBlocks := ctx.BlockHeight()
556+
deadlineSeconds := params.InstallationDeadlineSeconds
557+
deadlineBlocks := params.InstallationDeadlineBlocks
558+
559+
store := ctx.KVStore(k.storeKey)
560+
startStore := prefix.NewStore(store, []byte(pendingNodeKeyPrefix))
561+
562+
state := k.GetState(ctx)
563+
for state.FirstPendingId != 0 {
564+
pendingId := state.FirstPendingId
565+
key := sdk.Uint64ToBigEndian(pendingId)
566+
bz := startStore.Get(key)
567+
node := &types.PendingInstallNode{}
568+
k.cdc.MustUnmarshal(bz, node)
569+
570+
if deadlineSeconds < 0 || currentSeconds-node.StartTime < deadlineSeconds {
571+
if deadlineBlocks < 0 || currentBlocks-node.StartBlock < deadlineBlocks {
572+
// Still alive. Stop the search.
573+
break
574+
}
575+
}
576+
577+
// This pending bundle install is dead. Remove it.
578+
k.SetPendingBundleInstall(ctx, pendingId, nil)
579+
580+
// Advance to the next node.
581+
state.FirstPendingId = node.NextId
582+
}
583+
584+
k.SetState(ctx, state)
585+
}
586+
587+
func (k Keeper) makeListTools(ctx sdk.Context) *types.ListTools {
588+
store := ctx.KVStore(k.storeKey)
589+
listStore := prefix.NewStore(store, []byte(pendingNodeKeyPrefix))
590+
return types.NewListTools(ctx, listStore, k.cdc)
591+
}
592+
593+
func (k Keeper) SetPendingBundleInstall(ctx sdk.Context, pendingId uint64, newMsg *types.MsgInstallBundle) {
594+
store := ctx.KVStore(k.storeKey)
595+
pendingStore := prefix.NewStore(store, []byte(pendingBundleInstallKeyPrefix))
596+
597+
key := sdk.Uint64ToBigEndian(pendingId)
598+
if newMsg != nil {
599+
bz := k.cdc.MustMarshal(newMsg)
600+
pendingStore.Set(key, bz)
601+
return
602+
}
603+
604+
var msg types.MsgInstallBundle
605+
k.cdc.MustUnmarshal(pendingStore.Get(key), &msg)
606+
if msg.BundleChunks != nil && len(msg.BundleChunks.Chunks) > 0 {
607+
// Remove the chunks.
608+
for i := range msg.BundleChunks.Chunks {
609+
k.SetPendingChunkData(ctx, pendingId, uint64(i), nil)
610+
}
611+
}
612+
pendingStore.Delete(key)
613+
614+
// Remove auxilliary data structure entries.
615+
k.RemovePendingInstallNode(ctx, pendingId)
616+
}
617+
618+
// RemovePendingInstallNode removes this pending install from the keeper's
619+
// ordered linked list structures, and deletes it from the store.
620+
func (k Keeper) RemovePendingInstallNode(ctx sdk.Context, pendingId uint64) {
621+
lt := k.makeListTools(ctx)
622+
623+
victimKey := lt.Key(pendingId)
624+
victimNode := lt.Fetch(victimKey)
625+
626+
// Remove the victim from the linked list, keeping the structure intact.
627+
lt.Unlink(victimNode, func(firstp, lastp *uint64) {
628+
state := k.GetState(ctx)
629+
if firstp != nil {
630+
state.FirstPendingId = *firstp
631+
}
632+
if lastp != nil {
633+
state.LastPendingId = *lastp
634+
}
635+
k.SetState(ctx, state)
636+
})
637+
638+
// Finally, delete the victim node's storage.
639+
lt.Delete(victimKey)
640+
}
641+
642+
func (k Keeper) GetPendingInstallNode(ctx sdk.Context, pendingId uint64) *types.PendingInstallNode {
643+
lt := k.makeListTools(ctx)
644+
return lt.Fetch(lt.Key(pendingId))
645+
}
646+
488647
func (k Keeper) GetSwingStore(ctx sdk.Context) sdk.KVStore {
489648
store := ctx.KVStore(k.storeKey)
490649
return prefix.NewStore(store, []byte(swingStoreKeyPrefix))

golang/cosmos/x/swingset/keeper/msg_server.go

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
package keeper
22

33
import (
4+
"bytes"
45
"context"
6+
"crypto/sha256"
7+
"encoding/hex"
8+
"fmt"
59

610
"github.com/Agoric/agoric-sdk/golang/cosmos/vm"
711
"github.com/Agoric/agoric-sdk/golang/cosmos/x/swingset/types"
@@ -190,6 +194,26 @@ type installBundleAction struct {
190194

191195
func (keeper msgServer) InstallBundle(goCtx context.Context, msg *types.MsgInstallBundle) (*types.MsgInstallBundleResponse, error) {
192196
ctx := sdk.UnwrapSDKContext(goCtx)
197+
if msg.BundleChunks == nil || len(msg.BundleChunks.Chunks) == 0 {
198+
return keeper.InstallFinishedBundle(goCtx, msg)
199+
}
200+
201+
// Mark all the chunks as in-flight.
202+
bc := *msg.BundleChunks
203+
bc.Chunks = make([]*types.ChunkInfo, len(bc.Chunks))
204+
for i, chunk := range bc.Chunks {
205+
ci := *chunk
206+
ci.State = types.ChunkState_CHUNK_STATE_IN_FLIGHT
207+
bc.Chunks[i] = &ci
208+
}
209+
msg.BundleChunks = &bc
210+
211+
pendingId := keeper.AddPendingBundleInstall(ctx, msg)
212+
return &types.MsgInstallBundleResponse{PendingId: pendingId}, nil
213+
}
214+
215+
func (keeper msgServer) InstallFinishedBundle(goCtx context.Context, msg *types.MsgInstallBundle) (*types.MsgInstallBundleResponse, error) {
216+
ctx := sdk.UnwrapSDKContext(goCtx)
193217

194218
err := msg.Uncompress()
195219
if err != nil {
@@ -207,3 +231,102 @@ func (keeper msgServer) InstallBundle(goCtx context.Context, msg *types.MsgInsta
207231

208232
return &types.MsgInstallBundleResponse{}, nil
209233
}
234+
235+
func (keeper msgServer) SendChunk(goCtx context.Context, msg *types.MsgSendChunk) (*types.MsgSendChunkResponse, error) {
236+
ctx := sdk.UnwrapSDKContext(goCtx)
237+
238+
inst := keeper.GetPendingBundleInstall(ctx, msg.PendingId)
239+
if inst == nil {
240+
return nil, fmt.Errorf("no upload in progress for pending Id %d", msg.PendingId)
241+
}
242+
243+
bc := inst.BundleChunks
244+
245+
if msg.ChunkIndex < 0 || msg.ChunkIndex >= uint64(len(bc.Chunks)) {
246+
return nil, fmt.Errorf("chunk index %d out of range for pending Id %d", msg.ChunkIndex, msg.PendingId)
247+
}
248+
249+
if bc.Chunks[msg.ChunkIndex].State != types.ChunkState_CHUNK_STATE_IN_FLIGHT {
250+
return nil, fmt.Errorf("chunk %d is not in flight for pending Id %d", msg.ChunkIndex, msg.PendingId)
251+
}
252+
253+
// Verify the chunk data.
254+
ci := bc.Chunks[msg.ChunkIndex]
255+
if ci.ChunkSize != uint64(len(msg.ChunkData)) {
256+
return nil, fmt.Errorf("chunk %d size mismatch for pending Id %d", msg.ChunkIndex, msg.PendingId)
257+
}
258+
259+
sha256Hash, err := hex.DecodeString(ci.Hash)
260+
if err != nil {
261+
return nil, fmt.Errorf("chunk %d cannot decode hash %s: %s", msg.ChunkIndex, ci.Hash, err)
262+
}
263+
264+
hasher := sha256.New()
265+
sum := hasher.Sum(msg.ChunkData)
266+
if !bytes.Equal(sum, sha256Hash) {
267+
return nil, fmt.Errorf("chunk %d hash mismatch; expected %x, got %x", msg.ChunkIndex, sha256Hash, sum)
268+
}
269+
270+
// Data is valid, so store it.
271+
keeper.SetPendingChunkData(ctx, msg.PendingId, msg.ChunkIndex, msg.ChunkData)
272+
273+
// Mark the chunk as received, and store the pending installation.
274+
ci.State = types.ChunkState_CHUNK_STATE_RECEIVED
275+
keeper.SetPendingBundleInstall(ctx, msg.PendingId, inst)
276+
277+
res, err := keeper.MaybeFinalizeBundle(ctx, msg.PendingId)
278+
279+
return &types.MsgSendChunkResponse{
280+
PendingId: msg.PendingId,
281+
Chunk: ci,
282+
InstallResponse: res,
283+
}, err
284+
}
285+
286+
func (keeper msgServer) MaybeFinalizeBundle(ctx sdk.Context, pendingId uint64) (*types.MsgInstallBundleResponse, error) {
287+
msg := keeper.GetPendingBundleInstall(ctx, pendingId)
288+
if msg == nil {
289+
return nil, nil
290+
}
291+
292+
// If any chunks are not received, then bail (without error).
293+
bc := msg.BundleChunks
294+
totalChunkSize := uint64(0)
295+
for _, chunk := range bc.Chunks {
296+
if chunk.State != types.ChunkState_CHUNK_STATE_RECEIVED {
297+
return nil, nil
298+
}
299+
totalChunkSize += chunk.ChunkSize
300+
}
301+
302+
chunkData := make([]byte, 0, totalChunkSize)
303+
for i := range bc.Chunks {
304+
bz := keeper.GetPendingChunkData(ctx, pendingId, uint64(i))
305+
chunkData = append(chunkData, bz...)
306+
}
307+
308+
// Verify the hash of the concatenated chunks.
309+
hasher := sha256.New()
310+
sum := hasher.Sum(chunkData)
311+
sha256Hash, err := hex.DecodeString(bc.BundleHash)
312+
if err != nil {
313+
return nil, fmt.Errorf("cannot decode hash %s: %s", bc.BundleHash, err)
314+
}
315+
if !bytes.Equal(sum, sha256Hash) {
316+
return nil, fmt.Errorf("bundle hash mismatch; expected %x, got %x", sha256Hash, sum)
317+
}
318+
319+
// Is it compressed or not?
320+
if msg.UncompressedSize > 0 {
321+
msg.CompressedBundle = chunkData
322+
} else {
323+
msg.Bundle = string(chunkData)
324+
}
325+
326+
// Clean up the pending installation state.
327+
msg.BundleChunks = nil
328+
keeper.SetPendingBundleInstall(ctx, pendingId, nil)
329+
330+
// Install the bundle now that all the chunks are processed.
331+
return keeper.InstallFinishedBundle(ctx, msg)
332+
}

golang/cosmos/x/swingset/types/default-params.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,9 @@ var (
6767

6868
DefaultBootstrapVatConfig = "@agoric/vm-config/decentral-core-config.json"
6969

70+
DefaultInstallationDeadlineBlocks int64 = -1 // unlimited
71+
DefaultInstallationDeadlineSeconds int64 = 24 * 60 * 60 // 24 hours
72+
7073
DefaultPowerFlagFees = []PowerFlagFee{
7174
NewPowerFlagFee(PowerFlagSmartWallet, sdk.NewCoins(sdk.NewInt64Coin("ubld", 10_000_000))),
7275
}

0 commit comments

Comments
 (0)