Skip to content

Commit 9260a8b

Browse files
refactor(swamp): use RPC client instead of service pointers (celestiaorg#2699)
Replaces celestiaorg#2356 First step of celestiaorg#2337
1 parent 28bd438 commit 9260a8b

8 files changed

+158
-87
lines changed

nodebuilder/node.go

+2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"strings"
88

9+
"github.com/cristalhq/jwt"
910
"github.com/ipfs/boxo/blockservice"
1011
"github.com/ipfs/boxo/exchange"
1112
logging "github.com/ipfs/go-log/v2"
@@ -48,6 +49,7 @@ type Node struct {
4849
Network p2p.Network
4950
Bootstrappers p2p.Bootstrappers
5051
Config *Config
52+
AdminSigner jwt.Signer
5153

5254
// rpc components
5355
RPCServer *rpc.Server // not optional

nodebuilder/tests/api_test.go

+27-21
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,27 @@ import (
1313
"github.com/celestiaorg/celestia-node/api/rpc/client"
1414
"github.com/celestiaorg/celestia-node/blob"
1515
"github.com/celestiaorg/celestia-node/blob/blobtest"
16+
"github.com/celestiaorg/celestia-node/libs/authtoken"
1617
"github.com/celestiaorg/celestia-node/nodebuilder"
1718
"github.com/celestiaorg/celestia-node/nodebuilder/node"
1819
"github.com/celestiaorg/celestia-node/nodebuilder/tests/swamp"
1920
)
2021

22+
func getAdminClient(ctx context.Context, nd *nodebuilder.Node, t *testing.T) *client.Client {
23+
t.Helper()
24+
25+
signer := nd.AdminSigner
26+
listenAddr := "ws://" + nd.RPCServer.ListenAddr()
27+
28+
jwt, err := authtoken.NewSignedJWT(signer, []auth.Permission{"public", "read", "write", "admin"})
29+
require.NoError(t, err)
30+
31+
client, err := client.NewClient(ctx, listenAddr, jwt)
32+
require.NoError(t, err)
33+
34+
return client
35+
}
36+
2137
func TestNodeModule(t *testing.T) {
2238
ctx, cancel := context.WithTimeout(context.Background(), swamp.DefaultTestTimeout)
2339
t.Cleanup(cancel)
@@ -66,26 +82,20 @@ func TestGetByHeight(t *testing.T) {
6682
err := bridge.Start(ctx)
6783
require.NoError(t, err)
6884

69-
adminPerms := []auth.Permission{"public", "read", "write", "admin"}
70-
jwt, err := bridge.AdminServ.AuthNew(ctx, adminPerms)
71-
require.NoError(t, err)
72-
73-
bridgeAddr := "http://" + bridge.RPCServer.ListenAddr()
74-
client, err := client.NewClient(ctx, bridgeAddr, jwt)
75-
require.NoError(t, err)
85+
rpcClient := getAdminClient(ctx, bridge, t)
7686

7787
// let a few blocks be produced
78-
_, err = client.Header.WaitForHeight(ctx, 3)
88+
_, err = rpcClient.Header.WaitForHeight(ctx, 3)
7989
require.NoError(t, err)
8090

81-
networkHead, err := client.Header.NetworkHead(ctx)
91+
networkHead, err := rpcClient.Header.NetworkHead(ctx)
8292
require.NoError(t, err)
83-
_, err = client.Header.GetByHeight(ctx, networkHead.Height()+1)
93+
_, err = rpcClient.Header.GetByHeight(ctx, networkHead.Height()+1)
8494
require.Nil(t, err, "Requesting syncer.Head()+1 shouldn't return an error")
8595

86-
networkHead, err = client.Header.NetworkHead(ctx)
96+
networkHead, err = rpcClient.Header.NetworkHead(ctx)
8797
require.NoError(t, err)
88-
_, err = client.Header.GetByHeight(ctx, networkHead.Height()+2)
98+
_, err = rpcClient.Header.GetByHeight(ctx, networkHead.Height()+2)
8999
require.ErrorContains(t, err, "given height is from the future")
90100
}
91101

@@ -101,13 +111,7 @@ func TestBlobRPC(t *testing.T) {
101111
err := bridge.Start(ctx)
102112
require.NoError(t, err)
103113

104-
adminPerms := []auth.Permission{"public", "read", "write", "admin"}
105-
jwt, err := bridge.AdminServ.AuthNew(ctx, adminPerms)
106-
require.NoError(t, err)
107-
108-
bridgeAddr := "http://" + bridge.RPCServer.ListenAddr()
109-
client, err := client.NewClient(ctx, bridgeAddr, jwt)
110-
require.NoError(t, err)
114+
rpcClient := getAdminClient(ctx, bridge, t)
111115

112116
appBlobs, err := blobtest.GenerateV0Blobs([]int{8}, false)
113117
require.NoError(t, err)
@@ -119,7 +123,7 @@ func TestBlobRPC(t *testing.T) {
119123
)
120124
require.NoError(t, err)
121125

122-
height, err := client.Blob.Submit(ctx, []*blob.Blob{newBlob}, nil)
126+
height, err := rpcClient.Blob.Submit(ctx, []*blob.Blob{newBlob}, nil)
123127
require.NoError(t, err)
124128
require.True(t, height != 0)
125129
}
@@ -147,9 +151,11 @@ func TestHeaderSubscription(t *testing.T) {
147151
err = light.Start(ctx)
148152
require.NoError(t, err)
149153

154+
lightClient := getAdminClient(ctx, light, t)
155+
150156
// subscribe to headers via the light node's RPC header subscription
151157
subctx, subcancel := context.WithCancel(ctx)
152-
sub, err := light.HeaderServ.Subscribe(subctx)
158+
sub, err := lightClient.Header.Subscribe(subctx)
153159
require.NoError(t, err)
154160
// listen for 5 headers
155161
for i := 0; i < 5; i++ {

nodebuilder/tests/blob_test.go

+11-8
Original file line numberDiff line numberDiff line change
@@ -55,12 +55,15 @@ func TestBlobModule(t *testing.T) {
5555
lightNode := sw.NewNodeWithConfig(node.Light, lightCfg)
5656
require.NoError(t, lightNode.Start(ctx))
5757

58-
height, err := fullNode.BlobServ.Submit(ctx, blobs, nil)
58+
fullClient := getAdminClient(ctx, fullNode, t)
59+
lightClient := getAdminClient(ctx, lightNode, t)
60+
61+
height, err := fullClient.Blob.Submit(ctx, blobs, nil)
5962
require.NoError(t, err)
6063

61-
_, err = fullNode.HeaderServ.WaitForHeight(ctx, height)
64+
_, err = fullClient.Header.WaitForHeight(ctx, height)
6265
require.NoError(t, err)
63-
_, err = lightNode.HeaderServ.WaitForHeight(ctx, height)
66+
_, err = lightClient.Header.WaitForHeight(ctx, height)
6467
require.NoError(t, err)
6568

6669
var test = []struct {
@@ -70,15 +73,15 @@ func TestBlobModule(t *testing.T) {
7073
{
7174
name: "Get",
7275
doFn: func(t *testing.T) {
73-
blob1, err := fullNode.BlobServ.Get(ctx, height, blobs[0].Namespace(), blobs[0].Commitment)
76+
blob1, err := fullClient.Blob.Get(ctx, height, blobs[0].Namespace(), blobs[0].Commitment)
7477
require.NoError(t, err)
7578
require.Equal(t, blobs[0], blob1)
7679
},
7780
},
7881
{
7982
name: "GetAll",
8083
doFn: func(t *testing.T) {
81-
newBlobs, err := fullNode.BlobServ.GetAll(ctx, height, []share.Namespace{blobs[0].Namespace()})
84+
newBlobs, err := fullClient.Blob.GetAll(ctx, height, []share.Namespace{blobs[0].Namespace()})
8285
require.NoError(t, err)
8386
require.Len(t, newBlobs, len(appBlobs0))
8487
require.True(t, bytes.Equal(blobs[0].Commitment, newBlobs[0].Commitment))
@@ -88,10 +91,10 @@ func TestBlobModule(t *testing.T) {
8891
{
8992
name: "Included",
9093
doFn: func(t *testing.T) {
91-
proof, err := fullNode.BlobServ.GetProof(ctx, height, blobs[0].Namespace(), blobs[0].Commitment)
94+
proof, err := fullClient.Blob.GetProof(ctx, height, blobs[0].Namespace(), blobs[0].Commitment)
9295
require.NoError(t, err)
9396

94-
included, err := lightNode.BlobServ.Included(
97+
included, err := lightClient.Blob.Included(
9598
ctx,
9699
height,
97100
blobs[0].Namespace(),
@@ -114,7 +117,7 @@ func TestBlobModule(t *testing.T) {
114117
)
115118
require.NoError(t, err)
116119

117-
b, err := fullNode.BlobServ.Get(ctx, height, newBlob.Namespace(), newBlob.Commitment)
120+
b, err := fullClient.Blob.Get(ctx, height, newBlob.Namespace(), newBlob.Commitment)
118121
assert.Nil(t, b)
119122
require.Error(t, err)
120123
require.ErrorIs(t, err, blob.ErrBlobNotFound)

nodebuilder/tests/fraud_test.go

+8-4
Original file line numberDiff line numberDiff line change
@@ -89,9 +89,11 @@ func TestFraudProofHandling(t *testing.T) {
8989
err = full.Start(ctx)
9090
require.NoError(t, err)
9191

92+
fullClient := getAdminClient(ctx, full, t)
93+
9294
// 5.
9395
subCtx, subCancel := context.WithCancel(ctx)
94-
subscr, err := full.FraudServ.Subscribe(subCtx, byzantine.BadEncoding)
96+
subscr, err := fullClient.Fraud.Subscribe(subCtx, byzantine.BadEncoding)
9597
require.NoError(t, err)
9698
select {
9799
case p := <-subscr:
@@ -108,7 +110,7 @@ func TestFraudProofHandling(t *testing.T) {
108110
// lifecycles of each Module.
109111
// 6.
110112
syncCtx, syncCancel := context.WithTimeout(context.Background(), blockTime*5)
111-
_, err = full.HeaderServ.WaitForHeight(syncCtx, 15)
113+
_, err = fullClient.Header.WaitForHeight(syncCtx, 15)
112114
require.ErrorIs(t, err, context.DeadlineExceeded)
113115
syncCancel()
114116

@@ -118,10 +120,11 @@ func TestFraudProofHandling(t *testing.T) {
118120
lnStore := nodebuilder.MockStore(t, cfg)
119121
light := sw.NewNodeWithStore(node.Light, lnStore)
120122
require.NoError(t, light.Start(ctx))
123+
lightClient := getAdminClient(ctx, light, t)
121124

122125
// 8.
123126
subCtx, subCancel = context.WithCancel(ctx)
124-
subscr, err = light.FraudServ.Subscribe(subCtx, byzantine.BadEncoding)
127+
subscr, err = lightClient.Fraud.Subscribe(subCtx, byzantine.BadEncoding)
125128
require.NoError(t, err)
126129
select {
127130
case p := <-subscr:
@@ -135,7 +138,8 @@ func TestFraudProofHandling(t *testing.T) {
135138
// 9.
136139
fN := sw.NewNodeWithStore(node.Full, store)
137140
require.Error(t, fN.Start(ctx))
138-
proofs, err := fN.FraudServ.Get(ctx, byzantine.BadEncoding)
141+
fNClient := getAdminClient(ctx, fN, t)
142+
proofs, err := fNClient.Fraud.Get(ctx, byzantine.BadEncoding)
139143
require.NoError(t, err)
140144
require.NotNil(t, proofs)
141145

nodebuilder/tests/nd_test.go

+15-7
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ func TestShrexNDFromLights(t *testing.T) {
4545
err = light.Start(ctx)
4646
require.NoError(t, err)
4747

48+
bridgeClient := getAdminClient(ctx, bridge, t)
49+
lightClient := getAdminClient(ctx, light, t)
50+
4851
// wait for chain to be filled
4952
require.NoError(t, <-fillDn)
5053

@@ -54,17 +57,17 @@ func TestShrexNDFromLights(t *testing.T) {
5457
// the block that actually has transactions. We can get this data from the
5558
// response returned by FillBlock.
5659
for i := 16; i < blocks; i++ {
57-
h, err := bridge.HeaderServ.GetByHeight(ctx, uint64(i))
60+
h, err := bridgeClient.Header.GetByHeight(ctx, uint64(i))
5861
require.NoError(t, err)
5962

6063
reqCtx, cancel := context.WithTimeout(ctx, time.Second*5)
6164

6265
// ensure to fetch random namespace (not the reserved namespace)
6366
namespace := h.DAH.RowRoots[1][:share.NamespaceSize]
6467

65-
expected, err := bridge.ShareServ.GetSharesByNamespace(reqCtx, h.DAH, namespace)
68+
expected, err := bridgeClient.Share.GetSharesByNamespace(reqCtx, h.DAH, namespace)
6669
require.NoError(t, err)
67-
got, err := light.ShareServ.GetSharesByNamespace(reqCtx, h.DAH, namespace)
70+
got, err := lightClient.Share.GetSharesByNamespace(reqCtx, h.DAH, namespace)
6871
require.NoError(t, err)
6972

7073
require.True(t, len(got[0].Shares) > 0)
@@ -113,12 +116,15 @@ func TestShrexNDFromLightsWithBadFulls(t *testing.T) {
113116
require.NoError(t, startFullNodes(ctx, fulls...))
114117
require.NoError(t, light.Start(ctx))
115118

119+
bridgeClient := getAdminClient(ctx, bridge, t)
120+
lightClient := getAdminClient(ctx, light, t)
121+
116122
// wait for chain to fill up
117123
require.NoError(t, <-fillDn)
118124

119125
// first 2 blocks are not filled with data
120126
for i := 3; i < blocks; i++ {
121-
h, err := bridge.HeaderServ.GetByHeight(ctx, uint64(i))
127+
h, err := bridgeClient.Header.GetByHeight(ctx, uint64(i))
122128
require.NoError(t, err)
123129

124130
if len(h.DAH.RowRoots) != bsize*2 {
@@ -133,16 +139,18 @@ func TestShrexNDFromLightsWithBadFulls(t *testing.T) {
133139
// ensure to fetch random namespace (not the reserved namespace)
134140
namespace := h.DAH.RowRoots[1][:share.NamespaceSize]
135141

136-
expected, err := bridge.ShareServ.GetSharesByNamespace(reqCtx, h.DAH, namespace)
142+
expected, err := bridgeClient.Share.GetSharesByNamespace(reqCtx, h.DAH, namespace)
137143
require.NoError(t, err)
138144
require.True(t, len(expected[0].Shares) > 0)
139145

140146
// choose a random full to test
141-
gotFull, err := fulls[len(fulls)/2].ShareServ.GetSharesByNamespace(reqCtx, h.DAH, namespace)
147+
fN := fulls[len(fulls)/2]
148+
fnClient := getAdminClient(ctx, fN, t)
149+
gotFull, err := fnClient.Share.GetSharesByNamespace(reqCtx, h.DAH, namespace)
142150
require.NoError(t, err)
143151
require.True(t, len(gotFull[0].Shares) > 0)
144152

145-
gotLight, err := light.ShareServ.GetSharesByNamespace(reqCtx, h.DAH, namespace)
153+
gotLight, err := lightClient.Share.GetSharesByNamespace(reqCtx, h.DAH, namespace)
146154
require.NoError(t, err)
147155
require.True(t, len(gotLight[0].Shares) > 0)
148156

nodebuilder/tests/p2p_test.go

+31-8
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,10 @@ func TestBridgeNodeAsBootstrapper(t *testing.T) {
4646
require.NoError(t, nd.Start(ctx))
4747
assert.Equal(t, *addr, nd.Bootstrappers[0])
4848
// ensure that node is actually connected to BN
49-
assert.True(t, nd.Host.Network().Connectedness(addr.ID) == network.Connected)
49+
client := getAdminClient(ctx, nd, t)
50+
connectedenss, err := client.P2P.Connectedness(ctx, addr.ID)
51+
require.NoError(t, err)
52+
assert.Equal(t, connectedenss, network.Connected)
5053
}
5154
}
5255

@@ -102,15 +105,22 @@ func TestFullDiscoveryViaBootstrapper(t *testing.T) {
102105
for index := range nodes {
103106
require.NoError(t, nodes[index].Start(ctx))
104107
assert.Equal(t, *bootstrapper, nodes[index].Bootstrappers[0])
105-
assert.True(t, nodes[index].Host.Network().Connectedness(bootstrapper.ID) == network.Connected)
108+
// ensure that node is actually connected to BN
109+
client := getAdminClient(ctx, nodes[index], t)
110+
connectedness, err := client.P2P.Connectedness(ctx, bootstrapper.ID)
111+
require.NoError(t, err)
112+
assert.Equal(t, connectedness, network.Connected)
106113
}
107114

108115
for {
109116
if ctx.Err() != nil {
110117
t.Fatal(ctx.Err())
111118
}
112-
if light.Host.Network().Connectedness(host.InfoFromHost(full.Host).ID) == network.Connected {
113-
// LN discovered FN successfully and is now connected
119+
// LN discovered FN successfully and is now connected
120+
client := getAdminClient(ctx, light, t)
121+
connectedness, err := client.P2P.Connectedness(ctx, host.InfoFromHost(full.Host).ID)
122+
require.NoError(t, err)
123+
if connectedness == network.Connected {
114124
break
115125
}
116126
}
@@ -158,11 +168,19 @@ func TestRestartNodeDiscovery(t *testing.T) {
158168
for index := 0; index < numFulls; index++ {
159169
nodes[index] = sw.NewNodeWithConfig(node.Full, fullCfg, nodesConfig)
160170
require.NoError(t, nodes[index].Start(ctx))
161-
assert.True(t, nodes[index].Host.Network().Connectedness(bridgeAddr.ID) == network.Connected)
171+
client := getAdminClient(ctx, nodes[index], t)
172+
connectedness, err := client.P2P.Connectedness(ctx, bridgeAddr.ID)
173+
require.NoError(t, err)
174+
assert.Equal(t, connectedness, network.Connected)
162175
}
163176

164177
// ensure FNs are connected to each other
165-
require.True(t, nodes[0].Host.Network().Connectedness(nodes[1].Host.ID()) == network.Connected)
178+
fullClient1 := getAdminClient(ctx, nodes[0], t)
179+
fullClient2 := getAdminClient(ctx, nodes[1], t)
180+
181+
connectedness, err := fullClient1.P2P.Connectedness(ctx, nodes[1].Host.ID())
182+
require.NoError(t, err)
183+
assert.Equal(t, connectedness, network.Connected)
166184

167185
// disconnect the FNs
168186
sw.Disconnect(t, nodes[0], nodes[1])
@@ -175,8 +193,13 @@ func TestRestartNodeDiscovery(t *testing.T) {
175193

176194
// ensure that the FN with disabled discovery is discovered by both of the
177195
// running FNs that have discovery enabled
178-
require.True(t, nodes[0].Host.Network().Connectedness(disabledDiscoveryFN.Host.ID()) == network.Connected)
179-
require.True(t, nodes[1].Host.Network().Connectedness(disabledDiscoveryFN.Host.ID()) == network.Connected)
196+
connectedness, err = fullClient1.P2P.Connectedness(ctx, disabledDiscoveryFN.Host.ID())
197+
require.NoError(t, err)
198+
assert.Equal(t, connectedness, network.Connected)
199+
200+
connectedness, err = fullClient2.P2P.Connectedness(ctx, disabledDiscoveryFN.Host.ID())
201+
require.NoError(t, err)
202+
assert.Equal(t, connectedness, network.Connected)
180203
}
181204

182205
func setTimeInterval(cfg *nodebuilder.Config, interval time.Duration) {

0 commit comments

Comments
 (0)