Skip to content

Commit bb65491

Browse files
authored
feat: add support for connecting using a IORedis instance (#129)
1 parent 1d53ae9 commit bb65491

File tree

5 files changed

+143
-37
lines changed

5 files changed

+143
-37
lines changed

README.md

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,41 @@ const taskforceConnection = Connect("my connection", "my token", {
141141
});
142142
```
143143

144+
### Using an existing IORedis instance
145+
146+
You can also pass an existing IORedis instance instead of connection options. This is useful when you need more control over the Redis connection configuration:
147+
148+
```ts
149+
import { Connect } from "taskforce-connector";
150+
import Redis from "ioredis";
151+
152+
// Create your own Redis instance with custom configuration
153+
const redisClient = new Redis({
154+
host: "my redis host",
155+
port: 6379,
156+
password: "my redis password",
157+
// Any other IORedis options...
158+
maxRetriesPerRequest: null,
159+
enableReadyCheck: false,
160+
});
161+
162+
const taskforceConnection = Connect("my connection", "my token", redisClient);
163+
```
164+
165+
This also works with Redis Cluster:
166+
167+
```ts
168+
import { Connect } from "taskforce-connector";
169+
import Redis from "ioredis";
170+
171+
const cluster = new Redis.Cluster([
172+
{ host: "node1", port: 6379 },
173+
{ host: "node2", port: 6379 },
174+
]);
175+
176+
const taskforceConnection = Connect("my connection", "my token", cluster);
177+
```
178+
144179
If you are using the On Premises version of Taskforce, you can also specify the backend domain:
145180

146181
```ts

lib/index.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@ import { Integration } from "./interfaces/integration";
22
import { Socket, Connection } from "./socket";
33

44
export { Integration } from "./interfaces/integration";
5-
export { getRedisClient, FoundQueue } from "./queue-factory";
5+
export { getRedisClient, FoundQueue, RedisConnection } from "./queue-factory";
66
export { WebSocketClient } from "./ws-autoreconnect";
7-
export { Connection } from "./socket";
7+
export { Connection, ConnectionOptions } from "./socket";
88
export { respond } from "./responders/respond";
99
export { BullMQResponders } from "./responders/bullmq-responders";
1010

lib/queue-factory.ts

Lines changed: 64 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ const queueNameRegExp = new RegExp("(.*):(.*):id");
1313
const maxCount = 150000;
1414
const maxTime = 40000;
1515

16+
export type RedisConnection = Redis | Cluster;
17+
1618
// We keep a redis client that we can reuse for all the queues.
1719
let redisClients: Record<string, Redis | Cluster> = {} as any;
1820

@@ -96,9 +98,10 @@ const getQueueKeys = async (client: Redis | Cluster, queueNames?: string[]) => {
9698
};
9799

98100
export async function getConnectionQueues(
99-
redisOpts: RedisOptions,
101+
redisOpts: RedisOptions | undefined,
100102
clusterNodes?: string[],
101-
queueNames?: string[]
103+
queueNames?: string[],
104+
redisClient?: RedisConnection
102105
): Promise<FoundQueue[]> {
103106
const queues = await execRedisCommand(
104107
redisOpts,
@@ -129,34 +132,74 @@ export async function getConnectionQueues(
129132
);
130133
return queues;
131134
},
132-
clusterNodes
135+
clusterNodes,
136+
redisClient
133137
);
134138

135139
return queues;
136140
}
137141

