Skip to content

Commit e299aa5

Browse files
basic.get is asynchronous until the entire body has been received, only then subsequent messages are sent + first work in progress on implementing smarter buffers
1 parent b97222c commit e299aa5

File tree

7 files changed

+138
-6
lines changed

7 files changed

+138
-6
lines changed

amqpcpp.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
#include <amqpcpp/monitor.h>
3838

3939
// amqp types
40+
#include <amqpcpp/buffer.h>
41+
#include <amqpcpp/bytebuffer.h>
4042
#include <amqpcpp/field.h>
4143
#include <amqpcpp/numericfield.h>
4244
#include <amqpcpp/decimalfield.h>

include/buffer.h

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/**
2+
* Buffer.h
3+
*
4+
* Interface that can be implemented by client applications and that
5+
* is parsed to the Connection::parse() method.
6+
*
7+
* Normally, the Connection::parse() method is fed with a byte
8+
* array. However, if you're receiving big frames, it may be inconvenient
9+
* to copy these big frames into continguous byte arrays, and you
10+
* prefer using objects that internally use linked lists or other
11+
* ways to store the bytes. In such sitations, you can implement this
12+
* interface and pass that to the connection.
13+
*
14+
* @author Emiel Bruijntjes <[email protected]>
15+
* @copyright 2014 Copernica BV
16+
*/
17+
18+
/**
19+
* Include guard
20+
*/
21+
#pragma once
22+
23+
/**
24+
* Namespace
25+
*/
26+
namespace AMQP {
27+
28+
/**
29+
* Class definition
30+
*/
31+
class Buffer
32+
{
33+
34+
35+
};
36+
37+
/**
38+
* End of namespace
39+
*/
40+
}
41+
42+

include/bytebuffer.h

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/**
2+
* ByteByffer.h
3+
*
4+
* Very simple implementation of the buffer class that simply wraps
5+
* around a buffer of bytes
6+
*
7+
* @author Emiel Bruijntjes <[email protected]>
8+
* @copyright 2014 Copernica BV
9+
*/
10+
11+
/**
12+
* Include guard
13+
*/
14+
#pragma once
15+
16+
/**
17+
* Open namespace
18+
*/
19+
namespace AMQP {
20+
21+
/**
22+
* Class definition
23+
*/
24+
class ByteBuffer : public Buffer
25+
{
26+
private:
27+
/**
28+
* The actual byte buffer
29+
* @var const char *
30+
*/
31+
const char *_data;
32+
33+
/**
34+
* Size of the buffer
35+
* @var size_t
36+
*/
37+
size_t _size;
38+
39+
public:
40+
/**
41+
* Constructor
42+
* @param data
43+
* @param size
44+
*/
45+
ByteBuffer(const char *data, size_t size) : _data(data), _size(size) {}
46+
47+
/**
48+
* Destructor
49+
*/
50+
virtual ~ByteBuffer() {}
51+
52+
};
53+
54+
/**
55+
* End namespace
56+
*/
57+
}

include/channel.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,7 @@ class Channel
320320
bool publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope) { return _implementation.publish(exchange, routingKey, envelope); }
321321
bool publish(const std::string &exchange, const std::string &routingKey, const std::string &message) { return _implementation.publish(exchange, routingKey, Envelope(message)); }
322322
bool publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size) { return _implementation.publish(exchange, routingKey, Envelope(message, size)); }
323+
bool publish(const std::string &exchange, const std::string &routingKey, const char *message) { return _implementation.publish(exchange, routingKey, Envelope(message, strlen(message))); }
323324

324325
/**
325326
* Set the Quality of Service (QOS) for this channel

include/connection.h

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,32 @@ class Connection
7777
*/
7878
size_t parse(const char *buffer, size_t size)
7979
{
80-
return _implementation.parse(buffer, size);
80+
//return _implementation.parse(ByteBuffer(buffer, size));
81+
return _implementation.parse(buffer, size);
8182
}
83+
84+
/**
85+
* Parse data that was recevied from RabbitMQ
86+
*
87+
* Every time that data comes in from RabbitMQ, you should call this method to parse
88+
* the incoming data, and let it handle by the AMQP library. This method returns the number
89+
* of bytes that were processed.
90+
*
91+
* If not all bytes could be processed because it only contained a partial frame, you should
92+
* call this same method later on when more data is available. The AMQP library does not do
93+
* any buffering, so it is up to the caller to ensure that the old data is also passed in that
94+
* later call.
95+
*
96+
* This method accepts a buffer object. This is an interface that is defined by the AMQP
97+
* library, that can be implemented by you to allow faster access to a buffer.
98+
*
99+
* @param buffer buffer to decode
100+
* @return number of bytes that were processed
101+
*/
102+
//size_t parse(const Buffer &buffer)
103+
//{
104+
// return _implementation.parse(buffer);
105+
//}
82106

83107
/**
84108
* Close the connection

src/basicgetokframe.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,8 +177,8 @@ class BasicGetOKFrame : public BasicFrame
177177
// construct the message
178178
channel->message(*this);
179179

180-
// we're synchronized
181-
channel->synchronized();
180+
// notice that the channel is not yet synchronized here, because
181+
// we first have to receive the entire body
182182

183183
// done
184184
return true;

src/channelimpl.cpp

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -703,16 +703,22 @@ void ChannelImpl::reportMessage()
703703
// skip if there is no message
704704
if (!_message) return;
705705

706+
// after the report the channel may be destructed, monitor that
707+
Monitor monitor(this);
708+
709+
// synchronize the channel if this comes from a basic.get frame
710+
if (_message->consumer().empty()) synchronized();
711+
712+
// syncing the channel may destruct the channel
713+
if (!monitor.valid()) return;
714+
706715
// look for the consumer
707716
auto iter = _consumers.find(_message->consumer());
708717
if (iter == _consumers.end()) return;
709718

710719
// is this a valid callback method
711720
if (!iter->second) return;
712721

713-
// after the report the channel may be destructed, monitor that
714-
Monitor monitor(this);
715-
716722
// call the callback
717723
_message->report(iter->second);
718724

0 commit comments

Comments
 (0)