Skip to content

Commit 8126930

Browse files
committed
routing: support multiple namespaced MissionControls
1 parent 5eff056 commit 8126930

6 files changed

+250
-53
lines changed

routing/integrated_routing_context_test.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,10 @@ func (c *integratedRoutingContext) testPayment(maxParts uint32,
168168
mcController, err := NewMissionController(db, c.source.pubkey, &c.mcCfg)
169169
require.NoError(c.t, err)
170170

171-
mc := mcController.GetDefaultStore()
171+
mc, err := mcController.GetNamespacedStore(
172+
DefaultMissionControlNamespace,
173+
)
174+
require.NoError(c.t, err)
172175

173176
getBandwidthHints := func(_ Graph) (bandwidthHints, error) {
174177
// Create bandwidth hints based on local channel balances.

routing/missioncontrol.go

+135-32
Original file line numberDiff line numberDiff line change
@@ -135,12 +135,12 @@ type MissionControl struct {
135135
}
136136

137137
// MissionController manages MissionControl instances in various namespaces.
138-
//
139-
// NOTE: currently it only has a MissionControl in the default namespace.
140138
type MissionController struct {
141-
cfg *mcConfig
139+
db kvdb.Backend
140+
cfg *mcConfig
141+
defaultMCCfg *MissionControlConfig
142142

143-
mc *MissionControl
143+
mc map[string]*MissionControl
144144
mu sync.Mutex
145145

146146
// TODO(roasbeef): further counters, if vertex continually unavailable,
@@ -149,12 +149,33 @@ type MissionController struct {
149149
// TODO(roasbeef): also add favorable metrics for nodes
150150
}
151151

152-
// GetDefaultStore returns the MissionControl in the default namespace.
153-
func (m *MissionController) GetDefaultStore() *MissionControl {
152+
// GetNamespacedStore returns the MissionControl in the given namespace. If one
153+
// does not yet exist, then it is initialised.
154+
func (m *MissionController) GetNamespacedStore(ns string) (*MissionControl,
155+
error) {
156+
157+
m.mu.Lock()
158+
defer m.mu.Unlock()
159+
160+
if mc, ok := m.mc[ns]; ok {
161+
return mc, nil
162+
}
163+
164+
return m.initMissionControl(ns)
165+
}
166+
167+
// ListNamespaces returns a list of the namespaces that the MissionController
168+
// is aware of.
169+
func (m *MissionController) ListNamespaces() []string {
154170
m.mu.Lock()
155171
defer m.mu.Unlock()
156172

157-
return m.mc
173+
namespaces := make([]string, 0, len(m.mc))
174+
for ns := range m.mc {
175+
namespaces = append(namespaces, ns)
176+
}
177+
178+
return namespaces
158179
}
159180

160181
// MissionControlConfig defines parameters that control mission control
@@ -259,61 +280,143 @@ func NewMissionController(db kvdb.Backend, self route.Vertex,
259280
return nil, err
260281
}
261282

283+
mcCfg := &mcConfig{
284+
clock: clock.NewDefaultClock(),
285+
selfNode: self,
286+
}
287+
288+
mgr := &MissionController{
289+
db: db,
290+
defaultMCCfg: cfg,
291+
cfg: mcCfg,
292+
mc: make(map[string]*MissionControl),
293+
}
294+
295+
if err := mgr.loadMissionControls(); err != nil {
296+
return nil, err
297+
}
298+
299+
for _, mc := range mgr.mc {
300+
if err := mc.init(); err != nil {
301+
return nil, err
302+
}
303+
}
304+
305+
return mgr, nil
306+
}
307+
308+
// loadMissionControls initialises a MissionControl in the default namespace if
309+
// one does not yet exist. It then initialises a MissionControl for all other
310+
// namespaces found in the DB.
311+
//
312+
// NOTE: this should only be called once during MissionController construction.
313+
func (m *MissionController) loadMissionControls() error {
314+
m.mu.Lock()
315+
defer m.mu.Unlock()
316+
317+
// Always initialise the default namespace.
318+
_, err := m.initMissionControl(DefaultMissionControlNamespace)
319+
if err != nil {
320+
return err
321+
}
322+
323+
namespaces := make(map[string]struct{})
324+
err = m.db.View(func(tx walletdb.ReadTx) error {
325+
mcStoreBkt := tx.ReadBucket(resultsKey)
326+
if mcStoreBkt == nil {
327+
return fmt.Errorf("top level mission control bucket " +
328+
"not found")
329+
}
330+
331+
// Iterate through all the keys in the bucket and collect the
332+
// namespaces.
333+
return mcStoreBkt.ForEach(func(k, _ []byte) error {
334+
// We've already initialised the default namespace so
335+
// we can skip it.
336+
if string(k) == DefaultMissionControlNamespace {
337+
return nil
338+
}
339+
340+
namespaces[string(k)] = struct{}{}
341+
342+
return nil
343+
})
344+
}, func() {})
345+
if err != nil {
346+
return err
347+
}
348+
349+
// Now, iterate through all the namespaces and initialise them.
350+
for ns := range namespaces {
351+
_, err = m.initMissionControl(ns)
352+
if err != nil {
353+
return err
354+
}
355+
}
356+
357+
return nil
358+
}
359+
360+
// initMissionControl creates a new MissionControl instance with the given
361+
// namespace if one does not yet exist.
362+
//
363+
// NOTE: the MissionController's mutex must be held before calling this method.
364+
func (m *MissionController) initMissionControl(namespace string) (
365+
*MissionControl, error) {
366+
367+
// If a mission control with this namespace has already been initialised
368+
// then there is nothing left to do.
369+
if mc, ok := m.mc[namespace]; ok {
370+
return mc, nil
371+
}
372+
373+
cfg := m.defaultMCCfg
374+
262375
store, err := newMissionControlStore(
263-
newDefaultNamespacedStore(db), cfg.MaxMcHistory,
376+
newNamespacedDB(m.db, namespace), cfg.MaxMcHistory,
264377
cfg.McFlushInterval,
265378
)
266379
if err != nil {
267380
return nil, err
268381
}
269382

270-
mcCfg := &mcConfig{
271-
clock: clock.NewDefaultClock(),
272-
selfNode: self,
273-
}
274-
275-
// Create a mission control in the default namespace.
276-
defaultMC := &MissionControl{
277-
cfg: mcCfg,
383+
mc := &MissionControl{
384+
cfg: m.cfg,
278385
state: newMissionControlState(cfg.MinFailureRelaxInterval),
279386
store: store,
280387
estimator: cfg.Estimator,
281388
log: build.NewPrefixLog(
282-
fmt.Sprintf("[%s]:", DefaultMissionControlNamespace),
283-
log,
389+
fmt.Sprintf("[%s]:", namespace), log,
284390
),
285391
onConfigUpdate: cfg.OnConfigUpdate,
286392
}
287393

288-
mc := &MissionController{
289-
cfg: mcCfg,
290-
mc: defaultMC,
291-
}
292-
293-
if err := mc.mc.init(); err != nil {
294-
return nil, err
295-
}
394+
m.mc[namespace] = mc
296395

297396
return mc, nil
298397
}
299398

300-
// RunStoreTicker runs the mission control store's ticker.
301-
func (m *MissionController) RunStoreTicker() {
399+
// RunStoreTickers runs the mission controller store's tickers.
400+
func (m *MissionController) RunStoreTickers() {
302401
m.mu.Lock()
303402
defer m.mu.Unlock()
304403

305-
m.mc.store.run()
404+
for _, mc := range m.mc {
405+
mc.store.run()
406+
}
306407
}
307408

308-
// StopStoreTicker stops the mission control store's ticker.
309-
func (m *MissionController) StopStoreTicker() {
409+
// StopStoreTickers stops the mission control store's tickers.
410+
func (m *MissionController) StopStoreTickers() {
310411
log.Debug("Stopping mission control store ticker")
311412
defer log.Debug("Mission control store ticker stopped")
312413

313414
m.mu.Lock()
314415
defer m.mu.Unlock()
315416

316-
m.mc.store.stop()
417+
for _, mc := range m.mc {
418+
mc.store.stop()
419+
}
317420
}
318421

319422
// init initializes mission control with historical data.

routing/missioncontrol_test.go

+86-8
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,11 @@ var (
4141
)
4242

4343
type mcTestContext struct {
44-
t *testing.T
45-
mc *MissionControl
44+
t *testing.T
4645

47-
clock *testClock
46+
mcController *MissionController
47+
mc *MissionControl
48+
clock *testClock
4849

4950
db kvdb.Backend
5051
dbPath string
@@ -88,8 +89,10 @@ func createMcTestContext(t *testing.T) *mcTestContext {
8889
func (ctx *mcTestContext) restartMc() {
8990
// Since we don't run a timer to store results in unit tests, we store
9091
// them here before fetching back everything in NewMissionController.
91-
if ctx.mc != nil {
92-
require.NoError(ctx.t, ctx.mc.store.storeResults())
92+
if ctx.mcController != nil {
93+
for _, mc := range ctx.mcController.mc {
94+
require.NoError(ctx.t, mc.store.storeResults())
95+
}
9396
}
9497

9598
aCfg := AprioriConfig{
@@ -101,16 +104,26 @@ func (ctx *mcTestContext) restartMc() {
101104
estimator, err := NewAprioriEstimator(aCfg)
102105
require.NoError(ctx.t, err)
103106

104-
mc, err := NewMissionController(
107+
ctx.mcController, err = NewMissionController(
105108
ctx.db, mcTestSelf,
106109
&MissionControlConfig{Estimator: estimator},
107110
)
108111
if err != nil {
109112
ctx.t.Fatal(err)
110113
}
111114

112-
mc.cfg.clock = ctx.clock
113-
ctx.mc = mc.GetDefaultStore()
115+
ctx.mcController.cfg.clock = ctx.clock
116+
117+
// By default, select the default namespace.
118+
ctx.setNamespacedMC(DefaultMissionControlNamespace)
119+
}
120+
121+
// setNamespacedMC sets the currently selected MissionControl instance of the
122+
// mcTextContext to the one with the given namespace.
123+
func (ctx *mcTestContext) setNamespacedMC(namespace string) {
124+
var err error
125+
ctx.mc, err = ctx.mcController.GetNamespacedStore(namespace)
126+
require.NoError(ctx.t, err)
114127
}
115128

116129
// Assert that mission control returns a probability for an edge.
@@ -233,6 +246,71 @@ func TestMissionControlChannelUpdate(t *testing.T) {
233246
ctx.expectP(100, 0)
234247
}
235248

249+
// TestMissionControlNamespaces tests that the results reported to a
250+
// MissionControl instance in one namespace does not affect the query results in
251+
// another namespace.
252+
func TestMissionControlNamespaces(t *testing.T) {
253+
// Create a new MC context. This will select the default namespace
254+
// MissionControl instance.
255+
ctx := createMcTestContext(t)
256+
257+
// Initially, the controller should only be aware of the default
258+
// namespace.
259+
require.ElementsMatch(t, ctx.mcController.ListNamespaces(), []string{
260+
DefaultMissionControlNamespace,
261+
})
262+
263+
// Initial probability is expected to be the apriori.
264+
ctx.expectP(1000, testAprioriHopProbability)
265+
266+
// Expect probability to be zero after reporting the edge as failed.
267+
ctx.reportFailure(1000, lnwire.NewTemporaryChannelFailure(nil))
268+
ctx.expectP(1000, 0)
269+
270+
// Now, switch namespaces.
271+
const newNs = "new-namespace"
272+
ctx.setNamespacedMC(newNs)
273+
274+
// Now, the controller should only be aware of the default namespace and
275+
// the new one.
276+
require.ElementsMatch(t, ctx.mcController.ListNamespaces(), []string{
277+
DefaultMissionControlNamespace,
278+
newNs,
279+
})
280+
281+
// Since this new namespace has no idea about the reported failure, the
282+
// expected probability should once again be the apriori probability.
283+
ctx.expectP(1000, testAprioriHopProbability)
284+
285+
// Report a success in the new namespace.
286+
ctx.reportSuccess()
287+
288+
// The probability of the pair should now have increased.
289+
ctx.expectP(1000, testAprioriHopProbability+0.05)
290+
291+
// Switch back to the default namespace.
292+
ctx.setNamespacedMC(DefaultMissionControlNamespace)
293+
294+
// The probability in the default namespace should still be zero.
295+
ctx.expectP(1000, 0)
296+
297+
// We also want to test that the initial loading of the namespaces is
298+
// done correctly. So let's reload the controller and assert that the
299+
// probabilities in both namespaces remain the same after restart.
300+
ctx.restartMc()
301+
302+
// Assert that both namespaces were loaded.
303+
require.ElementsMatch(t, ctx.mcController.ListNamespaces(), []string{
304+
DefaultMissionControlNamespace,
305+
newNs,
306+
})
307+
308+
// Assert that the probabilities in both namespaces remain unchanged.
309+
ctx.expectP(1000, 0)
310+
ctx.setNamespacedMC(newNs)
311+
ctx.expectP(1000, testAprioriHopProbability+0.05)
312+
}
313+
236314
// testClock is an implementation of clock.Clock that lets the caller overwrite
237315
// the current time at any point.
238316
type testClock struct {

routing/router_test.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,11 @@ func createTestCtxFromGraphInstanceAssumeValid(t *testing.T,
132132
graphInstance.graphBackend, route.Vertex{}, mcConfig,
133133
)
134134
require.NoError(t, err, "failed to create missioncontrol")
135-
mc := mcController.GetDefaultStore()
135+
136+
mc, err := mcController.GetNamespacedStore(
137+
DefaultMissionControlNamespace,
138+
)
139+
require.NoError(t, err)
136140

137141
sourceNode, err := graphInstance.graph.SourceNode()
138142
require.NoError(t, err)

rpcserver.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -729,7 +729,7 @@ func (r *rpcServer) addDeps(s *server, macService *macaroons.Service,
729729
return info.NodeKey1Bytes, info.NodeKey2Bytes, nil
730730
},
731731
FindRoute: s.chanRouter.FindRoute,
732-
MissionControl: s.missionControl.GetDefaultStore(),
732+
MissionControl: s.defaultMC,
733733
ActiveNetParams: r.cfg.ActiveNetParams.Params,
734734
Tower: s.controlTower,
735735
MaxTotalTimelock: r.cfg.MaxOutgoingCltvExpiry,
@@ -6071,8 +6071,7 @@ func (r *rpcServer) AddInvoice(ctx context.Context,
60716071

60726072
return r.server.chanRouter.FindBlindedPaths(
60736073
r.selfNode, amt,
6074-
r.server.missionControl.GetDefaultStore().
6075-
GetProbability,
6074+
r.server.defaultMC.GetProbability,
60766075
blindingRestrictions,
60776076
)
60786077
},

0 commit comments

Comments
 (0)