Skip to content

Commit 8ff188f

Browse files
authored
Merge pull request #92 from moleculerjs/issue-71
Add channel name to Context
2 parents d796748 + efa08da commit 8ff188f

File tree

3 files changed

+365
-4
lines changed

3 files changed

+365
-4
lines changed

examples/channel-name/index.js

Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
"use strict";
2+
3+
const { ServiceBroker } = require("moleculer");
4+
const ChannelsMiddleware = require("../..").Middleware;
5+
const TracingMiddleware = require("../..").Tracing;
6+
7+
let c = 1;
8+
9+
// Create broker
10+
const broker = new ServiceBroker({
11+
logLevel: {
12+
CHANNELS: "info",
13+
"**": "info"
14+
},
15+
middlewares: [
16+
ChannelsMiddleware({
17+
// adapter: {
18+
// type: "Fake"
19+
// },
20+
/*adapter: {
21+
type: "Kafka",
22+
options: { kafka: { brokers: ["localhost:9093"] } }
23+
},*/
24+
/*adapter: {
25+
type: "AMQP"
26+
},*/
27+
adapter: {
28+
type: "NATS"
29+
},
30+
/*
31+
adapter: {
32+
type: "Redis",
33+
options: {
34+
redis: "localhost:6379"
35+
//serializer: "MsgPack"
36+
}
37+
},
38+
*/
39+
context: true
40+
}),
41+
TracingMiddleware()
42+
],
43+
replCommands: [
44+
// {
45+
// command: "publish",
46+
// alias: ["p"],
47+
// async action(broker, args) {
48+
// const payload = {
49+
// id: ++c,
50+
// name: "Jane Doe",
51+
// pid: process.pid
52+
// };
53+
// await broker.call(
54+
// "publisher.publish",
55+
// { payload, headers: { a: "123" } },
56+
// {
57+
// meta: {
58+
// loggedInUser: {
59+
// id: 12345,
60+
// name: "John Doe",
61+
// roles: ["admin"],
62+
// status: true
63+
// }
64+
// }
65+
// }
66+
// );
67+
// }
68+
// }
69+
]
70+
});
71+
72+
broker.createService({
73+
name: "publisher",
74+
actions: {
75+
async publish(ctx) {
76+
const parentChannelName = ctx.parentChannelName;
77+
const level = ctx.level;
78+
const caller = ctx.caller;
79+
const msg = `Flow level: ${level}, Type: Action, Name: ${ctx.action.name}, Caller: ${caller}, Parent channel: ${parentChannelName}`;
80+
this.logger.info(msg);
81+
82+
await broker.sendToChannel("my.topic.level.2", ctx.params.payload, {
83+
ctx,
84+
headers: ctx.params.headers
85+
});
86+
87+
await broker.Promise.delay(1000);
88+
}
89+
}
90+
});
91+
92+
broker.createService({
93+
name: "sub2",
94+
channels: {
95+
"my.topic.level.2": {
96+
async handler(ctx, raw) {
97+
const parentChannelName = ctx.parentChannelName;
98+
const level = ctx.level;
99+
const caller = ctx.caller;
100+
const msg = `Flow level: ${level}, Type: Channel, Name: ${ctx.channelName}, Caller: ${caller}, Parent channel: ${parentChannelName}`;
101+
this.logger.info(msg);
102+
103+
await Promise.delay(100);
104+
105+
const headers = this.broker.channelAdapter.parseMessageHeaders(raw);
106+
107+
await broker.sendToChannel("my.topic.level.3", ctx.params, {
108+
ctx,
109+
headers
110+
});
111+
}
112+
}
113+
}
114+
});
115+
116+
broker.createService({
117+
name: "sub3",
118+
channels: {
119+
"my.topic.level.3": {
120+
async handler(ctx, raw) {
121+
const parentChannelName = ctx.parentChannelName;
122+
const level = ctx.level;
123+
const caller = ctx.caller;
124+
const msg = `Flow level: ${level}, Type: Channel, Name: ${ctx.channelName}, Caller: ${caller}, Parent channel: ${parentChannelName}`;
125+
this.logger.info(msg);
126+
127+
await Promise.delay(100);
128+
129+
const headers = this.broker.channelAdapter.parseMessageHeaders(raw);
130+
131+
await broker.sendToChannel("my.topic.level.4", ctx.params, {
132+
ctx,
133+
headers
134+
});
135+
}
136+
}
137+
}
138+
});
139+
140+
broker.createService({
141+
name: "sub4",
142+
channels: {
143+
"my.topic.level.4": {
144+
async handler(ctx, raw) {
145+
const parentChannelName = ctx.parentChannelName;
146+
const level = ctx.level;
147+
const caller = ctx.caller;
148+
const msg = `Flow level: ${level}, Type: Channel, Name: ${ctx.channelName}, Caller: ${caller}, Parent channel: ${parentChannelName}`;
149+
this.logger.info(msg);
150+
151+
await Promise.delay(100);
152+
153+
const headers = this.broker.channelAdapter.parseMessageHeaders(raw);
154+
155+
await broker.sendToChannel("my.topic.level.5", ctx.params, {
156+
ctx,
157+
headers
158+
});
159+
}
160+
},
161+
162+
"my.topic.level.5": {
163+
async handler(ctx, raw) {
164+
const parentChannelName = ctx.parentChannelName;
165+
const level = ctx.level;
166+
const caller = ctx.caller;
167+
const msg = `Flow level: ${level}, Type: Channel, Name: ${ctx.channelName}, Caller: ${caller}, Parent channel: ${parentChannelName}`;
168+
this.logger.info(msg);
169+
170+
await Promise.delay(100);
171+
172+
await ctx.call("test.demo.level.6", null, { parentCtx: ctx });
173+
}
174+
}
175+
}
176+
});
177+
178+
broker.createService({
179+
name: "test",
180+
actions: {
181+
"demo.level.6": {
182+
async handler(ctx) {
183+
const channelName = ctx?.options?.parentCtx?.channelName;
184+
const level = ctx.level;
185+
const caller = ctx.caller;
186+
const msg = `Flow level: ${level}, Type: Action, Name: ${ctx.action.name}, Caller: ${caller}, Channel name: ${channelName}`;
187+
this.logger.info(msg);
188+
// this.logger.info("Demo service called", ctx);
189+
}
190+
}
191+
}
192+
});
193+
194+
broker
195+
.start()
196+
.then(async () => {
197+
broker.repl();
198+
199+
const payload = {
200+
id: ++c,
201+
name: "Jane Doe",
202+
pid: process.pid
203+
};
204+
205+
broker.logger.info("Initializing the flow...");
206+
207+
await broker.call(
208+
"publisher.publish",
209+
{ payload, headers: { a: "123" } },
210+
{
211+
meta: {
212+
loggedInUser: {
213+
id: 12345,
214+
name: "John Doe",
215+
roles: ["admin"],
216+
status: true
217+
}
218+
}
219+
}
220+
);
221+
})
222+
.catch(err => {
223+
broker.logger.error(err);
224+
broker.stop();
225+
});

