Skip to content

Commit 991af29

Browse files
committed
feat: add support for connecting using a IORedis instance
1 parent 4c71a39 commit 991af29

File tree

5 files changed

+122
-36
lines changed

5 files changed

+122
-36
lines changed

README.md

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,41 @@ const taskforceConnection = Connect("my connection", "my token", {
141141
});
142142
```
143143

144+
### Using an existing IORedis instance
145+
146+
You can also pass an existing IORedis instance instead of connection options. This is useful when you need more control over the Redis connection configuration:
147+
148+
```ts
149+
import { Connect } from "taskforce-connector";
150+
import Redis from "ioredis";
151+
152+
// Create your own Redis instance with custom configuration
153+
const redisClient = new Redis({
154+
host: "my redis host",
155+
port: 6379,
156+
password: "my redis password",
157+
// Any other IORedis options...
158+
maxRetriesPerRequest: null,
159+
enableReadyCheck: false,
160+
});
161+
162+
const taskforceConnection = Connect("my connection", "my token", redisClient);
163+
```
164+
165+
This also works with Redis Cluster:
166+
167+
```ts
168+
import { Connect } from "taskforce-connector";
169+
import Redis from "ioredis";
170+
171+
const cluster = new Redis.Cluster([
172+
{ host: "node1", port: 6379 },
173+
{ host: "node2", port: 6379 },
174+
]);
175+
176+
const taskforceConnection = Connect("my connection", "my token", cluster);
177+
```
178+
144179
If you are using the On Premises version of Taskforce, you can also specify the backend domain:
145180

146181
```ts

lib/index.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@ import { Integration } from "./interfaces/integration";
22
import { Socket, Connection } from "./socket";
33

44
export { Integration } from "./interfaces/integration";
5-
export { getRedisClient, FoundQueue } from "./queue-factory";
5+
export { getRedisClient, FoundQueue, RedisConnection } from "./queue-factory";
66
export { WebSocketClient } from "./ws-autoreconnect";
7-
export { Connection } from "./socket";
7+
export { Connection, ConnectionOptions, RedisConnection as RedisClient } from "./socket";
88
export { respond } from "./responders/respond";
99
export { BullMQResponders } from "./responders/bullmq-responders";
1010

lib/queue-factory.ts

Lines changed: 45 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ const queueNameRegExp = new RegExp("(.*):(.*):id");
1313
const maxCount = 150000;
1414
const maxTime = 40000;
1515

16+
export type RedisConnection = Redis | Cluster;
17+
1618
// We keep a redis client that we can reuse for all the queues.
1719
let redisClients: Record<string, Redis | Cluster> = {} as any;
1820

@@ -96,9 +98,10 @@ const getQueueKeys = async (client: Redis | Cluster, queueNames?: string[]) => {
9698
};
9799

98100
export async function getConnectionQueues(
99-
redisOpts: RedisOptions,
101+
redisOpts: RedisOptions | undefined,
100102
clusterNodes?: string[],
101-
queueNames?: string[]
103+
queueNames?: string[],
104+
redisClient?: RedisConnection
102105
): Promise<FoundQueue[]> {
103106
const queues = await execRedisCommand(
104107
redisOpts,
@@ -129,33 +132,55 @@ export async function getConnectionQueues(
129132
);
130133
return queues;
131134
},
132-
clusterNodes
135+
clusterNodes,
136+
redisClient
133137
);
134138

135139
return queues;
136140
}
137141

