Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Transits #323

Merged
merged 15 commits into from
Aug 29, 2024
3 changes: 0 additions & 3 deletions datasources/index.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,6 @@ class DataSource {

clearRequestData(requestId) {
delete this.requests[requestId];
for (const worker of Object.values(this.worker)) {
worker.clearRequestCache(requestId);
}
}
}

Expand Down
46 changes: 25 additions & 21 deletions http/env-binding.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,25 @@ if (!cluster.isPrimary) {
});
}

async function messageParentProcess(message) {
return new Promise(async (resolve, reject) => {
const messageId = uuidv4();
const responseTimeout = setTimeout(() => {
emitter.off(messageId, messageResponseHandler);
reject(new Error('Response from primary process timed out'));
}, message.timeout ?? 10000);
const messageResponseHandler = (response) => {
clearTimeout(responseTimeout);
if (response.error) {
return reject(new Error(response.error));
}
resolve(response.data);
}
emitter.once(messageId, messageResponseHandler);
process.send({ ...message, id: messageId });
});
}

async function getDataPrimary(kvName, format) {
const namespaceId = process.env.ENVIRONMENT === 'production' ? productionNamespaceId : devNameSpaceID;
const url = `https://api.cloudflare.com/client/v4/accounts/${accountId}/storage/kv/namespaces/${namespaceId}/values/${kvName}`;
Expand All @@ -49,16 +68,7 @@ async function getDataPrimary(kvName, format) {
}

async function getDataWorker(kvName, format) {
return new Promise((resolve, reject) => {
const messageId = uuidv4();
emitter.once(messageId, (message) => {
if (message.error) {
return reject(new Error(message.error));
}
resolve(message.data);
});
process.send({action: 'getKv', kvName, id: messageId});
});
return messageParentProcess({action: 'getKv', kvName});
}

const DATA_CACHE = {
Expand All @@ -75,18 +85,12 @@ const DATA_CACHE = {
},
};

const putCacheWorker = (env, body, options) => {
return new Promise(async (resolve, reject) => {
const messageId = uuidv4();
emitter.once(messageId, (message) => {
if (message.error) {
return reject(new Error(message.error));
}
resolve(message.data);
});
const key = await cacheMachine.createKey(env, options.query, options.variables, options.specialCache);
process.send({action: 'cacheResponse', key, body, ttl: options.ttl, id: messageId});
const putCacheWorker = async (env, body, options) => {
const key = await cacheMachine.createKey(env, options.query, options.variables, options.specialCache);
messageParentProcess({action: 'cacheResponse', key, body, ttl: options.ttl}).catch(error => {
console.error(`Error updating cache`, error);
});
return;
};

const RESPONSE_CACHE = {
Expand Down
128 changes: 68 additions & 60 deletions http/index.mjs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { createServer } from 'node:http';
import cluster from 'node:cluster';
import os from 'node:os';
import { availableParallelism } from 'node:os';

import 'dotenv/config';

Expand All @@ -9,7 +9,7 @@ import getEnv from './env-binding.mjs';
import cacheMachine from '../utils/cache-machine.mjs';

const port = process.env.PORT ?? 8788;
const workerCount = parseInt(process.env.WORKERS ?? String(os.cpus().length - 1));
const workerCount = parseInt(process.env.WORKERS ?? String(Math.max(availableParallelism() - 1, 1)));

/*process.on('uncaughtException', (error) => {
console.error('Uncaught Exception', error.stack);
Expand All @@ -26,6 +26,7 @@ if (cluster.isPrimary && workerCount > 0) {
const env = getEnv();

const getKv = async (kvName, rejectOnError = true) => {
let refreshTime = msHalfHour;
try {
console.log(`getting ${kvName} data`);
clearTimeout(kvRefreshTimeout[kvName]);
Expand All @@ -34,7 +35,6 @@ if (cluster.isPrimary && workerCount > 0) {
const data = await kvLoading[kvName];
kvStore[kvName] = data;
delete kvLoading[kvName];
let refreshTime = msHalfHour;
if (data?.expiration && new Date(data.expiration) > new Date()) {
refreshTime = new Date(data.expiration) - new Date();
if (refreshTime < msOneMinute) {
Expand All @@ -44,86 +44,94 @@ if (cluster.isPrimary && workerCount > 0) {
if (data?.expiration === oldExpiration) {
refreshTime = msOneMinute;
}
kvRefreshTimeout[kvName] = setTimeout(() => {
getKv(kvName, false);
}, refreshTime);
return data;
} catch (error) {
delete kvLoading[kvName];
console.error('Error getting KV from cloudflare', error);
if (error.message !== 'Invalid CLOUDFLARE_TOKEN') {
let refreshTime = msOneMinute;
refreshTime = msOneMinute;
if (typeof kvStore[kvName] === 'undefined') {
refreshTime = 1000;
}
kvRefreshTimeout[kvName] = setTimeout(() => {
getKv(kvName, false);
}, refreshTime);
}
if (rejectOnError) {
return Promise.reject(error);
}
} finally {
kvRefreshTimeout[kvName] = setTimeout(() => {
getKv(kvName, false);
}, refreshTime);
}
};

console.log(`Starting ${workerCount} workers`);
for (let i = 0; i < workerCount; i++) {
cluster.fork();
}

for (const id in cluster.workers) {
cluster.workers[id].on('message', async (message) => {
//console.log(`message from worker ${id}:`, message);
if (message.action === 'getKv') {
const response = {
action: 'kvData',
kvName: message.kvName,
id: message.id,
};
try {
if (typeof kvStore[message.kvName] !== 'undefined') {
response.data = JSON.stringify(kvStore[message.kvName]);
} else if (kvLoading[message.kvName]) {
response.data = JSON.stringify(await kvLoading[message.kvName]);
} else {
response.data = JSON.stringify(await getKv(message.kvName));
}
} catch (error) {
response.error = error.message;
cluster.on('message', async (worker, message) => {
//console.log(`message from worker ${id}:`, message);
let response = false;
if (message.action === 'getKv') {
response = {
action: 'kvData',
kvName: message.kvName,
id: message.id,
};
try {
if (typeof kvStore[message.kvName] !== 'undefined') {
response.data = JSON.stringify(kvStore[message.kvName]);
} else if (kvLoading[message.kvName]) {
response.data = JSON.stringify(await kvLoading[message.kvName]);
} else {
response.data = JSON.stringify(await getKv(message.kvName));
}
cluster.workers[id].send(response);
} catch (error) {
response.error = error.message;
}
if (message.action === 'cacheResponse') {
const response = {
id: message.id,
data: false,
};
}
if (message.action === 'cacheResponse') {
response = {
id: message.id,
data: false,
};
try {
if (cachePending[message.key]) {
response.data = await cachePending[message.key];
} else {
let cachePutCooldown = message.ttl ? message.ttl * 1000 : msFiveMinutes;
cachePending[message.key] = cacheMachine.put(process.env, message.body, {key: message.key, ttl: message.ttl}).catch(error => {
cachePutCooldown = 10000;
return Promise.reject(error);
}).finally(() => {
setTimeout(() => {
delete cachePending[message.key];
}, cachePutCooldown);
});
response.data = await cachePending[message.key];
}
} catch (error) {
response.error = error.message;
}

}
if (response) {
if (worker.isConnected() && !worker.isDead()) {
try {
if (cachePending[message.key]) {
response.data = await cachePending[message.key];
} else {
let cachePutCooldown = message.ttl ? message.ttl * 1000 : msFiveMinutes;
cachePending[message.key] = cacheMachine.put(process.env, message.body, {key: message.key, ttl: message.ttl}).catch(error => {
cachePutCooldown = 10000;
return Promise.reject(error);
}).finally(() => {
setTimeout(() => {
delete cachePending[message.key];
}, cachePutCooldown);
});
response.data = await cachePending[message.key];
}
worker.send(response);
} catch (error) {
response.error = error.message;
console.error(`Error sending worker ${message.action} message response`, error);
}
cluster.workers[id].send(response);
}
});
}
}
});

cluster.on('exit', function(worker, code, signal) {
console.log('worker ' + worker.process.pid + ' died');
cluster.on('exit', function (worker, code, signal) {
if (!signal) {
console.log('worker ' + worker.process.pid + ' died');
cluster.fork();
}
});

console.log(`Starting ${workerCount} workers`);
for (let i = 0; i < workerCount; i++) {
cluster.fork();
}
} else {
// Workers can share any TCP connection
const yoga = await getYoga(getEnv());
Expand Down
13 changes: 7 additions & 6 deletions index.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import schema from './schema.mjs';
import graphqlUtil from './utils/graphql-util.mjs';
import graphQLOptions from './utils/graphql-options.mjs';
import cacheMachine from './utils/cache-machine.mjs';
import fetchWithTimeout from './utils/fetch-with-timeout.mjs';

import { getNightbotResponse } from './plugins/plugin-nightbot.mjs';
import { getTwitchResponse } from './plugins/plugin-twitch.mjs';
Expand All @@ -32,8 +33,7 @@ let dataAPI;

async function graphqlHandler(request, env, ctx) {
const url = new URL(request.url);
let query = false;
let variables = false;
let query, variables;

if (request.method === 'POST') {
try {
Expand Down Expand Up @@ -92,7 +92,7 @@ async function graphqlHandler(request, env, ctx) {
let key;
// Check the cache service for data first - If cached data exists, return it
// we don't check the cache if we're the http server because the worker already did
if (env.SKIP_CACHE !== 'true' && !env.CLOUDFLARE_TOKEN) {
if (env.SKIP_CACHE !== 'true' && env.SKIP_CACHE_CHECK !== 'true' && !env.CLOUDFLARE_TOKEN) {
key = await cacheMachine.createKey(env, query, variables, specialCache);
const cachedResponse = await cacheMachine.get(env, {key});
if (cachedResponse) {
Expand All @@ -112,7 +112,7 @@ async function graphqlHandler(request, env, ctx) {
if (env.USE_ORIGIN === 'true') {
try {
const serverUrl = `https://api.tarkov.dev${graphQLOptions.baseEndpoint}`;
const queryResult = await fetch(serverUrl, {
const queryResult = await fetchWithTimeout(serverUrl, {
method: request.method,
body: JSON.stringify({
query,
Expand All @@ -122,6 +122,7 @@ async function graphqlHandler(request, env, ctx) {
'Content-Type': 'application/json',
'cache-check-complete': 'true',
},
timeout: 20000
});
if (queryResult.status !== 200) {
throw new Error(`${queryResult.status} ${await queryResult.text()}`);
Expand Down Expand Up @@ -174,8 +175,8 @@ async function graphqlHandler(request, env, ctx) {
const response = new Response(body, responseOptions);

if (env.SKIP_CACHE !== 'true' && ttl > 0) {
// using waitUntil doesn't hold up returning a response but keeps the worker alive as long as needed
ctx.waitUntil(cacheMachine.put(env, body, {key, query, variables, ttl, specialCache}));
key = key ?? await cacheMachine.createKey(env, query, variables, specialCache);
ctx.waitUntil(cacheMachine.put(env, body, {key}));
}

return response;
Expand Down
Loading
Loading