From 18a67633e09852c02705f66d15856a5c3e3ab057 Mon Sep 17 00:00:00 2001 From: Razzmatazz Date: Tue, 10 Sep 2024 15:24:39 -0500 Subject: [PATCH] add queue for http server kv requests --- http/cloudflare-kv.mjs | 61 ++++++++++++++++++++++++++++++++++++++++++ http/env-binding.mjs | 18 +++---------- http/test-kv.mjs | 40 +++++++++++++++++++++++++++ 3 files changed, 104 insertions(+), 15 deletions(-) create mode 100644 http/cloudflare-kv.mjs create mode 100644 http/test-kv.mjs diff --git a/http/cloudflare-kv.mjs b/http/cloudflare-kv.mjs new file mode 100644 index 00000000..e047232b --- /dev/null +++ b/http/cloudflare-kv.mjs @@ -0,0 +1,61 @@ +import { EventEmitter } from 'node:events'; + +import fetchWithTimeout from '../utils/fetch-with-timeout.mjs'; + +const completeEmitter = new EventEmitter(); + +const accountId = '424ad63426a1ae47d559873f929eb9fc'; + +const productionNamespaceId = '2e6feba88a9e4097b6d2209191ed4ae5'; +const devNameSpaceID = '17fd725f04984e408d4a70b37c817171'; + +const requestLimit = 6; + +let pending = []; +const queue = []; + +const checkQueue = async () => { + if (pending.length >= requestLimit) { + return; + } + if (queue.length < 1) { + return; + } + const kvName = queue.shift(); + pending.push(kvName); + + const namespaceId = process.env.ENVIRONMENT === 'production' ? productionNamespaceId : devNameSpaceID; + const url = `https://api.cloudflare.com/client/v4/accounts/${accountId}/storage/kv/namespaces/${namespaceId}/values/${kvName}`; + let response; + try { + response = await fetchWithTimeout(url, { + method: 'GET', + headers: { + 'Content-Type': 'application/json', + Authorization: `Bearer ${process.env.CLOUDFLARE_TOKEN}`, + }, + timeout: 9000, + }); + completeEmitter.emit(kvName, response); + } catch (error) { + //response = new Response(null, {status: 500, statusText: error.message}); + queue.unshift(kvName); + } finally { + pending = pending.filter(kv => kv !== kvName); + } + checkQueue(); +}; + +const cloudflareKv = { + get: async (kvName) => { + return new Promise((resolve) => { + completeEmitter.once(kvName, resolve); + if (!pending.includes(kvName) && !queue.includes(kvName)) { + queue.push(kvName); + } + checkQueue(); + }); + }, +}; + +export default cloudflareKv; diff --git a/http/env-binding.mjs b/http/env-binding.mjs index 507c115d..f836cef0 100644 --- a/http/env-binding.mjs +++ b/http/env-binding.mjs @@ -6,11 +6,7 @@ import { EventEmitter } from 'node:events'; import { v4 as uuidv4} from 'uuid'; import cacheMachine from '../utils/cache-machine.mjs'; - -const accountId = '424ad63426a1ae47d559873f929eb9fc'; - -const productionNamespaceId = '2e6feba88a9e4097b6d2209191ed4ae5'; -const devNameSpaceID = '17fd725f04984e408d4a70b37c817171'; +import cloudflareKv from './cloudflare-kv.mjs'; const emitter = new EventEmitter(); @@ -43,15 +39,7 @@ async function messageParentProcess(message) { } async function getDataPrimary(kvName, format) { - const namespaceId = process.env.ENVIRONMENT === 'production' ? productionNamespaceId : devNameSpaceID; - const url = `https://api.cloudflare.com/client/v4/accounts/${accountId}/storage/kv/namespaces/${namespaceId}/values/${kvName}`; - const response = await fetch(url, { - method: 'GET', - headers: { - 'Content-Type': 'application/json', - Authorization: `Bearer ${process.env.CLOUDFLARE_TOKEN}`, - }, - }); + const response = await cloudflareKv.get(kvName); if (response.status === 404) { return null; } @@ -68,7 +56,7 @@ async function getDataPrimary(kvName, format) { } async function getDataWorker(kvName, format) { - return messageParentProcess({action: 'getKv', kvName}); + return messageParentProcess({action: 'getKv', kvName, timeout: 25000}); } const DATA_CACHE = { diff --git a/http/test-kv.mjs b/http/test-kv.mjs new file mode 100644 index 00000000..18090ae2 --- /dev/null +++ b/http/test-kv.mjs @@ -0,0 +1,40 @@ +import 'dotenv/config'; + +import DataAPI from '../datasources/index.mjs'; +import cloudflareKv from './cloudflare-kv.mjs'; + +const data = new DataAPI(); + +const getKv = async (kvName) => { + const response = await cloudflareKv.get(kvName); + if (response.status !== 200) { + console.error('error', kvName, `${response.status} ${response.statusText}`); + return; + } + console.log(response.status, kvName, (await response.text()).length); +}; + +for (const workerName in data.worker) { + const worker = data.worker[workerName]; + for (const gameMode of worker.gameModes) { + let kvName = worker.kvName; + let suffix = ''; + if (gameMode !== 'regular') { + suffix = `_${gameMode}`; + } + try { + if (worker.kvs) { + for (const hexKey in worker.kvs) { + const splitWorker = worker.kvs[hexKey]; + const fullKvName = `${splitWorker.kvName}${suffix}`; + getKv(fullKvName); + } + } else { + const fullKvName = `${kvName}${suffix}`; + getKv(fullKvName); + } + } catch (error) { + console.error(kvName, gameMode, error); + } + } +}