src/index.js

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,12 @@ module.exports = function ChannelsMiddleware(mwOpts) {
192192
if (opts.ctx.service) {
193193
opts.headers.$caller = opts.ctx.service.fullName;
194194
}
195+
196+
if (opts.ctx.channelName) {
197+
opts.headers.$parentChannelName = opts.ctx.channelName;
198+
}
199+
200+
// Serialize meta and headers
195201
opts.headers.$meta = adapter.serializer
196202
.serialize(opts.ctx.meta)
197203
.toString("base64");
@@ -286,7 +292,7 @@ module.exports = function ChannelsMiddleware(mwOpts) {
286292
// Wrap the handler with context creating
287293
if (chan.context) {
288294
wrappedHandler = (msg, raw) => {
289-
let parentCtx, caller, meta, ctxHeaders;
295+
let parentCtx, caller, meta, ctxHeaders, parentChannelName;
290296
const headers = adapter.parseMessageHeaders(raw);
291297
if (headers) {
292298
if (headers.$requestID) {
@@ -297,6 +303,7 @@ module.exports = function ChannelsMiddleware(mwOpts) {
297303
level: headers.$level ? parseInt(headers.$level) : 0
298304
};
299305
caller = headers.$caller;
306+
parentChannelName = headers.$parentChannelName;
300307
}
301308

302309
if (headers.$meta) {
@@ -319,6 +326,12 @@ module.exports = function ChannelsMiddleware(mwOpts) {
319326
headers: ctxHeaders
320327
});
321328

329+
ctx.channelName = chan.name;
330+
ctx.parentChannelName = parentChannelName;
331+
332+
// Attach current service that has the channel handler to the context
333+
ctx.service = svc;
334+
322335
return handler2(ctx, raw);
323336
};
324337
}

0 commit comments

Comments
 (0)