// todo: use import assertions once they're supported by Node.js & ESLint
// https://github.com/tc39/proposal-import-assertions
import {createRequire} from 'module'
const require = createRequire(import.meta.url)

import {ok} from 'assert'
import {randomBytes} from 'crypto'
import createDebug from 'debug'
import commonPrefix from 'common-prefix'

import {NO_RESULTS} from '../no-results.js'

const pkg = require('../package.json')

const debug = createDebug('cached-hafas-client:redis')

const VERSION = pkg['cached-hafas-client'].dataVersion + ''
ok(VERSION)

const COLLECTIONS = 'c'
const COLLECTIONS_ROWS = 'r'
const ATOMS = 'a'
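
// Key layout, as written & read by the store functions below:
//   collection:     <VERSION>:c:<method>:<inputHash>:<createdSec> -> <collectionId>:<whenMs>:<durationMs>
//   collection row: <VERSION>:r:<collectionId>:<whenMs>:<rowIdx>  -> <row data>
//   atom:           <VERSION>:a:<method>:<inputHash>:<createdSec> -> <serialized value>

// Scans for a collection whose `created` timestamp lies within
// [created_min, created_max] and whose [when, when + duration] time frame
// covers [when_min, when_max]; returns its rows within [when_min, when_max],
// or nil if no collection matches.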
const READ_MATCHING_COLLECTION = `\
local collections_prefix = ARGV[1];
local rows_prefix = ARGV[2];
local created_min = tonumber(ARGV[3]); -- UNIX epoch in seconds
local created_max = tonumber(ARGV[4]); -- UNIX epoch in seconds
local when_min = tonumber(ARGV[5]); -- UNIX epoch in milliseconds
local when_max = tonumber(ARGV[6]); -- UNIX epoch in milliseconds

local function read_collection (id)
	local rows = {};

	redis.log(redis.LOG_DEBUG, 'scanning for collection rows (rows_prefix: ' .. rows_prefix .. ')');
	local cursor = "0";
	while true do
		-- todo: pass in collection rows prefix
		local res = redis.call("scan", cursor, "match", rows_prefix .. id .. ":*", "COUNT", 30);
		cursor = res[1];

		for _, key in ipairs(res[2]) do
			local __, ___, when, i = string.find(key, "[^:]+:[^:]+:[^:]+:([^:]+):([^:]+)");
			when = tonumber(when);
			i = tonumber(i);

			if when >= when_min and when <= when_max
			then
				redis.log(redis.LOG_VERBOSE, 'collection row ' .. i .. ' matches');
				local row = redis.call("get", key);
				table.insert(rows, {i, row});
			else
				redis.log(redis.LOG_VERBOSE, 'collection row ' .. i .. ' doesn\\'t match (when: ' .. when .. ')');
			end
		end

		if cursor == "0" then
			redis.log(redis.LOG_VERBOSE, 'done scanning for collection rows');
			break
		end
	end

	return rows;
end

redis.log(redis.LOG_DEBUG, 'scanning for collections (collections_prefix: ' .. collections_prefix .. ')');
local cursor = "0";
while true do
	-- todo: scan in reverse order to, when in doubt, get the latest collection
	-- todo: COUNT 30 instead?
	local res = redis.call("scan", cursor, "match", collections_prefix .. "*", "COUNT", 100);
	cursor = res[1];

	for i, key in ipairs(res[2]) do
		local _, __, created = string.find(key, "[^:]+:[^:]+:[^:]+:[^:]+:([^:]+)");
		created = tonumber(created);

		if created >= created_min and created <= created_max
		then
			local col = redis.call("get", key);
			local _, __, id, when, duration = string.find(col, "([^:]+):([^:]+):([^:]+)");
			redis.log(redis.LOG_VERBOSE, 'id: ' .. id .. ' when: ' .. when .. ' duration: ' .. duration);
			when = tonumber(when);
			duration = tonumber(duration);

			if when <= when_min and (when + duration) >= when_max
			then
				redis.log(redis.LOG_VERBOSE, 'collection ' .. id .. ' matches');
				return read_collection(id);
			else
				redis.log(redis.LOG_VERBOSE, 'collection ' .. id .. ' doesn\\'t match (when: ' .. when .. ' duration: ' .. duration .. ')');
			end
		else
			-- note: \`id\` is not defined in this branch, so we log the key instead
			redis.log(redis.LOG_VERBOSE, 'collection ' .. key .. ' doesn\\'t match (created: ' .. created .. ')');
		end
	end

	if cursor == "0" then
		redis.log(redis.LOG_VERBOSE, 'done scanning for collections');
		break
	end
end

return nil;
`
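
// Scans for an atom whose `created` timestamp lies within
// [created_min, created_max] and returns its value, or nil if none matches.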
const READ_MATCHING_ATOM = `\
local prefix = ARGV[1];
local created_min = tonumber(ARGV[2]); -- UNIX epoch in seconds
local created_max = tonumber(ARGV[3]); -- UNIX epoch in seconds

redis.log(redis.LOG_DEBUG, 'scanning for atoms (prefix: ' .. prefix .. ')');
local cursor = "0";
while true do
	-- todo: scan in reverse order to, when in doubt, get the latest atom
	local res = redis.call("scan", cursor, "match", prefix .. "*", "COUNT", 30);
	cursor = res[1];

	for i, key in ipairs(res[2]) do
		local _, __, created = string.find(key, "[^:]+:[^:]+:[^:]+:[^:]+:([^:]+)");
		created = tonumber(created);

		if created >= created_min and created <= created_max
		then
			local atom = redis.call("get", key);
			return atom;
		else
			redis.log(redis.LOG_VERBOSE, 'atom doesn\\'t match (created: ' .. created .. ')');
		end
	end

	if cursor == "0" then
		redis.log(redis.LOG_VERBOSE, 'done scanning for atoms');
		break
	end
end
`
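
// `db` is expected to be an ioredis client: defineCommand(), which registers
// the two Lua scripts above as custom commands, is ioredis-specific.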
const createRedisStore = (db) => {
	// todo: stop mutating `db`
	const _readMatchingCollection = 'readMatchingCollection' + VERSION
	if (!db[_readMatchingCollection]) {
		db.defineCommand(_readMatchingCollection, {
			numberOfKeys: 0,
			lua: READ_MATCHING_COLLECTION,
		})
	}
	const _readMatchingAtom = 'readMatchingAtom' + VERSION
	if (!db[_readMatchingAtom]) {
		db.defineCommand(_readMatchingAtom, {
			numberOfKeys: 0,
			lua: READ_MATCHING_ATOM,
		})
	}

	const init = async () => {
		debug('init')
	}
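
	// Tries to find a cached collection (a list response, e.g. departures)
	// that covers the queried [whenMin, whenMax] time frame.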
	const readCollection = async (args) => {
		debug('readCollection', args)
		const {
			method, inputHash,
			whenMin, whenMax
		} = args
		const createdMin = Math.floor(args.createdMin / 1000)
		const createdMax = Math.ceil(args.createdMax / 1000)

		// The longest prefix shared by all keys whose `created` segment may
		// lie within [createdMin, createdMax], used to narrow down the SCAN.
		const prefix = commonPrefix([
			[VERSION, COLLECTIONS, method, inputHash, createdMin].join(':'),
			[VERSION, COLLECTIONS, method, inputHash, createdMax].join(':')
		])
		const rowsPrefix = `${VERSION}:${COLLECTIONS_ROWS}:`

		const rows = await db[_readMatchingCollection](
			prefix, rowsPrefix,
			createdMin, createdMax,
			whenMin, whenMax,
		)
		if (rows === null) { // no matching collection found
			return NO_RESULTS
		}
		return rows
			.sort(([idxA], [idxB]) => idxA - idxB)
			.map(([_, data]) => ({data}))
	}
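
	// Caches a list response as one collection entry plus one entry per row,
	// so that later reads can match individual rows by their `when`.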
	const writeCollection = async (args) => {
		debug('writeCollection', args)
		const {
			method, inputHash, when, duration,
			cachePeriod,
			rows
		} = args
		const created = Math.round(args.created / 1000)

		const collectionId = randomBytes(10).toString('hex')
		// The collection entry and all of its row entries are written in one
		// MULTI/EXEC transaction, each with the same `cachePeriod` TTL (in
		// milliseconds), so they expire together.
		const cmds = [
			[
				'set',
				[VERSION, COLLECTIONS, method, inputHash, created].join(':'),
				[collectionId, when, duration].join(':'),
				'PX', cachePeriod,
			],
			...rows.map((row, i) => {
				// todo: fall back to plannedWhen?
				const t = +new Date(row.when)
				if (Number.isNaN(t)) throw new Error(`rows[${i}].when must be a number or an ISO 8601 string`)
				const key = [VERSION, COLLECTIONS_ROWS, collectionId, t, i].join(':')
				return ['set', key, row.data, 'PX', cachePeriod]
			}),
		]
		await db.multi(cmds).exec()
	}
	// atoms
	// key: <VERSION>:<ATOMS>:<method>:<inputHash>:<created>
	// todo: this fails with `created` timestamps of different lengths (2033)
	const readAtom = async (args) => {
		debug('readAtom', args)
		const {
			method, inputHash
		} = args
		const createdMin = Math.floor(args.createdMin / 1000)
		const createdMax = Math.ceil(args.createdMax / 1000)
		const deserialize = args.deserialize || JSON.parse

		const keysPrefix = commonPrefix([
			[VERSION, ATOMS, method, inputHash, createdMin].join(':'),
			[VERSION, ATOMS, method, inputHash, createdMax].join(':')
		])

		const val = await db[_readMatchingAtom](
			keysPrefix,
			createdMin, createdMax,
		)
		return val ? deserialize(val) : NO_RESULTS
	}
	const writeAtom = async (args) => {
		debug('writeAtom', args)
		const {
			method, inputHash,
			cachePeriod,
			val
		} = args
		const created = Math.round(args.created / 1000)
		const serialize = args.serialize || JSON.stringify

		const key = [VERSION, ATOMS, method, inputHash, created].join(':')
		await db.set(key, serialize(val), 'PX', cachePeriod)
	}

	return {
		init,
		readCollection,
		writeCollection,
		readAtom,
		writeAtom
	}
}

export {
createRedisStore,
}
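
// A minimal usage sketch; the import paths & names below are assumptions,
// check cached-hafas-client's README for the actual API:
//   import Redis from 'ioredis'
//   import {createCachedHafasClient} from 'cached-hafas-client'
//   import {createRedisStore} from 'cached-hafas-client/stores/redis.js'
//
//   const db = new Redis() // connects to localhost:6379 by default
//   const store = createRedisStore(db)
//   // with `hafasClient` being a hafas-client instance:
//   const cachedHafasClient = createCachedHafasClient(hafasClient, store)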