Skip to content

Commit a420326

Browse files
committed
implement cache update cooldown
1 parent 6d2ac6a commit a420326

7 files changed

+178
-112
lines changed

graphql-yoga.mjs

+1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ export default async function getYoga(env) {
2727
yoga = createYoga({
2828
schema: (context) => {
2929
// this context only has the env vars present on creation
30+
console.log('schema context', Object.keys(context));
3031
context.request.requestId = uuidv4();
3132
if (env.ctx) {
3233
context.request.ctx = env.ctx;

http/env-binding.mjs

+38-14
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,23 @@ import { EventEmitter } from 'node:events';
55

66
import { v4 as uuidv4} from 'uuid';
77

8+
import cacheMachine from '../utils/cache-machine.mjs';
9+
810
const accountId = '424ad63426a1ae47d559873f929eb9fc';
911

1012
const productionNamespaceId = '2e6feba88a9e4097b6d2209191ed4ae5';
1113
const devNameSpaceID = '17fd725f04984e408d4a70b37c817171';
1214

13-
let emitter;
15+
const emitter = new EventEmitter();
16+
17+
if (!cluster.isPrimary) {
18+
process.on('message', (message) => {
19+
if (!message.id) {
20+
return;
21+
}
22+
emitter.emit(message.id, message);
23+
});
24+
}
1425

1526
async function getDataPrimary(kvName, format) {
1627
const namespaceId = process.env.ENVIRONMENT === 'production' ? productionNamespaceId : devNameSpaceID;
@@ -38,18 +49,6 @@ async function getDataPrimary(kvName, format) {
3849
}
3950

4051
async function getDataWorker(kvName, format) {
41-
if (!emitter) {
42-
emitter = new EventEmitter()
43-
process.on('message', (message) => {
44-
if (!message.id) {
45-
return;
46-
}
47-
if (message.action !== 'kvData') {
48-
return;
49-
}
50-
emitter.emit(message.id, message);
51-
});
52-
}
5352
return new Promise((resolve, reject) => {
5453
const messageId = uuidv4();
5554
emitter.once(messageId, (message) => {
@@ -76,10 +75,35 @@ const DATA_CACHE = {
7675
},
7776
};
7877

78+
const putCacheWorker = (env, body, options) => {
79+
return new Promise(async (resolve, reject) => {
80+
const messageId = uuidv4();
81+
emitter.once(messageId, (message) => {
82+
if (message.error) {
83+
return reject(new Error(message.error));
84+
}
85+
resolve(message.data);
86+
});
87+
const key = await cacheMachine.createKey(env, options.query, options.variables, options.specialCache);
88+
process.send({action: 'cacheResponse', key, body, ttl: options.ttl, id: messageId});
89+
});
90+
};
91+
92+
const RESPONSE_CACHE = {
93+
get: cacheMachine.get,
94+
put: (env, body, options) => {
95+
if (cluster.isPrimary) {
96+
return cacheMachine.put(env, body, options);
97+
}
98+
return putCacheWorker(env, body, options);
99+
},
100+
};
101+
79102
export default function getEnv() {
80103
return {
81104
...process.env,
82105
DATA_CACHE,
83-
ctx: {waitUntil: () => {}},
106+
RESPONSE_CACHE,
107+
//ctx: {waitUntil: () => {}},
84108
}
85109
};

http/index.mjs

+29
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import 'dotenv/config';
66

77
import getYoga from '../graphql-yoga.mjs';
88
import getEnv from './env-binding.mjs';
9+
import cacheMachine from '../utils/cache-machine.mjs';
910

1011
const port = process.env.PORT ?? 8788;
1112
const workerCount = parseInt(process.env.WORKERS ?? String(os.cpus().length - 1));
@@ -18,6 +19,7 @@ if (cluster.isPrimary && workerCount > 0) {
1819
const kvStore = {};
1920
const kvLoading = {};
2021
const kvRefreshTimeout = {};
22+
const cachePending = {};
2123
const msOneMinute = 1000 * 60;
2224
const msHalfHour = msOneMinute * 30;
2325
const env = getEnv();
@@ -90,6 +92,33 @@ if (cluster.isPrimary && workerCount > 0) {
9092
}
9193
cluster.workers[id].send(response);
9294
}
95+
if (message.action === 'cacheResponse') {
96+
const response = {
97+
id: message.id,
98+
data: false,
99+
};
100+
try {
101+
if (cachePending[message.key]) {
102+
console.log('cache put pending');
103+
response.data = await cachePending[message.key];
104+
} else {
105+
let cachePutCooldown = message.ttl ? message.ttl * 1000 : 60000;
106+
console.log('doing cache put', cachePutCooldown);
107+
cachePending[message.key] = cacheMachine.put(process.env, message.body, {key: message.key, ttl: message.ttl}).catch(error => {
108+
cachePutCooldown = 10000;
109+
return Promise.reject(error);
110+
}).finally(() => {
111+
setTimeout(() => {
112+
delete cachePending[message.key];
113+
}, cachePutCooldown);
114+
});
115+
response.data = await cachePending[message.key];
116+
}
117+
} catch (error) {
118+
response.error = error.message;
119+
}
120+
cluster.workers[id].send(response);
121+
}
93122
});
94123
}
95124

index.mjs

+5-5
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
1-
2-
/*import getYoga from './graphql-yoga.mjs';
1+
/*import cacheMachine from './utils/cache-machine.mjs';
2+
import getYoga from './graphql-yoga.mjs';
33
import graphQLOptions from './utils/graphql-options.mjs';
44
55
export default {
66
async fetch(request, env, ctx) {
77
try {
8-
const yoga = await getYoga(env);
9-
return yoga.fetch(request, {...env, ctx});
8+
const yoga = await getYoga({...env, RESPONSE_CACHE: cacheMachine});
9+
return yoga.fetch(request, {...env, ctx, RESPONSE_CACHE: cacheMachine});
1010
} catch (err) {
1111
console.log(err);
1212
return new Response(graphQLOptions.debug ? err : 'Something went wrong', { status: 500 });
@@ -173,7 +173,7 @@ async function graphqlHandler(request, env, ctx) {
173173

174174
if (env.SKIP_CACHE !== 'true' && ttl > 0) {
175175
// using waitUntil doesn't hold up returning a response but keeps the worker alive as long as needed
176-
ctx.waitUntil(cacheMachine.put(env, query, variables, body, String(ttl), specialCache));
176+
ctx.waitUntil(cacheMachine.put(env, body, {query, variables, ttl, specialCache}));
177177
}
178178

179179
return response;

plugins/plugin-request-timer.mjs

+3
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@ export default function useRequestTimer() {
22
return {
33
onRequest({ request, url, endResponse, serverContext, fetchAPI }) {
44
request.startTime = new Date();
5+
if (serverContext.waitUntil) {
6+
request.ctx = { waitUntil: serverContext.waitUntil };
7+
}
58
},
69
onResponse({request, response, serverContext, setResponse, fetchAPI}) {
710
console.log(`Response sent in ${new Date() - request.startTime}ms`);

plugins/plugin-use-cache-machine.mjs

+1-1
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ export default function useCacheMachine(env) {
7777
// using waitUntil doesn't hold up returning a response but keeps the worker alive as long as needed
7878
const cacheBody = JSON.stringify(result);
7979
if (cacheBody.length > 0) {
80-
const cachePut = cacheMachine.put(env, request.params.query, request.params.variables, cacheBody, String(ttl), sCache);
80+
const cachePut = env.RESPONSE_CACHE.put(env, cacheBody, {query: request.params.query, variables: request.params.variables, ttl, specialCache: sCache});
8181
if (typeof process === 'undefined') {
8282
request.ctx.waitUntil(cachePut);
8383
}

utils/cache-machine.mjs

+101-92
Original file line numberDiff line numberDiff line change
@@ -38,103 +38,112 @@ async function hash(string) {
3838
return hashHex;
3939
}
4040

41-
// Updates the cache with the results of a query
42-
// :param json: the incoming request in json
43-
// :param body: the body to cache
44-
// :return: true if successful, false if not
45-
async function updateCache(env, query, variables, body, ttl = '', specialCache = '') {
46-
try {
47-
if (!env.CACHE_BASIC_AUTH) {
48-
console.warn('env.CACHE_BASIC_AUTH is not set; skipping cache check');
49-
return false;
50-
}
51-
if (cachePaused) {
52-
console.warn('Cache paused; skipping cache update');
53-
return false;
41+
const cacheMachine = {
42+
createKey: (environment, query, variables, specialCache = '') => {
43+
if (typeof variables !== 'string') {
44+
variables = JSON.stringify(variables);
5445
}
55-
// Get the cacheKey from the request
5646
query = query.trim();
57-
const cacheKey = await hash(env.ENVIRONMENT + query + JSON.stringify(variables) + specialCache);
58-
console.log(`Caching ${body.length} byte response for ${env.ENVIRONMENT} environment${ttl ? ` for ${ttl} seconds` : ''}`);
59-
60-
// headers and POST body
61-
const headersPost = {
62-
body: JSON.stringify({ key: cacheKey, value: body, ttl }),
63-
method: 'POST',
64-
headers: {
65-
'content-type': 'application/json;charset=UTF-8',
66-
'Authorization': `Basic ${env.CACHE_BASIC_AUTH}`
67-
},
68-
timeout: 10000,
69-
};
70-
71-
// Update the cache
72-
const response = await fetchWithTimeout(`${cacheUrl}/api/cache`, headersPost);
73-
74-
// Log non-200 responses
75-
if (response.status !== 200) {
76-
console.error(`failed to write to cache: ${response.status}`);
47+
return hash(environment + query + variables + specialCache);
48+
},
49+
// Checks the caching service to see if a request has been cached
50+
// :param json: the json payload of the incoming worker request
51+
// :return: json results of the item found in the cache or false if not found
52+
get: async (env, query, variables, specialCache = '') => {
53+
try {
54+
if (!env.CACHE_BASIC_AUTH) {
55+
console.warn('env.CACHE_BASIC_AUTH is not set; skipping cache check');
56+
return false;
57+
}
58+
if (cachePaused) {
59+
console.warn('Cache paused; skipping cache check');
60+
return false;
61+
}
62+
query = query.trim();
63+
const cacheKey = await cacheMachine.createKey(env.ENVIRONMENT, query, variables, specialCache);
64+
if (!cacheKey) {
65+
console.warn('Skipping cache check; key is empty');
66+
return false;
67+
}
68+
69+
const response = await fetchWithTimeout(`${cacheUrl}/api/cache?key=${cacheKey}`, {
70+
headers: {
71+
'content-type': 'application/json;charset=UTF-8',
72+
'Authorization': `Basic ${env.CACHE_BASIC_AUTH}`
73+
},
74+
});
75+
cacheFailCount = 0;
76+
if (response.status === 200) {
77+
return await response.json();
78+
} else if (response.status !== 404) {
79+
console.error(`failed to read from cache: ${response.status}`);
80+
}
81+
7782
return false
78-
}
79-
cacheFailCount = 0;
80-
return true
81-
} catch (error) {
82-
if (error.message === 'The operation was aborted due to timeout') {
83-
console.warn('Updating cache timed out');
84-
pauseCache();
85-
return false;
86-
}
87-
console.error('updateCache error: ' + error.message);
88-
return false;
89-
}
90-
}
91-
92-
// Checks the caching service to see if a request has been cached
93-
// :param json: the json payload of the incoming worker request
94-
// :return: json results of the item found in the cache or false if not found
95-
async function checkCache(env, query, variables, specialCache = '') {
96-
try {
97-
if (!env.CACHE_BASIC_AUTH) {
98-
console.warn('env.CACHE_BASIC_AUTH is not set; skipping cache check');
83+
} catch (error) {
84+
if (error.message === 'The operation was aborted due to timeout') {
85+
console.warn('Checking cache timed out');
86+
pauseCache();
87+
return false;
88+
}
89+
console.error('checkCache error: ' + error.message);
9990
return false;
10091
}
101-
if (cachePaused) {
102-
console.warn('Cache paused; skipping cache check');
92+
},
93+
// Updates the cache with the results of a query
94+
// :param json: the incoming request in json
95+
// :param body: the body to cache
96+
// :return: true if successful, false if not
97+
put: async (env, body, options = {}) => {
98+
try {
99+
if (!env.CACHE_BASIC_AUTH) {
100+
console.warn('env.CACHE_BASIC_AUTH is not set; skipping cache put');
101+
return false;
102+
}
103+
if (cachePaused) {
104+
console.warn('Cache paused; skipping cache update');
105+
return false;
106+
}
107+
if (!options.key && !options.query) {
108+
console.warn('Key or query not provided, skipping cache put');
109+
return false;
110+
}
111+
let { key, query, variables, ttl = 60, specialCache = '' } = options;
112+
if (!key) {
113+
query = query.trim();
114+
key = await cacheMachine.createKey(env.ENVIRONMENT, query, variables, specialCache);
115+
}
116+
ttl = String(ttl);
117+
console.log(`Caching ${body.length} byte response for ${env.ENVIRONMENT} environment${ttl ? ` for ${ttl} seconds` : ''}`);
118+
119+
// Update the cache
120+
const response = await fetchWithTimeout(`${cacheUrl}/api/cache`, {
121+
body: JSON.stringify({ key, value: body, ttl }),
122+
method: 'POST',
123+
headers: {
124+
'content-type': 'application/json;charset=UTF-8',
125+
'Authorization': `Basic ${env.CACHE_BASIC_AUTH}`
126+
},
127+
timeout: 10000,
128+
});
129+
130+
// Log non-200 responses
131+
if (response.status !== 200) {
132+
console.error(`failed to write to cache: ${response.status}`);
133+
return false
134+
}
135+
cacheFailCount = 0;
136+
return true
137+
} catch (error) {
138+
if (error.message === 'The operation was aborted due to timeout') {
139+
console.warn('Updating cache timed out');
140+
pauseCache();
141+
return false;
142+
}
143+
console.error('updateCache error: ' + error.message);
103144
return false;
104145
}
105-
query = query.trim();
106-
const cacheKey = await hash(env.ENVIRONMENT + query + JSON.stringify(variables) + specialCache);
107-
if (!cacheKey) {
108-
console.warn('Skipping cache check; key is empty');
109-
return false;
110-
}
111-
112-
const response = await fetchWithTimeout(`${cacheUrl}/api/cache?key=${cacheKey}`, {
113-
headers: {
114-
'content-type': 'application/json;charset=UTF-8',
115-
'Authorization': `Basic ${env.CACHE_BASIC_AUTH}`
116-
},
117-
});
118-
cacheFailCount = 0;
119-
if (response.status === 200) {
120-
return await response.json();
121-
} else if (response.status !== 404) {
122-
console.error(`failed to read from cache: ${response.status}`);
123-
}
124-
125-
return false
126-
} catch (error) {
127-
if (error.message === 'The operation was aborted due to timeout') {
128-
console.warn('Checking cache timed out');
129-
pauseCache();
130-
return false;
131-
}
132-
console.error('checkCache error: ' + error.message);
133-
return false;
134-
}
135-
}
136-
137-
export default {
138-
get: checkCache,
139-
put: updateCache
146+
},
140147
};
148+
149+
export default cacheMachine;

0 commit comments

Comments
 (0)