3
3
#include < orderbook.hpp>
4
4
#include " influxdb.hpp"
5
5
#include " spdlog/spdlog.h"
6
- #include < future>
7
6
#include < sys/types.h>
8
7
#include < unistd.h>
9
8
#include < pthread.h>
10
9
#include < map>
11
- #include < folly/concurrency/UnboundedQueue.h>
12
10
#include < boost/asio.hpp>
13
11
#include < boost/asio/thread_pool.hpp>
14
- // #include <boost/lockfree/queue.hpp >
12
+ #include < tbb/concurrent_queue.h >
15
13
16
14
namespace matching_engine
17
15
{
@@ -32,21 +30,22 @@ class consumer
32
30
}
33
31
void push (OrderPtr order)
34
32
{
35
- queue_.enqueue (std::move (order));
33
+ queue_.push (std::move (order));
36
34
}
37
35
void register_market (std::string_view market)
38
36
{
39
37
markets.emplace (market, market);
40
38
}
41
39
void listen ()
42
40
{
43
- for (const auto & [name, _] : markets) {
44
- console_->info (" Consumer of {} started @{}" , name, (pid_t ) syscall (SYS_gettid));
45
- }
41
+ if (console_ != nullptr )
42
+ for (const auto & [name, _] : markets) {
43
+ console_->info (" Consumer of {} started @{}" , name, (pid_t ) syscall (SYS_gettid));
44
+ }
46
45
OrderPtr order;
47
46
auto last_log = Time::now ();
48
47
while (should_consume_ ()) {
49
- queue_.dequeue (order);
48
+ queue_.pop (order);
50
49
auto &ob = markets.at (order->market_name ());
51
50
const auto start = Time::now ();
52
51
ob.match (std::move (order));
@@ -66,6 +65,7 @@ class consumer
66
65
});
67
66
last_log = start;
68
67
}
68
+
69
69
}
70
70
}
71
71
private:
@@ -74,8 +74,7 @@ class consumer
74
74
return !should_exit_ or !queue_.empty ();
75
75
}
76
76
std::unordered_map<std::string_view, OrderBook> markets;
77
- folly::UnboundedQueue<OrderPtr, true , true , true , 16 > queue_;
78
- // boost::lockfree::queue<OrderPtr> queue_;
77
+ tbb::concurrent_bounded_queue<OrderPtr> queue_;
79
78
std::atomic_bool should_exit_;
80
79
std::shared_ptr<spdlog::logger> console_;
81
80
};
@@ -85,8 +84,8 @@ class dispatcher
85
84
public:
86
85
dispatcher () = default ;
87
86
dispatcher (const dispatcher&) = delete ;
88
- dispatcher (std::shared_ptr<spdlog::logger> console ,
89
- std::vector<std::string_view> markets ,
87
+ dispatcher (std::vector<std::string_view> markets ,
88
+ std::shared_ptr<spdlog::logger> console = nullptr ,
90
89
const uint64_t available_cores = std::max(1u , std::thread::hardware_concurrency() - 1 )):
91
90
pool_{available_cores},
92
91
console_{console}
@@ -121,10 +120,17 @@ class dispatcher
121
120
auto & market_consumer = market_registry_.at (order->market_name ());
122
121
market_consumer->push (std::move (order));
123
122
}
124
- std::string_view registered_market_name (std::string_view market) const
123
+ std::string_view registered_market_name (const std::string_view market) const
125
124
{
126
125
return market_registry_.find (market)->first ;
127
126
}
127
+ void shutdown ()
128
+ {
129
+ for (auto const & [_, c] : market_registry_) {
130
+ c->shutdown ();
131
+ }
132
+ pool_.join ();
133
+ }
128
134
private:
129
135
// tbb::concurrent_unordered_map<std::string, std::shared_ptr<consumer>> market_registry_;
130
136
std::unordered_map<std::string_view, std::shared_ptr<consumer>> market_registry_;
0 commit comments