Skip to content

Commit ad9a571

Browse files
committed
Fixed replica issues.
1 parent d283306 commit ad9a571

File tree

4 files changed

+126
-17
lines changed

4 files changed

+126
-17
lines changed

bin.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ const args = require('minimist')(process.argv.slice(2), {
1515
default: {
1616
port: 0,
1717
points: 100,
18-
timeout: 200,
18+
timeout: 1000,
1919
verbose: false
2020
},
2121
alias: {

lib/commands.js

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ function load (kv) {
77
const upring = kv.upring
88
const db = new Map()
99
const streams = new Map()
10-
const logger = upring.logger
1110

1211
function setupTracker (entry, reply, sendData) {
1312
if (entry.hasTracker) {
@@ -20,7 +19,7 @@ function load (kv) {
2019
entry.hasTracker = true
2120
entry.hasReplicator = false
2221

23-
logger.debug({ key }, 'configuring tracker')
22+
upring.logger.debug({ key }, 'configuring tracker')
2423

2524
const dest = upring._hashring.next(key)
2625
const tracker = upring.track(key, { replica: true })
@@ -39,7 +38,7 @@ function load (kv) {
3938
})
4039
streams.delete(key)
4140
setTimeout(function () {
42-
if (!entry.hasReplicator) {
41+
if (!entry.hasReplicator && !entry.hasTracker) {
4342
db.delete(key)
4443
}
4544
}, 30000).unref()
@@ -56,7 +55,7 @@ function load (kv) {
5655
function setupReplicator (entry, sendData) {
5756
const key = entry.key
5857
entry.hasReplicator = true
59-
logger.debug({ key }, 'configuring replicator')
58+
upring.logger.debug({ key }, 'configuring replicator')
6059
upring.replica(key, function () {
6160
entry.hasReplicator = false
6261
setupTracker(entry, noop, sendData)
@@ -91,7 +90,7 @@ function load (kv) {
9190
}
9291
}
9392

94-
logger.debug({ key, value: req.value }, 'setting data')
93+
upring.logger.debug({ key, value: req.value }, 'setting data')
9594

9695
if (needReply) {
9796
reply()
@@ -109,7 +108,7 @@ function load (kv) {
109108

110109
function sendData (peer, cb) {
111110
if (typeof cb !== 'function') {
112-
cb = bigError
111+
cb = retry
113112
}
114113

115114
const entry = db.get(key)
@@ -131,11 +130,23 @@ function load (kv) {
131130
return
132131
}
133132

134-
logger.debug({ key, value: entry.value }, 'replicated key')
133+
upring.logger.debug({ key, value: entry.value, to: peer }, 'replicated key')
135134

136135
cb()
137136
})
138137
}
138+
139+
function retry (err) {
140+
if (err) {
141+
upring.logger.error(err)
142+
const dest = upring._hashring.next(key)
143+
if (!dest) {
144+
return upring.emit('error', err)
145+
}
146+
147+
sendData(dest)
148+
}
149+
}
139150
}
140151

141152
upring.add('ns:kv,cmd:get', function (req, reply) {
@@ -148,7 +159,7 @@ function load (kv) {
148159
if (entry && entry.value || !dest) {
149160
reply(null, { key, value: entry ? entry.value : undefined })
150161
} else {
151-
logger.debug({ key }, 'checking if we are in the middle of a migration')
162+
upring.logger.debug({ key }, 'checking if we are in the middle of a migration')
152163
upring.peerConn(dest)
153164
.request(req, function (err, res) {
154165
if (err) {
@@ -159,7 +170,7 @@ function load (kv) {
159170
const entry = db.get(key)
160171

161172
if (res && !entry) {
162-
logger.debug({ key }, 'set data because of migration')
173+
upring.logger.debug({ key }, 'set data because of migration')
163174
put({
164175
ns: 'kv',
165176
cmd: 'put',
@@ -209,12 +220,6 @@ function load (kv) {
209220

210221
reply(null, { streams: { updates } })
211222
})
212-
213-
function bigError (err) {
214-
if (err) {
215-
upring.emit('error', err)
216-
}
217-
}
218223
}
219224

220225
function noop () {}

test/helper.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ function build (main) {
1212

1313
return UpRingKV({
1414
base,
15-
logLevel: 'error',
15+
logLevel: 'fatal',
1616
hashring: {
1717
joinTimeout
1818
}
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
'use strict'
2+
3+
const build = require('./helper').build
4+
const t = require('tap')
5+
const maxInt = Math.pow(2, 32) - 1
6+
7+
t.plan(16)
8+
9+
var a = build()
10+
t.tearDown(a.close.bind(a))
11+
12+
var c
13+
var b
14+
var key
15+
16+
a.upring.on('up', function () {
17+
t.pass('a up')
18+
19+
join(a, function (instance) {
20+
t.pass('b up')
21+
b = instance
22+
23+
key = 'hello'
24+
25+
for (var i = 0; i < maxInt && !a.upring.allocatedToMe(key); i += 1) {
26+
key = 'hello' + i
27+
}
28+
// key is now allocated to a
29+
30+
a.put(key, 'world', function (err) {
31+
t.error(err)
32+
33+
b.get(key, function (err, value) {
34+
t.error(err)
35+
t.equal(value, 'world')
36+
37+
afterDown(a, b, function () {
38+
t.pass('a closed')
39+
40+
join(b, function (instance) {
41+
t.pass('c joined')
42+
c = instance
43+
44+
c.get(key, function (err, value) {
45+
t.error(err)
46+
t.equal(value, 'world')
47+
48+
closeBAndGet()
49+
})
50+
})
51+
})
52+
})
53+
})
54+
})
55+
})
56+
57+
function afterDown (prev, next, cb) {
58+
var count = 0
59+
next.upring.once('peerDown', function () {
60+
if (++count === 2) {
61+
cb()
62+
}
63+
})
64+
prev.close(function () {
65+
if (++count === 2) {
66+
cb()
67+
}
68+
})
69+
}
70+
71+
function join (main, cb) {
72+
const instance = build(main)
73+
74+
t.tearDown(instance.close.bind(instance))
75+
76+
instance.upring.on('up', function () {
77+
cb(instance)
78+
})
79+
}
80+
81+
function closeBAndGet () {
82+
afterDown(b, c, function () {
83+
t.pass('b closed')
84+
85+
c.get(key, function (err, value) {
86+
t.error(err)
87+
t.equal(value, 'world')
88+
89+
join(c, function (d) {
90+
t.pass('d up')
91+
setTimeout(function () {
92+
afterDown(c, d, function () {
93+
t.pass('c closed')
94+
95+
d.get(key, function (err, value) {
96+
t.error(err)
97+
t.equal(value, 'world')
98+
})
99+
})
100+
}, 1000)
101+
})
102+
})
103+
})
104+
}

0 commit comments

Comments
 (0)