From 1424b4cbbf0a9d73d182002e402492e94182fcc2 Mon Sep 17 00:00:00 2001 From: ppedziwiatr Date: Fri, 19 Jul 2024 21:29:58 +0200 Subject: [PATCH] fix: add schedLocation cache --- servers/mu/src/domain/api/sendDataItem.js | 6 +- servers/mu/src/domain/logger.js | 2 +- servers/ur/package-lock.json | 28 +++++ servers/ur/package.json | 1 + servers/ur/src/domain.js | 130 +++------------------- servers/ur/src/proxy.js | 6 +- servers/ur/src/routes/cu.js | 11 +- 7 files changed, 60 insertions(+), 124 deletions(-) diff --git a/servers/mu/src/domain/api/sendDataItem.js b/servers/mu/src/domain/api/sendDataItem.js index 514538102..36493eec0 100644 --- a/servers/mu/src/domain/api/sendDataItem.js +++ b/servers/mu/src/domain/api/sendDataItem.js @@ -34,6 +34,8 @@ export function sendDataItemWith ({ const locateProcessLocal = fromPromise(locateProcess) + const schedLocationCache = new Map() + /** * If the data item is a Message, then cranking and tracing * must also be performed. @@ -96,6 +98,8 @@ export function sendDataItemWith ({ if (hasTargetTag) { return Rejected({ res }) } + const schedulerTag = res.dataItem.tags.find((tag) => tag.name === 'Scheduler') + schedLocationCache.set(res.dataItem.id, schedulerTag.value) return Resolved() }) .bichain(({ res }) => { @@ -125,7 +129,7 @@ export function sendDataItemWith ({ schedLocation and it will get sent directly to Arweave */ - return locateProcessLocal(ctx.dataItem.target) + return locateProcessLocal(ctx.dataItem.target, schedLocationCache.get(ctx.dataItem.target)) .chain((schedLocation) => sendMessage({ ...ctx, schedLocation })) } return sendProcess(ctx) diff --git a/servers/mu/src/domain/logger.js b/servers/mu/src/domain/logger.js index 75a7d926d..e024da416 100644 --- a/servers/mu/src/domain/logger.js +++ b/servers/mu/src/domain/logger.js @@ -6,7 +6,7 @@ export const createLogger = (name) => { logger.child = (name) => createLogger(`${logger.namespace}:${name}`) logger.tap = (note, ...rest) => - tap((...args) => logger(note, ...rest, ...args)) + tap((...args) => { logger(note, ...rest) }) return logger } diff --git a/servers/ur/package-lock.json b/servers/ur/package-lock.json index 6c2a7e7c3..e726ac1fc 100644 --- a/servers/ur/package-lock.json +++ b/servers/ur/package-lock.json @@ -15,6 +15,7 @@ "http-proxy-node16": "^1.0.4", "lru-cache": "^11.0.0", "ramda": "^0.30.1", + "round-robin-js": "^3.0.5", "zod": "^3.23.8" }, "devDependencies": { @@ -26,6 +27,24 @@ "yarn": "please-use-npm" } }, + "node_modules/@datastructures-js/heap": { + "version": "4.3.3", + "resolved": "https://registry.npmjs.org/@datastructures-js/heap/-/heap-4.3.3.tgz", + "integrity": "sha512-UcUu/DLh/aM4W3C8zZfwxxm6/6FIZUlm3mcAXuNOCa6Aj4iizNvNXQyb8DjZQH2jKSQbMRyNlngP6TPimuGjpQ==" + }, + "node_modules/@datastructures-js/linked-list": { + "version": "5.2.5", + "resolved": "https://registry.npmjs.org/@datastructures-js/linked-list/-/linked-list-5.2.5.tgz", + "integrity": "sha512-YhlkDh7yMOaJ2bGgmrwvyF3vCrMELo2ERTiRxFMMkQF6+5n6expCBoseHtMIwe8fHJu1bzT7IaEHRU58hBoMUQ==" + }, + "node_modules/@datastructures-js/priority-queue": { + "version": "6.3.1", + "resolved": "https://registry.npmjs.org/@datastructures-js/priority-queue/-/priority-queue-6.3.1.tgz", + "integrity": "sha512-eoxkWql/j0VJ0UFMFTpnyJz4KbEEVQ6aZ/JuJUgenu0Im4tYKylAycNGsYCHGXiVNEd7OKGVwfx1Ac3oYkuu7A==", + "dependencies": { + "@datastructures-js/heap": "^4.3.3" + } + }, "node_modules/@ethersproject/abstract-provider": { "version": "5.7.0", "resolved": "https://registry.npmjs.org/@ethersproject/abstract-provider/-/abstract-provider-5.7.0.tgz", @@ -2070,6 +2089,15 @@ "node": ">= 4" } }, + "node_modules/round-robin-js": { + "version": "3.0.5", + "resolved": "https://registry.npmjs.org/round-robin-js/-/round-robin-js-3.0.5.tgz", + "integrity": "sha512-3T1+azyfZ/XDrBr8tYO3x2QvODHPGUfifxxCs6tuhOW7QSGN7iDZQ4JyYHNLtgpnqiIbTMyEZpqArKL289C9iQ==", + "dependencies": { + "@datastructures-js/linked-list": "^5.2.2", + "@datastructures-js/priority-queue": "^6.1.0" + } + }, "node_modules/safe-buffer": { "version": "5.2.1", "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.2.1.tgz", diff --git a/servers/ur/package.json b/servers/ur/package.json index 44b8f7226..8d5f9300a 100644 --- a/servers/ur/package.json +++ b/servers/ur/package.json @@ -17,6 +17,7 @@ "http-proxy-node16": "^1.0.4", "lru-cache": "^11.0.0", "ramda": "^0.30.1", + "round-robin-js": "^3.0.5", "zod": "^3.23.8" }, "devDependencies": { diff --git a/servers/ur/src/domain.js b/servers/ur/src/domain.js index 1f8d3f77b..cd1639082 100644 --- a/servers/ur/src/domain.js +++ b/servers/ur/src/domain.js @@ -1,64 +1,10 @@ -import { defaultTo, isEmpty, complement, path } from 'ramda' -import { LRUCache } from 'lru-cache' +import { SequentialRoundRobin } from 'round-robin-js' -const isNotEmpty = complement(isEmpty) +// TODO: do sth less stupid.. +const ORACLE = ['_b21c1djDesKI5LPXBZvZbXKdkTgQIx2FsN2HXtFsqQ'] -export function bailoutWith ({ fetch, subrouterUrl, surUrl, owners, processToHost, ownerToHost }) { - const processToOwnerCache = new LRUCache({ - /** - * 10MB - */ - maxSize: 10_000_000, - /** - * A number is 8 bytes - */ - sizeCalculation: () => 8 - }) - - async function findProcessOwner (processId) { - const owner = processToOwnerCache.get(processId) - if (owner) return owner - - return fetch(`${surUrl}/processes/${processId}`) - .then((res) => res.json()) - .then(defaultTo({})) - .then(path(['owner', 'address'])) - .then((owner) => { - if (!owner) return null - - processToOwnerCache.set(processId, owner) - return owner - }) - .catch((_e) => null) - } - - return async (processId) => { - /** - * If a process has a specific mapping configured, - * then immediately return it's mapping - */ - if (processToHost && processToHost[processId]) return processToHost[processId] - /** - * If there are owner -> host configured, then we lookup the process - * owner and return the specific host if found - */ - if (ownerToHost && isNotEmpty(ownerToHost)) { - const owner = await findProcessOwner(processId) - if (ownerToHost[owner]) return ownerToHost[owner] - } - - /** - * @deprecated - this functionality is subsumed by ownerToHost - * and will eventually be removed - * - * All three of these must be set for the - * subrouter logic to work so if any are - * not set just return. - */ - if (!subrouterUrl || !surUrl || !owners) return - const owner = await findProcessOwner(processId) - if (owners.includes(owner)) return subrouterUrl - } +export const bailoutWith = () => { + throw new Error('Not implemented.') } /** @@ -70,66 +16,22 @@ export function bailoutWith ({ fetch, subrouterUrl, surUrl, owners, processToHos * If the failoverAttempt exceeds the length of valid hosts list, then every host has * been attempted, and so return undefined, to be handled upstream */ -export function determineHostWith ({ hosts = [], bailout }) { - /** - * TODO: should we inject this cache? - */ - const processToHostCache = new LRUCache({ - /** - * 10MB - */ - maxSize: 10_000_000, - /** - * A number is 8 bytes - */ - sizeCalculation: () => 8 - }) +export function determineHostWith ({ hosts = [] }) { + const hostsRoundRobinTable = new SequentialRoundRobin(hosts) + const processToHostCache = new Map() return async ({ processId, failoverAttempt = 0 }) => { - if (failoverAttempt >= hosts.length) return - - if (bailout) { - const bail = await bailout(processId) - if (bail) return bail + if (ORACLE.includes(processId)) { + return hosts[0] } + if (failoverAttempt >= hosts.length) return - /** - * Check cache, and hydrate if necessary - */ - let hashSum = processToHostCache.get(processId) - if (!hashSum) { - /** - * Only perform the expensive computation of hash -> idx once and cache - */ - hashSum = computeHashSumFromProcessId({ processId, length: hosts.length }) - processToHostCache.set(processId, hashSum) + let cachedHost = processToHostCache.get(processId) + if (!cachedHost) { + cachedHost = hostsRoundRobinTable.next().value + processToHostCache.set(processId, cachedHost) } - return hosts[(hashSum + failoverAttempt) % hosts.length] - } -} - -export function computeHashSumFromProcessId ({ processId, length }) { - // return processId.split('').reduce((acc, char) => acc + char.charCodeAt(0), 0) - return Number(BigInt(cyrb53(processId)) % BigInt(length)) -} - -/** - cyrb53 (c) 2018 bryc (github.com/bryc) - License: Public domain (or MIT if needed). Attribution appreciated. - A fast and simple 53-bit string hash function with decent collision resistance. - Largely inspired by MurmurHash2/3, but with a focus on speed/simplicity. -*/ -const cyrb53 = (str, seed = 0) => { - let h1 = 0xdeadbeef ^ seed; let h2 = 0x41c6ce57 ^ seed - for (let i = 0, ch; i < str.length; i++) { - ch = str.charCodeAt(i) - h1 = Math.imul(h1 ^ ch, 2654435761) - h2 = Math.imul(h2 ^ ch, 1597334677) + return cachedHost } - h1 = Math.imul(h1 ^ (h1 >>> 16), 2246822507) - h1 ^= Math.imul(h2 ^ (h2 >>> 13), 3266489909) - h2 = Math.imul(h2 ^ (h2 >>> 16), 2246822507) - h2 ^= Math.imul(h1 ^ (h1 >>> 13), 3266489909) - return 4294967296 * (2097151 & h2) + (h1 >>> 0) } diff --git a/servers/ur/src/proxy.js b/servers/ur/src/proxy.js index 1afb7b000..7f25f1017 100644 --- a/servers/ur/src/proxy.js +++ b/servers/ur/src/proxy.js @@ -9,19 +9,19 @@ import httpProxy from 'http-proxy-node16' /** * TODO: we could inject these, but just keeping simple for now */ -import { determineHostWith, bailoutWith } from './domain.js' +import { determineHostWith } from './domain.js' import { logger } from './logger.js' import { mountRoutesWithByAoUnit } from './routes/byAoUnit.js' +// TODO: refactor to NOT using Ramda, as performance is crucial for us. export function proxyWith ({ aoUnit, hosts, subrouterUrl, surUrl, owners, processToHost, ownerToHost }) { const _logger = logger.child('proxy') _logger('Configuring to reverse proxy ao %s units...', aoUnit) const proxy = httpProxy.createProxyServer({}) - const bailout = aoUnit === 'cu' ? bailoutWith({ fetch, subrouterUrl, surUrl, owners, processToHost, ownerToHost }) : undefined - const determineHost = determineHostWith({ hosts, bailout }) + const determineHost = determineHostWith({ hosts }) async function trampoline (init) { let result = init diff --git a/servers/ur/src/routes/cu.js b/servers/ur/src/routes/cu.js index 59936ff94..417c9daa5 100644 --- a/servers/ur/src/routes/cu.js +++ b/servers/ur/src/routes/cu.js @@ -3,9 +3,10 @@ */ export function mountCuRoutesWith ({ app, middleware }) { app.get('/', middleware({ processIdFromRequest: () => 'process' })) - app.get('/result/:messageTxId', middleware({ processIdFromRequest: (req) => req.query['process-id'] })) - app.get('/results/:processId', middleware({ processIdFromRequest: (req) => req.params.processId })) - app.get('/state/:processId', middleware({ processIdFromRequest: (req) => req.params.processId })) - app.get('/cron/:processId', middleware({ processIdFromRequest: (req) => req.params.processId })) - app.post('/dry-run', middleware({ processIdFromRequest: (req) => req.query['process-id'] })) + app.post('/result/:messageTxId', middleware({ processIdFromRequest: (req) => req.query['process-id'] })) + app.get('/subscribe/:processId', middleware({ processIdFromRequest: (req) => req.params.processId })) + // app.get('/results/:processId', middleware({ processIdFromRequest: (req) => req.params.processId })) + // app.get('/state/:processId', middleware({ processIdFromRequest: (req) => req.params.processId })) + // app.get('/cron/:processId', middleware({ processIdFromRequest: (req) => req.params.processId })) + // app.post('/dry-run', middleware({ processIdFromRequest: (req) => req.query['process-id'] })) }