138-
export async function ping(redisOpts: RedisOptions, clusterNodes?: string[]) {
139-
return execRedisCommand(redisOpts, (client) => client.ping(), clusterNodes);
142+
export async function ping(
143+
redisOpts: RedisOptions | undefined,
144+
clusterNodes?: string[],
145+
redisClient?: RedisConnection
146+
) {
147+
return execRedisCommand(
148+
redisOpts,
149+
(client) => client.ping(),
150+
clusterNodes,
151+
redisClient
152+
);
140153
}
141154

142155
export async function getRedisInfo(
143-
redisOpts: RedisOptions,
144-
clusterNodes?: string[]
156+
redisOpts: RedisOptions | undefined,
157+
clusterNodes?: string[],
158+
redisClient?: RedisConnection
145159
) {
146160
const info = await execRedisCommand(
147161
redisOpts,
148162
(client) => client.info(),
149-
clusterNodes
163+
clusterNodes,
164+
redisClient
150165
);
151166
return info;
152167
}
153168

154169
export function getRedisClient(
155-
redisOpts: RedisOptions,
170+
redisOpts: RedisOptions | undefined,
156171
type: "bull" | "bullmq",
157-
clusterNodes?: string[]
172+
clusterNodes?: string[],
173+
existingClient?: RedisConnection
158174
) {
175+
// If we have an existing client, use it directly
176+
if (existingClient) {
177+
return existingClient;
178+
}
179+
180+
if (!redisOpts) {
181+
throw new Error("Redis options are required when no client is provided");
182+
}
183+
159184
// Compute checksum for redisOpts
160185
const checksumJson = JSON.stringify(redisOpts);
161186
const checksum = require("crypto")
@@ -214,32 +239,34 @@ export function getRedisClient(
214239
}
215240

216241
export async function execRedisCommand(
217-
redisOpts: RedisOptions,
242+
redisOpts: RedisOptions | undefined,
218243
cb: (client: Redis | Cluster) => any,
219-
clusterNodes?: string[]
244+
clusterNodes?: string[],
245+
redisClient?: RedisConnection
220246
) {
221-
const redisClient = getRedisClient(redisOpts, "bull", clusterNodes);
247+
const client = getRedisClient(redisOpts, "bull", clusterNodes, redisClient);
222248

223-
const result = await cb(redisClient);
249+
const result = await cb(client);
224250

225251
return result;
226252
}
227253

228254
export function createQueue(
229255
foundQueue: FoundQueue,
230-
redisOpts: RedisOptions,
256+
redisOpts: RedisOptions | undefined,
231257
opts: {
232258
nodes?: string[];
233259
integrations?: {
234260
[key: string]: Integration;
235261
};
262+
redisClient?: RedisConnection;
236263
} = {}
237264
): { queue: Bull.Queue | Queue; responders: Responders } {
238-
const { nodes, integrations } = opts;
265+
const { nodes, integrations, redisClient } = opts;
239266
const createClient = function (type: "client" /*, redisOpts */) {
240267
switch (type) {
241268
case "client":
242-
return getRedisClient(redisOpts, "bull", nodes);
269+
return getRedisClient(redisOpts, "bull", nodes, redisClient);
243270
default:
244271
throw new Error(`Unexpected connection type: ${type}`);
245272
}
@@ -255,7 +282,7 @@ export function createQueue(
255282

256283
switch (foundQueue.type) {
257284
case "bullmq":
258-
const connection = getRedisClient(redisOpts, "bullmq", nodes);
285+
const connection = getRedisClient(redisOpts, "bullmq", nodes, redisClient);
259286
switch (foundQueue.majorVersion) {
260287
case 0:
261288
return {

lib/queues-cache.ts

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
import * as Bull from "bull";
22
import { Queue } from "bullmq";
3-
import { RedisOptions } from "ioredis";
3+
import { Redis, Cluster, RedisOptions } from "ioredis";
44
import { keyBy } from "lodash";
55
import { FoundQueue, createQueue, getConnectionQueues } from "./queue-factory";
66
import { Responders } from "./interfaces/responders";
77
import { Integration } from "./interfaces/integration";
88

9+
export type RedisConnection = Redis | Cluster;
10+
911
let queuesCache: {
1012
[index: string]: { queue: Bull.Queue | Queue; responders: Responders };
1113
} = null;
@@ -21,17 +23,23 @@ export function queueKey(
2123
}
2224

2325
export async function updateQueuesCache(
24-
redisOpts: RedisOptions,
26+
redisOpts: RedisOptions | undefined,
2527
opts: {
2628
nodes?: string[];
2729
integrations?: {
2830
[key: string]: Integration;
2931
};
3032
queueNames?: string[];
31-
} = {}
33+
} = {},
34+
redisClient?: RedisConnection
3235
) {
3336
const { nodes, integrations, queueNames } = opts;
34-
const newQueues = await getConnectionQueues(redisOpts, nodes, queueNames);
37+
const newQueues = await getConnectionQueues(
38+
redisOpts,
39+
nodes,
40+
queueNames,
41+
redisClient
42+
);
3543

3644
queuesCache = queuesCache || {};
3745

@@ -70,7 +78,11 @@ export async function updateQueuesCache(
7078

7179
toAdd.forEach(function (foundQueue: FoundQueue) {
7280
const key = queueKey(foundQueue);
73-
const queue = createQueue(foundQueue, redisOpts, { nodes, integrations });
81+
const queue = createQueue(foundQueue, redisOpts, {
82+
nodes,
83+
integrations,
84+
redisClient,
85+
});
7486
if (queue) {
7587
queuesCache[key] = queue;
7688
}

lib/socket.ts

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { RedisOptions } from "ioredis";
1+
import { Redis, Cluster, RedisOptions } from "ioredis";
22
import { pick } from "lodash";
33
import { getCache, updateQueuesCache, queueKey } from "./queues-cache";
44
import { WebSocketClient } from "./ws-autoreconnect";
@@ -11,11 +11,13 @@ import {
1111
import { getQueueType, redisOptsFromUrl } from "./utils";
1212
import { Integration } from "./interfaces/integration";
1313

14+
export type RedisConnection = Redis | Cluster;
15+
1416
const { version } = require(`${__dirname}/../package.json`);
1517

1618
const chalk = require("chalk");
1719

18-
export interface Connection {
20+
export interface ConnectionOptions {
1921
port?: number;
2022
host?: string;
2123
password?: string;
@@ -24,6 +26,8 @@ export interface Connection {
2426
tls?: object;
2527
}
2628

29+
export type Connection = ConnectionOptions | RedisConnection;
30+
2731
export const Socket = (
2832
name: string,
2933
server: string,
@@ -40,7 +44,10 @@ export const Socket = (
4044
) => {
4145
const { team, nodes } = opts;
4246
const ws = new WebSocketClient();
43-
const redisOpts = redisOptsFromConnection(connection);
47+
const redisOpts = isRedisInstance(connection)
48+
? undefined
49+
: redisOptsFromConnection(connection);
50+
const redisClient = isRedisInstance(connection) ? connection : undefined;
4451

4552
ws.open(server, {
4653
headers: {
@@ -92,7 +99,7 @@ export const Socket = (
9299
//
93100
// Send this connection.
94101
//
95-
const queues = await updateQueuesCache(redisOpts, opts);
102+
const queues = await updateQueuesCache(redisOpts, opts, redisClient);
96103
console.log(
97104
`${chalk.yellow("WebSocket:")} ${chalk.green(
98105
"sending connection:"
@@ -133,7 +140,7 @@ export const Socket = (
133140
case "jobs":
134141
let cache = getCache();
135142
if (!cache) {
136-
await updateQueuesCache(redisOpts, opts);
143+
await updateQueuesCache(redisOpts, opts, redisClient);
137144
cache = getCache();
138145
if (!cache) {
139146
throw new Error("Unable to update queues");
@@ -177,12 +184,12 @@ export const Socket = (
177184

178185
switch (data.cmd) {
179186
case "ping":
180-
const pong = await ping(redisOpts, nodes);
187+
const pong = await ping(redisOpts, nodes, redisClient);
181188
respond(msg.id, startTime, pong);
182189
break;
183190
case "getConnection":
184191
{
185-
const queues = await updateQueuesCache(redisOpts, opts);
192+
const queues = await updateQueuesCache(redisOpts, opts, redisClient);
186193

187194
console.log(
188195
`${chalk.yellow("WebSocket:")} ${chalk.green(
@@ -204,23 +211,24 @@ export const Socket = (
204211
break;
205212
case "getQueues":
206213
{
207-
const queues = await updateQueuesCache(redisOpts, opts);
214+
const queues = await updateQueuesCache(redisOpts, opts, redisClient);
208215

209216
logSendingQueues(queues);
210217

211218
respond(msg.id, startTime, queues);
212219
}
213220
break;
214221
case "getInfo":
215-
const info = await getRedisInfo(redisOpts, nodes);
222+
const info = await getRedisInfo(redisOpts, nodes, redisClient);
216223
respond(msg.id, startTime, info);
217224
break;
218225

219226
case "getQueueType":
220227
const queueType = await execRedisCommand(
221228
redisOpts,
222229
(client) => getQueueType(data.name, data.prefix, client),
223-
nodes
230+
nodes,
231+
redisClient
224232
);
225233
respond(msg.id, startTime, { queueType });
226234
break;
@@ -249,7 +257,11 @@ export const Socket = (
249257
}
250258
};
251259

252-
function redisOptsFromConnection(connection: Connection): RedisOptions {
260+
function isRedisInstance(connection: Connection): connection is RedisConnection {
261+
return connection instanceof Redis || connection instanceof Cluster;
262+
}
263+
264+
function redisOptsFromConnection(connection: ConnectionOptions): RedisOptions {
253265
let opts: RedisOptions = {
254266
...pick(connection, [
255267
"host",

0 commit comments

Comments
 (0)