Skip to content

Commit

Permalink
Merge pull request #323 from the-hideout/sell-item-task
Browse files Browse the repository at this point in the history
Add Transits
  • Loading branch information
Razzmatazzz authored Aug 29, 2024
2 parents 077d0f2 + 60eff3c commit 8b6a1cb
Show file tree
Hide file tree
Showing 16 changed files with 439 additions and 371 deletions.
3 changes: 0 additions & 3 deletions datasources/index.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,6 @@ class DataSource {

// Drops all per-request state held by this data source once a request finishes.
// NOTE(review): this span comes from a diff hunk ("0 additions & 3 deletions");
// the worker-notification loop below appears to be the code this commit removes,
// leaving only the top-level cache entry deletion. Confirm against the merged file.
clearRequestData(requestId) {
    // Forget the request's accumulated data/promise entry.
    delete this.requests[requestId];
    // Tell each worker to evict its own cache for this request.
    // NOTE(review): `this.worker` (singular) is presumably a map/record of
    // worker data sources keyed by name — verify it is not a typo for `this.workers`.
    for (const worker of Object.values(this.worker)) {
        worker.clearRequestCache(requestId);
    }
}
}

Expand Down
46 changes: 25 additions & 21 deletions http/env-binding.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,25 @@ if (!cluster.isPrimary) {
});
}

/**
 * Sends a message from a worker process to the primary process and resolves
 * with the primary's reply, correlated via a generated message id.
 *
 * Relies on module-level `emitter` (replies are re-emitted on it keyed by id),
 * `uuidv4`, and worker-mode `process.send` — all provided elsewhere in this file.
 *
 * @param {Object} message - Payload forwarded to the primary (spread into the
 *   IPC message). `message.timeout` (ms) optionally overrides the 10s default.
 * @returns {Promise<*>} Resolves with `response.data` from the primary; rejects
 *   if the primary reports `response.error` or no reply arrives in time.
 */
async function messageParentProcess(message) {
    // Plain (non-async) executor: the original async executor swallowed
    // synchronous throws (e.g. process.send failing when not a worker) as
    // unhandled rejections; now they reject the returned promise.
    return new Promise((resolve, reject) => {
        const messageId = uuidv4();
        // Guard against the primary never answering; detach the listener so
        // a late reply cannot leak handlers or settle twice.
        const responseTimeout = setTimeout(() => {
            emitter.off(messageId, messageResponseHandler);
            reject(new Error('Response from primary process timed out'));
        }, message.timeout ?? 10000);
        const messageResponseHandler = (response) => {
            clearTimeout(responseTimeout);
            if (response.error) {
                return reject(new Error(response.error));
            }
            resolve(response.data);
        };
        emitter.once(messageId, messageResponseHandler);
        // Tag the outgoing message so the primary's reply can be matched.
        process.send({ ...message, id: messageId });
    });
}