138-
export async function ping(redisOpts: RedisOptions, clusterNodes?: string[]) {
139-
return execRedisCommand(redisOpts, (client) => client.ping(), clusterNodes);
142+
export async function ping(
143+
redisOpts: RedisOptions | undefined,
144+
clusterNodes?: string[],
145+
redisClient?: RedisConnection
146+
) {
147+
return execRedisCommand(
148+
redisOpts,
149+
(client) => client.ping(),
150+
clusterNodes,
151+
redisClient
152+
);
140153
}
141154

142155
export async function getRedisInfo(
143-
redisOpts: RedisOptions,
144-
clusterNodes?: string[]
156+
redisOpts: RedisOptions | undefined,
157+
clusterNodes?: string[],
158+
redisClient?: RedisConnection
145159
) {
146160
const info = await execRedisCommand(
147161
redisOpts,
148162
(client) => client.info(),
149-
clusterNodes
163+
clusterNodes,
164+
redisClient
150165
);
151166
return info;
152167
}
153168

169+
/**
170+
* Gets or creates a Redis client for queue operations.
171+
*
172+
* @param redisOpts - Redis connection options. Required if existingClient is not provided.
173+
* @param type - The type of queue ("bull" or "bullmq"), used for client caching key.
174+
* @param clusterNodes - Optional list of cluster node URIs for Redis Cluster.
175+
* @param existingClient - Optional pre-configured Redis/Cluster instance.
176+
* @returns A Redis or Cluster client.
177+
*
178+
* @remarks
179+
* When `existingClient` is provided, it is returned directly without caching.
180+
* This allows the caller to manage the client lifecycle independently.
181+
*
182+
* When `redisOpts` are provided (without existingClient), the created client
183+
* is cached internally using a checksum of the options. Subsequent calls with
184+
* the same options will reuse the cached client.
185+
*/
154186
export function getRedisClient(
155-
redisOpts: RedisOptions,
187+
redisOpts: RedisOptions | undefined,
156188
type: "bull" | "bullmq",
157-
clusterNodes?: string[]
189+
clusterNodes?: string[],
190+
existingClient?: RedisConnection
158191
) {
159-
// Compute checksum for redisOpts
192+
// If an existing client is provided, return it directly.
193+
// We don't cache it since the caller owns its lifecycle.
194+
if (existingClient) {
195+
return existingClient;
196+
}
197+
198+
if (!redisOpts) {
199+
throw new Error("Redis options are required when no client is provided");
200+
}
201+
202+
// Compute checksum for redisOpts to use as cache key
160203
const checksumJson = JSON.stringify(redisOpts);
161204
const checksum = require("crypto")
162205
.createHash("md5")
@@ -214,32 +257,34 @@ export function getRedisClient(
214257
}
215258

216259
export async function execRedisCommand(
217-
redisOpts: RedisOptions,
260+
redisOpts: RedisOptions | undefined,
218261
cb: (client: Redis | Cluster) => any,
219-
clusterNodes?: string[]
262+
clusterNodes?: string[],
263+
redisClient?: RedisConnection
220264
) {
221-
const redisClient = getRedisClient(redisOpts, "bull", clusterNodes);
265+
const client = getRedisClient(redisOpts, "bull", clusterNodes, redisClient);
222266

223-
const result = await cb(redisClient);
267+
const result = await cb(client);
224268

225269
return result;
226270
}
227271

228272
export function createQueue(
229273
foundQueue: FoundQueue,
230-
redisOpts: RedisOptions,
274+
redisOpts: RedisOptions | undefined,
231275
opts: {
232276
nodes?: string[];
233277
integrations?: {
234278
[key: string]: Integration;
235279
};
280+
redisClient?: RedisConnection;
236281
} = {}
237282
): { queue: Bull.Queue | Queue; responders: Responders } {
238-
const { nodes, integrations } = opts;
283+
const { nodes, integrations, redisClient } = opts;
239284
const createClient = function (type: "client" /*, redisOpts */) {
240285
switch (type) {
241286
case "client":
242-
return getRedisClient(redisOpts, "bull", nodes);
287+
return getRedisClient(redisOpts, "bull", nodes, redisClient);
243288
default:
244289
throw new Error(`Unexpected connection type: ${type}`);
245290
}
@@ -255,7 +300,7 @@ export function createQueue(
255300

256301
switch (foundQueue.type) {
257302
case "bullmq":
258-
const connection = getRedisClient(redisOpts, "bullmq", nodes);
303+
const connection = getRedisClient(redisOpts, "bullmq", nodes, redisClient);
259304
switch (foundQueue.majorVersion) {
260305
case 0:
261306
return {

lib/queues-cache.ts

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,12 @@ import * as Bull from "bull";
22
import { Queue } from "bullmq";
33
import { RedisOptions } from "ioredis";
44
import { keyBy } from "lodash";
5-
import { FoundQueue, createQueue, getConnectionQueues } from "./queue-factory";
5+
import {
6+
FoundQueue,
7+
RedisConnection,
8+
createQueue,
9+
getConnectionQueues,
10+
} from "./queue-factory";
611
import { Responders } from "./interfaces/responders";
712
import { Integration } from "./interfaces/integration";
813

@@ -21,17 +26,23 @@ export function queueKey(
2126
}
2227

2328
export async function updateQueuesCache(
24-
redisOpts: RedisOptions,
29+
redisOpts: RedisOptions | undefined,
2530
opts: {
2631
nodes?: string[];
2732
integrations?: {
2833
[key: string]: Integration;
2934
};
3035
queueNames?: string[];
31-
} = {}
36+
} = {},
37+
redisClient?: RedisConnection
3238
) {
3339
const { nodes, integrations, queueNames } = opts;
34-
const newQueues = await getConnectionQueues(redisOpts, nodes, queueNames);
40+
const newQueues = await getConnectionQueues(
41+
redisOpts,
42+
nodes,
43+
queueNames,
44+
redisClient
45+
);
3546

3647
queuesCache = queuesCache || {};
3748

@@ -70,7 +81,11 @@ export async function updateQueuesCache(
7081

7182
toAdd.forEach(function (foundQueue: FoundQueue) {
7283
const key = queueKey(foundQueue);
73-
const queue = createQueue(foundQueue, redisOpts, { nodes, integrations });
84+
const queue = createQueue(foundQueue, redisOpts, {
85+
nodes,
86+
integrations,
87+
redisClient,
88+
});
7489
if (queue) {
7590
queuesCache[key] = queue;
7691
}

lib/socket.ts

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1-
import { RedisOptions } from "ioredis";
1+
import { Redis, Cluster, RedisOptions } from "ioredis";
22
import { pick } from "lodash";
33
import { getCache, updateQueuesCache, queueKey } from "./queues-cache";
44
import { WebSocketClient } from "./ws-autoreconnect";
55
import {
66
FoundQueue,
7+
RedisConnection,
78
execRedisCommand,
89
getRedisInfo,
910
ping,
@@ -15,7 +16,7 @@ const { version } = require(`${__dirname}/../package.json`);
1516

1617
const chalk = require("chalk");
1718

18-
export interface Connection {
19+
export interface ConnectionOptions {
1920
port?: number;
2021
host?: string;
2122
password?: string;
@@ -24,6 +25,8 @@ export interface Connection {
2425
tls?: object;
2526
}
2627

28+
export type Connection = ConnectionOptions | RedisConnection;
29+
2730
export const Socket = (
2831
name: string,
2932
server: string,
@@ -40,7 +43,10 @@ export const Socket = (
4043
) => {
4144
const { team, nodes } = opts;
4245
const ws = new WebSocketClient();
43-
const redisOpts = redisOptsFromConnection(connection);
46+
const redisOpts = isRedisInstance(connection)
47+
? undefined
48+
: redisOptsFromConnection(connection);
49+
const redisClient = isRedisInstance(connection) ? connection : undefined;
4450

4551
ws.open(server, {
4652
headers: {
@@ -92,7 +98,7 @@ export const Socket = (
9298
//
9399
// Send this connection.
94100
//
95-
const queues = await updateQueuesCache(redisOpts, opts);
101+
const queues = await updateQueuesCache(redisOpts, opts, redisClient);
96102
console.log(
97103
`${chalk.yellow("WebSocket:")} ${chalk.green(
98104
"sending connection:"
@@ -133,7 +139,7 @@ export const Socket = (
133139
case "jobs":
134140
let cache = getCache();
135141
if (!cache) {
136-
await updateQueuesCache(redisOpts, opts);
142+
await updateQueuesCache(redisOpts, opts, redisClient);
137143
cache = getCache();
138144
if (!cache) {
139145
throw new Error("Unable to update queues");
@@ -177,12 +183,12 @@ export const Socket = (
177183

178184
switch (data.cmd) {
179185
case "ping":
180-
const pong = await ping(redisOpts, nodes);
186+
const pong = await ping(redisOpts, nodes, redisClient);
181187
respond(msg.id, startTime, pong);
182188
break;
183189
case "getConnection":
184190
{
185-
const queues = await updateQueuesCache(redisOpts, opts);
191+
const queues = await updateQueuesCache(redisOpts, opts, redisClient);
186192

187193
console.log(
188194
`${chalk.yellow("WebSocket:")} ${chalk.green(
@@ -204,23 +210,24 @@ export const Socket = (
204210
break;
205211
case "getQueues":
206212
{
207-
const queues = await updateQueuesCache(redisOpts, opts);
213+
const queues = await updateQueuesCache(redisOpts, opts, redisClient);
208214

209215
logSendingQueues(queues);
210216

211217
respond(msg.id, startTime, queues);
212218
}
213219
break;
214220
case "getInfo":
215-
const info = await getRedisInfo(redisOpts, nodes);
221+
const info = await getRedisInfo(redisOpts, nodes, redisClient);
216222
respond(msg.id, startTime, info);
217223
break;
218224

219225
case "getQueueType":
220226
const queueType = await execRedisCommand(
221227
redisOpts,
222228
(client) => getQueueType(data.name, data.prefix, client),
223-
nodes
229+
nodes,
230+
redisClient
224231
);
225232
respond(msg.id, startTime, { queueType });
226233
break;
@@ -249,7 +256,11 @@ export const Socket = (
249256
}
250257
};
251258

252-
function redisOptsFromConnection(connection: Connection): RedisOptions {
259+
function isRedisInstance(connection: Connection): connection is RedisConnection {
260+
return connection instanceof Redis || connection instanceof Cluster;
261+
}
262+
263+
function redisOptsFromConnection(connection: ConnectionOptions): RedisOptions {
253264
let opts: RedisOptions = {
254265
...pick(connection, [
255266
"host",

0 commit comments

Comments
 (0)