Skip to content

Commit 0f2942a

Browse files
committed
Add per host queues.
1 parent 1972ca2 commit 0f2942a

File tree

2 files changed

+20
-6
lines changed

2 files changed

+20
-6
lines changed

dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueChannel.java

+9-2
Original file line numberDiff line numberDiff line change
@@ -220,19 +220,26 @@ public DialogueChannel build() {
220220

221221
ImmutableList<Channel> perHostChannels = IntStream.range(0, channels.size())
222222
.mapToObj(index -> new ChannelAndFactory() {
223+
224+
private final LimitedChannel stickyLimitedChannel =
225+
StickyConcurrencyLimitedChannel.createForQueueKey(
226+
nodeSelectionChannel, cf.channelName());
227+
private final Channel queueOverride =
228+
QueuedChannel.createPerHost(cf, stickyLimitedChannel, index);
229+
223230
@Override
224231
public EndpointChannel endpoint(Endpoint endpoint) {
225232
EndpointChannel endpointChannel = channelFactory.endpoint(endpoint);
226233
return request -> {
234+
request.attachments().put(QueueAttachments.QUEUE_OVERRIDE, queueOverride);
227235
nodeSelectionChannel.routeToHost(index, request);
228236
return endpointChannel.execute(request);
229237
};
230238
}
231239

232240
@Override
233241
public ListenableFuture<Response> execute(Endpoint endpoint, Request request) {
234-
nodeSelectionChannel.routeToHost(index, request);
235-
return channelFactory.endpoint(endpoint).execute(request);
242+
return endpoint(endpoint).execute(request);
236243
}
237244
})
238245
.collect(ImmutableList.toImmutableList());

dialogue-core/src/main/java/com/palantir/dialogue/core/QueuedChannel.java

+11-4
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,8 @@ static QueuedChannel createPerHost(Config cf, LimitedChannel delegate, int hostI
106106
return new QueuedChannel(
107107
delegate,
108108
cf.channelName(),
109-
stickyInstrumentation(
110-
DialogueClientMetrics.of(cf.clientConf().taggedMetricRegistry()), cf.channelName()),
109+
perHostInstrumentation(
110+
DialogueClientMetrics.of(cf.clientConf().taggedMetricRegistry()), cf.channelName(), hostIndex),
111111
cf.maxQueueSize());
112112
}
113113

@@ -401,15 +401,22 @@ public Timer requestQueuedTime() {
401401

402402
static QueuedChannelInstrumentation perHostInstrumentation(
403403
DialogueClientMetrics metrics, String channelName, int hostIndex) {
404+
String hostIndexString = Integer.toString(hostIndex);
404405
return new QueuedChannelInstrumentation() {
405406
@Override
406407
public Counter requestsQueued() {
407-
return metrics.requestsPerhostQueued().channelName(channelName).hostIndex(hostIndex);
408+
return metrics.requestsPerhostQueued()
409+
.channelName(channelName)
410+
.hostIndex(hostIndexString)
411+
.build();
408412
}
409413

410414
@Override
411415
public Timer requestQueuedTime() {
412-
return metrics.requestStickyQueuedTime(channelName);
416+
return metrics.requestPerhostQueuedTime()
417+
.channelName(channelName)
418+
.hostIndex(hostIndexString)
419+
.build();
413420
}
414421
};
415422
}

0 commit comments

Comments
 (0)