async function getDataPrimary(kvName, format) {
const namespaceId = process.env.ENVIRONMENT === 'production' ? productionNamespaceId : devNameSpaceID;
const url = `https://api.cloudflare.com/client/v4/accounts/${accountId}/storage/kv/namespaces/${namespaceId}/values/${kvName}`;
Expand All @@ -49,16 +68,7 @@ async function getDataPrimary(kvName, format) {
}

async function getDataWorker(kvName, format) {
return new Promise((resolve, reject) => {
const messageId = uuidv4();
emitter.once(messageId, (message) => {
if (message.error) {
return reject(new Error(message.error));
}
resolve(message.data);
});
process.send({action: 'getKv', kvName, id: messageId});
});
return messageParentProcess({action: 'getKv', kvName});
}

const DATA_CACHE = {
Expand All @@ -75,18 +85,12 @@ const DATA_CACHE = {
},
};

const putCacheWorker = (env, body, options) => {
return new Promise(async (resolve, reject) => {
const messageId = uuidv4();
emitter.once(messageId, (message) => {
if (message.error) {
return reject(new Error(message.error));
}
resolve(message.data);
});
const key = await cacheMachine.createKey(env, options.query, options.variables, options.specialCache);
process.send({action: 'cacheResponse', key, body, ttl: options.ttl, id: messageId});
const putCacheWorker = async (env, body, options) => {
const key = await cacheMachine.createKey(env, options.query, options.variables, options.specialCache);
messageParentProcess({action: 'cacheResponse', key, body, ttl: options.ttl}).catch(error => {
console.error(`Error updating cache`, error);
});
return;
};

const RESPONSE_CACHE = {
Expand Down
128 changes: 68 additions & 60 deletions http/index.mjs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { createServer } from 'node:http';
import cluster from 'node:cluster';
import os from 'node:os';
import { availableParallelism } from 'node:os';

import 'dotenv/config';

Expand All @@ -9,7 +9,7 @@ import getEnv from './env-binding.mjs';
import cacheMachine from '../utils/cache-machine.mjs';

const port = process.env.PORT ?? 8788;
const workerCount = parseInt(process.env.WORKERS ?? String(os.cpus().length - 1));
const workerCount = parseInt(process.env.WORKERS ?? String(Math.max(availableParallelism() - 1, 1)));

/*process.on('uncaughtException', (error) => {
console.error('Uncaught Exception', error.stack);
Expand All @@ -26,6 +26,7 @@ if (cluster.isPrimary && workerCount > 0) {
const env = getEnv();

const getKv = async (kvName, rejectOnError = true) => {
let refreshTime = msHalfHour;
try {
console.log(`getting ${kvName} data`);
clearTimeout(kvRefreshTimeout[kvName]);
Expand All @@ -34,7 +35,6 @@ if (cluster.isPrimary && workerCount > 0) {
const data = await kvLoading[kvName];
kvStore[kvName] = data;
delete kvLoading[kvName];
let refreshTime = msHalfHour;
if (data?.expiration && new Date(data.expiration) > new Date()) {
refreshTime = new Date(data.expiration) - new Date();
if (refreshTime < msOneMinute) {
Expand All @@ -44,86 +44,94 @@ if (cluster.isPrimary && workerCount > 0) {
if (data?.expiration === oldExpiration) {
refreshTime = msOneMinute;
}
kvRefreshTimeout[kvName] = setTimeout(() => {
getKv(kvName, false);
}, refreshTime);
return data;
} catch (error) {
delete kvLoading[kvName];
console.error('Error getting KV from cloudflare', error);
if (error.message !== 'Invalid CLOUDFLARE_TOKEN') {
let refreshTime = msOneMinute;
refreshTime = msOneMinute;
if (typeof kvStore[kvName] === 'undefined') {
refreshTime = 1000;
}
kvRefreshTimeout[kvName] = setTimeout(() => {
getKv(kvName, false);
}, refreshTime);
}
if (rejectOnError) {
return Promise.reject(error);
}
} finally {
kvRefreshTimeout[kvName] = setTimeout(() => {
getKv(kvName, false);
}, refreshTime);
}
};

console.log(`Starting ${workerCount} workers`);
for (let i = 0; i < workerCount; i++) {
cluster.fork();
}

for (const id in cluster.workers) {
cluster.workers[id].on('message', async (message) => {
//console.log(`message from worker ${id}:`, message);
if (message.action === 'getKv') {
const response = {
action: 'kvData',
kvName: message.kvName,
id: message.id,
};
try {
if (typeof kvStore[message.kvName] !== 'undefined') {
response.data = JSON.stringify(kvStore[message.kvName]);
} else if (kvLoading[message.kvName]) {
response.data = JSON.stringify(await kvLoading[message.kvName]);
} else {
response.data = JSON.stringify(await getKv(message.kvName));
}
} catch (error) {
response.error = error.message;
cluster.on('message', async (worker, message) => {
//console.log(`message from worker ${id}:`, message);
let response = false;
if (message.action === 'getKv') {
response = {
action: 'kvData',
kvName: message.kvName,
id: message.id,
};
try {
if (typeof kvStore[message.kvName] !== 'undefined') {
response.data = JSON.stringify(kvStore[message.kvName]);
} else if (kvLoading[message.kvName]) {
response.data = JSON.stringify(await kvLoading[message.kvName]);
} else {
response.data = JSON.stringify(await getKv(message.kvName));
}
cluster.workers[id].send(response);
} catch (error) {
response.error = error.message;
}
if (message.action === 'cacheResponse') {
const response = {
id: message.id,
data: false,
};
}
if (message.action === 'cacheResponse') {
response = {
id: message.id,
data: false,
};
try {
if (cachePending[message.key]) {
response.data = await cachePending[message.key];
} else {
let cachePutCooldown = message.ttl ? message.ttl * 1000 : msFiveMinutes;
cachePending[message.key] = cacheMachine.put(process.env, message.body, {key: message.key, ttl: message.ttl}).catch(error => {
cachePutCooldown = 10000;
return Promise.reject(error);
}).finally(() => {
setTimeout(() => {
delete cachePending[message.key];
}, cachePutCooldown);
});
response.data = await cachePending[message.key];
}
} catch (error) {
response.error = error.message;
}

}
if (response) {
if (worker.isConnected() && !worker.isDead()) {
try {
if (cachePending[message.key]) {
response.data = await cachePending[message.key];
} else {
let cachePutCooldown = message.ttl ? message.ttl * 1000 : msFiveMinutes;
cachePending[message.key] = cacheMachine.put(process.env, message.body, {key: message.key, ttl: message.ttl}).catch(error => {
cachePutCooldown = 10000;
return Promise.reject(error);
}).finally(() => {
setTimeout(() => {
delete cachePending[message.key];
}, cachePutCooldown);
});
response.data = await cachePending[message.key];
}
worker.send(response);
} catch (error) {
response.error = error.message;
console.error(`Error sending worker ${message.action} message response`, error);
}
cluster.workers[id].send(response);
}
});
}
}
});

cluster.on('exit', function(worker, code, signal) {
console.log('worker ' + worker.process.pid + ' died');
cluster.on('exit', function (worker, code, signal) {
if (!signal) {
console.log('worker ' + worker.process.pid + ' died');
cluster.fork();
}
});

console.log(`Starting ${workerCount} workers`);
for (let i = 0; i < workerCount; i++) {
cluster.fork();
}
} else {
// Workers can share any TCP connection
const yoga = await getYoga(getEnv());
Expand Down
13 changes: 7 additions & 6 deletions index.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import schema from './schema.mjs';
import graphqlUtil from './utils/graphql-util.mjs';
import graphQLOptions from './utils/graphql-options.mjs';
import cacheMachine from './utils/cache-machine.mjs';
import fetchWithTimeout from './utils/fetch-with-timeout.mjs';

import { getNightbotResponse } from './plugins/plugin-nightbot.mjs';
import { getTwitchResponse } from './plugins/plugin-twitch.mjs';
Expand All @@ -32,8 +33,7 @@ let dataAPI;

async function graphqlHandler(request, env, ctx) {
const url = new URL(request.url);
let query = false;
let variables = false;
let query, variables;

if (request.method === 'POST') {
try {
Expand Down Expand Up @@ -92,7 +92,7 @@ async function graphqlHandler(request, env, ctx) {
let key;
// Check the cache service for data first - If cached data exists, return it
// we don't check the cache if we're the http server because the worker already did
if (env.SKIP_CACHE !== 'true' && !env.CLOUDFLARE_TOKEN) {
if (env.SKIP_CACHE !== 'true' && env.SKIP_CACHE_CHECK !== 'true' && !env.CLOUDFLARE_TOKEN) {
key = await cacheMachine.createKey(env, query, variables, specialCache);
const cachedResponse = await cacheMachine.get(env, {key});
if (cachedResponse) {
Expand All @@ -112,7 +112,7 @@ async function graphqlHandler(request, env, ctx) {
if (env.USE_ORIGIN === 'true') {
try {
const serverUrl = `https://api.tarkov.dev${graphQLOptions.baseEndpoint}`;
const queryResult = await fetch(serverUrl, {
const queryResult = await fetchWithTimeout(serverUrl, {
method: request.method,
body: JSON.stringify({
query,
Expand All @@ -122,6 +122,7 @@ async function graphqlHandler(request, env, ctx) {
'Content-Type': 'application/json',
'cache-check-complete': 'true',
},
timeout: 20000
});
if (queryResult.status !== 200) {
throw new Error(`${queryResult.status} ${await queryResult.text()}`);
Expand Down Expand Up @@ -174,8 +175,8 @@ async function graphqlHandler(request, env, ctx) {
const response = new Response(body, responseOptions);

if (env.SKIP_CACHE !== 'true' && ttl > 0) {
// using waitUntil doesn't hold up returning a response but keeps the worker alive as long as needed
ctx.waitUntil(cacheMachine.put(env, body, {key, query, variables, ttl, specialCache}));
key = key ?? await cacheMachine.createKey(env, query, variables, specialCache);
ctx.waitUntil(cacheMachine.put(env, body, {key}));
}

return response;
Expand Down
Loading

0 comments on commit 8b6a1cb

Please sign in to comment.