Skip to content

Commit

Permalink
fix: add schedLocation cache
Browse files Browse the repository at this point in the history
  • Loading branch information
ppedziwiatr committed Jul 19, 2024
1 parent a139b62 commit 1424b4c
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 124 deletions.
6 changes: 5 additions & 1 deletion servers/mu/src/domain/api/sendDataItem.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ export function sendDataItemWith ({

const locateProcessLocal = fromPromise(locateProcess)

const schedLocationCache = new Map()

/**
* If the data item is a Message, then cranking and tracing
* must also be performed.
Expand Down Expand Up @@ -96,6 +98,8 @@ export function sendDataItemWith ({
if (hasTargetTag) {
return Rejected({ res })
}
const schedulerTag = res.dataItem.tags.find((tag) => tag.name === 'Scheduler')
schedLocationCache.set(res.dataItem.id, schedulerTag.value)
return Resolved()
})
.bichain(({ res }) => {
Expand Down Expand Up @@ -125,7 +129,7 @@ export function sendDataItemWith ({
schedLocation and it will get sent directly to
Arweave
*/
return locateProcessLocal(ctx.dataItem.target)
return locateProcessLocal(ctx.dataItem.target, schedLocationCache.get(ctx.dataItem.target))
.chain((schedLocation) => sendMessage({ ...ctx, schedLocation }))
}
return sendProcess(ctx)
Expand Down
2 changes: 1 addition & 1 deletion servers/mu/src/domain/logger.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ export const createLogger = (name) => {

logger.child = (name) => createLogger(`${logger.namespace}:${name}`)
logger.tap = (note, ...rest) =>
tap((...args) => logger(note, ...rest, ...args))
tap((...args) => { logger(note, ...rest) })

return logger
}
Expand Down
28 changes: 28 additions & 0 deletions servers/ur/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions servers/ur/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"http-proxy-node16": "^1.0.4",
"lru-cache": "^11.0.0",
"ramda": "^0.30.1",
"round-robin-js": "^3.0.5",
"zod": "^3.23.8"
},
"devDependencies": {
Expand Down
130 changes: 16 additions & 114 deletions servers/ur/src/domain.js
Original file line number Diff line number Diff line change
@@ -1,64 +1,10 @@
import { defaultTo, isEmpty, complement, path } from 'ramda'
import { LRUCache } from 'lru-cache'
import { SequentialRoundRobin } from 'round-robin-js'

const isNotEmpty = complement(isEmpty)
// TODO: do sth less stupid..
const ORACLE = ['_b21c1djDesKI5LPXBZvZbXKdkTgQIx2FsN2HXtFsqQ']

export function bailoutWith ({ fetch, subrouterUrl, surUrl, owners, processToHost, ownerToHost }) {
const processToOwnerCache = new LRUCache({
/**
* 10MB
*/
maxSize: 10_000_000,
/**
* A number is 8 bytes
*/
sizeCalculation: () => 8
})

async function findProcessOwner (processId) {
const owner = processToOwnerCache.get(processId)
if (owner) return owner

return fetch(`${surUrl}/processes/${processId}`)
.then((res) => res.json())
.then(defaultTo({}))
.then(path(['owner', 'address']))
.then((owner) => {
if (!owner) return null

processToOwnerCache.set(processId, owner)
return owner
})
.catch((_e) => null)
}

return async (processId) => {
/**
* If a process has a specific mapping configured,
* then immediately return it's mapping
*/
if (processToHost && processToHost[processId]) return processToHost[processId]
/**
* If there are owner -> host configured, then we lookup the process
* owner and return the specific host if found
*/
if (ownerToHost && isNotEmpty(ownerToHost)) {
const owner = await findProcessOwner(processId)
if (ownerToHost[owner]) return ownerToHost[owner]
}

/**
* @deprecated - this functionality is subsumed by ownerToHost
* and will eventually be removed
*
* All three of these must be set for the
* subrouter logic to work so if any are
* not set just return.
*/
if (!subrouterUrl || !surUrl || !owners) return
const owner = await findProcessOwner(processId)
if (owners.includes(owner)) return subrouterUrl
}
export const bailoutWith = () => {
throw new Error('Not implemented.')
}

/**
Expand All @@ -70,66 +16,22 @@ export function bailoutWith ({ fetch, subrouterUrl, surUrl, owners, processToHos
* If the failoverAttempt exceeds the length of valid hosts list, then every host has
* been attempted, and so return undefined, to be handled upstream
*/
export function determineHostWith ({ hosts = [], bailout }) {
/**
* TODO: should we inject this cache?
*/
const processToHostCache = new LRUCache({
/**
* 10MB
*/
maxSize: 10_000_000,
/**
* A number is 8 bytes
*/
sizeCalculation: () => 8
})
export function determineHostWith ({ hosts = [] }) {
const hostsRoundRobinTable = new SequentialRoundRobin(hosts)
const processToHostCache = new Map()

return async ({ processId, failoverAttempt = 0 }) => {
if (failoverAttempt >= hosts.length) return

if (bailout) {
const bail = await bailout(processId)
if (bail) return bail
if (ORACLE.includes(processId)) {
return hosts[0]
}
if (failoverAttempt >= hosts.length) return

/**
* Check cache, and hydrate if necessary
*/
let hashSum = processToHostCache.get(processId)
if (!hashSum) {
/**
* Only perform the expensive computation of hash -> idx once and cache
*/
hashSum = computeHashSumFromProcessId({ processId, length: hosts.length })
processToHostCache.set(processId, hashSum)
let cachedHost = processToHostCache.get(processId)
if (!cachedHost) {
cachedHost = hostsRoundRobinTable.next().value
processToHostCache.set(processId, cachedHost)
}

return hosts[(hashSum + failoverAttempt) % hosts.length]
}
}

export function computeHashSumFromProcessId ({ processId, length }) {
// return processId.split('').reduce((acc, char) => acc + char.charCodeAt(0), 0)
return Number(BigInt(cyrb53(processId)) % BigInt(length))
}

/**
cyrb53 (c) 2018 bryc (github.com/bryc)
License: Public domain (or MIT if needed). Attribution appreciated.
A fast and simple 53-bit string hash function with decent collision resistance.
Largely inspired by MurmurHash2/3, but with a focus on speed/simplicity.
*/
const cyrb53 = (str, seed = 0) => {
let h1 = 0xdeadbeef ^ seed; let h2 = 0x41c6ce57 ^ seed
for (let i = 0, ch; i < str.length; i++) {
ch = str.charCodeAt(i)
h1 = Math.imul(h1 ^ ch, 2654435761)
h2 = Math.imul(h2 ^ ch, 1597334677)
return cachedHost
}
h1 = Math.imul(h1 ^ (h1 >>> 16), 2246822507)
h1 ^= Math.imul(h2 ^ (h2 >>> 13), 3266489909)
h2 = Math.imul(h2 ^ (h2 >>> 16), 2246822507)
h2 ^= Math.imul(h1 ^ (h1 >>> 13), 3266489909)
return 4294967296 * (2097151 & h2) + (h1 >>> 0)
}
6 changes: 3 additions & 3 deletions servers/ur/src/proxy.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,19 @@ import httpProxy from 'http-proxy-node16'
/**
* TODO: we could inject these, but just keeping simple for now
*/
import { determineHostWith, bailoutWith } from './domain.js'
import { determineHostWith } from './domain.js'
import { logger } from './logger.js'

import { mountRoutesWithByAoUnit } from './routes/byAoUnit.js'

// TODO: refactor to NOT using Ramda, as performance is crucial for us.
export function proxyWith ({ aoUnit, hosts, subrouterUrl, surUrl, owners, processToHost, ownerToHost }) {
const _logger = logger.child('proxy')
_logger('Configuring to reverse proxy ao %s units...', aoUnit)

const proxy = httpProxy.createProxyServer({})

const bailout = aoUnit === 'cu' ? bailoutWith({ fetch, subrouterUrl, surUrl, owners, processToHost, ownerToHost }) : undefined
const determineHost = determineHostWith({ hosts, bailout })
const determineHost = determineHostWith({ hosts })

async function trampoline (init) {
let result = init
Expand Down
11 changes: 6 additions & 5 deletions servers/ur/src/routes/cu.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
*/
export function mountCuRoutesWith ({ app, middleware }) {
app.get('/', middleware({ processIdFromRequest: () => 'process' }))
app.get('/result/:messageTxId', middleware({ processIdFromRequest: (req) => req.query['process-id'] }))
app.get('/results/:processId', middleware({ processIdFromRequest: (req) => req.params.processId }))
app.get('/state/:processId', middleware({ processIdFromRequest: (req) => req.params.processId }))
app.get('/cron/:processId', middleware({ processIdFromRequest: (req) => req.params.processId }))
app.post('/dry-run', middleware({ processIdFromRequest: (req) => req.query['process-id'] }))
app.post('/result/:messageTxId', middleware({ processIdFromRequest: (req) => req.query['process-id'] }))
app.get('/subscribe/:processId', middleware({ processIdFromRequest: (req) => req.params.processId }))
// app.get('/results/:processId', middleware({ processIdFromRequest: (req) => req.params.processId }))
// app.get('/state/:processId', middleware({ processIdFromRequest: (req) => req.params.processId }))
// app.get('/cron/:processId', middleware({ processIdFromRequest: (req) => req.params.processId }))
// app.post('/dry-run', middleware({ processIdFromRequest: (req) => req.query['process-id'] }))
}

0 comments on commit 1424b4c

Please sign in to comment.