Skip to content

Commit 5b0b79f

Browse files
committed
emit sendable event when capacity becomes available in outgoing session buffer
1 parent a068778 commit 5b0b79f

File tree

2 files changed

+48
-2
lines changed

2 files changed

+48
-2
lines changed

lib/session.js

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -533,11 +533,11 @@ Session.prototype.find_link = function (filter) {
533533
};
534534

535535
Session.prototype.each_receiver = function (action, filter) {
536-
this.each_link(util.receiver_filter(filter));
536+
this.each_link(action, util.receiver_filter(filter));
537537
};
538538

539539
Session.prototype.each_sender = function (action, filter) {
540-
this.each_link(util.sender_filter(filter));
540+
this.each_link(action, util.sender_filter(filter));
541541
};
542542

543543
Session.prototype.each_link = function (action, filter) {
@@ -587,13 +587,25 @@ Session.prototype.is_closed = function () {
587587
return this.connection.is_closed() || this.is_itself_closed();
588588
};
589589

590+
function notify_sendable(sender) {
591+
sender.dispatch('sendable', sender._context());
592+
}
593+
594+
function is_sender_sendable(sender) {
595+
return sender.is_open() && sender.sendable();
596+
}
597+
590598
Session.prototype._process = function () {
591599
do {
592600
if (this.state.need_open()) {
593601
this.output(this.local.begin);
594602
}
595603

604+
var was_blocked = this.outgoing.deliveries.available() === 0;
596605
this.outgoing.process();
606+
if (was_blocked && this.outgoing.deliveries.available()) {
607+
this.each_sender(notify_sendable, is_sender_sendable);
608+
}
597609
this.incoming.process(this);
598610
for (var k in this.links) {
599611
this.links[k]._process();

test/links.ts

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -757,4 +757,38 @@ describe('miscellaneous', function() {
757757
done();
758758
});
759759
});
760+
761+
it('sends presettled messages when sendable', function(done: Function) {
762+
server.options.credit_window = 0;
763+
var outgoing = [] as any;
764+
var incoming = [] as any;
765+
var sent = 0;
766+
var settled = 0;
767+
for (var i = 0; i < 10000; i++) {
768+
outgoing.push('message-' + i);
769+
}
770+
server.on('receiver_open', function (context: rhea.EventContext) {
771+
context.receiver!.add_credit(10000);
772+
});
773+
server.on('message', function (context: rhea.EventContext) {
774+
incoming.push(context.message!.body);
775+
if (incoming.length === outgoing.length) {
776+
context.receiver!.close();
777+
}
778+
});
779+
var conn = client.connect(listener.address() as any);
780+
var s = conn.open_sender({snd_settle_mode:1});
781+
client.on('sendable', function (context: rhea.EventContext) {
782+
while (context.sender!.sendable() && sent < outgoing.length) {
783+
context.sender!.send({body:outgoing[sent++]});
784+
}
785+
});
786+
client.on('sender_close', function (context: rhea.EventContext) {
787+
context.connection.close();
788+
});
789+
client.on('connection_close', function (context: rhea.EventContext) {
790+
assert.deepEqual(incoming, outgoing);
791+
done();
792+
});
793+
});
760794
});

0 commit comments

Comments
 (0)