Skip to content
This repository was archived by the owner on Oct 6, 2023. It is now read-only.

Commit fec4263

Browse files
authored
Mon 6367 ipv4 bad flush 20.10.x (#477)
This patch is essentially an improvement on the tcp stream write/flush functions. If the write function cannot start because of too much retention, writing is begun by flush(). But it was unab writing properly. Moreover, there was a possibility to loop into an infinite loop because of a 0 value returne Refs: MON-6367
1 parent 5bd712d commit fec4263

File tree

4 files changed

+82
-35
lines changed

4 files changed

+82
-35
lines changed

doc/en/release_notes/20.10.rst

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,11 @@ The splitter class is now thread safe and does not need external locks anymore.
1616
It is also far less strict and allows some reading and some writing at the same
1717
time.
1818

19+
TCP
20+
===
21+
Writing on a tcp stream could slow down in case of many retention files. The
22+
issue was essentially in the flush internal function.
23+
1924
************
2025
Enhancements
2126
************

tcp/inc/com/centreon/broker/tcp/tcp_connection.hh

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,13 @@ class tcp_connection : public std::enable_shared_from_this<tcp_connection> {
3333
asio::ip::tcp::socket _socket;
3434
asio::io_context::strand _strand;
3535

36-
std::mutex _data_m;
36+
std::mutex _error_m;
3737
asio::error_code _current_error;
3838

3939
std::mutex _exposed_write_queue_m;
4040
std::queue<std::vector<char>> _exposed_write_queue;
4141
std::queue<std::vector<char>> _write_queue;
42+
std::atomic_bool _write_queue_has_events;
4243
std::atomic_bool _writing;
4344

4445
std::atomic<int32_t> _acks;
@@ -53,8 +54,6 @@ class tcp_connection : public std::enable_shared_from_this<tcp_connection> {
5354
std::atomic_bool _closed;
5455
std::string _peer;
5556

56-
std::condition_variable _is_writing_cv;
57-
5857
public:
5958
typedef std::shared_ptr<tcp_connection> pointer;
6059
tcp_connection(asio::io_context& io_context,

tcp/src/tcp_connection.cc

Lines changed: 42 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ tcp_connection::tcp_connection(asio::io_context& io_context,
3939
uint16_t port)
4040
: _socket(io_context),
4141
_strand(io_context),
42+
_write_queue_has_events(false),
4243
_writing(false),
4344
_acks(0),
4445
_reading(false),
@@ -80,10 +81,27 @@ asio::ip::tcp::socket& tcp_connection::socket() {
8081
* @return 0.
8182
*/
8283
int32_t tcp_connection::flush() {
83-
while (_writing) {
84-
std::this_thread::sleep_for(std::chrono::milliseconds(100));
84+
int32_t retval = _acks;
85+
if (_acks) {
86+
/* Do not set it to zero directly, maybe it has already been incremented by
87+
* another operation */
88+
_acks -= retval;
89+
return retval;
90+
}
91+
{
92+
std::lock_guard<std::mutex> lck(_error_m);
93+
if (_current_error) {
94+
std::string msg{std::move(_current_error.message())};
95+
_current_error.clear();
96+
throw exceptions::msg() << msg;
97+
}
8598
}
86-
return 0;
99+
if (_write_queue_has_events && !_writing) {
100+
_writing = true;
101+
// The strand is useful because of the flush() method.
102+
_strand.context().post(std::bind(&tcp_connection::writing, ptr()));
103+
}
104+
return retval;
87105
}
88106

89107
/**
@@ -150,31 +168,32 @@ static std::string debug_buf(const char* data, int32_t size) {
150168
*/
151169
int32_t tcp_connection::write(const std::vector<char>& v) {
152170
{
153-
std::lock_guard<std::mutex> lck(_data_m);
171+
std::lock_guard<std::mutex> lck(_error_m);
154172
if (_current_error) {
155173
std::string msg{std::move(_current_error.message())};
156174
_current_error.clear();
157175
throw exceptions::msg() << msg;
158176
}
177+
}
159178

160-
{
161-
std::lock_guard<std::mutex> lck(_exposed_write_queue_m);
162-
_exposed_write_queue.push(v);
163-
}
179+
{
180+
std::lock_guard<std::mutex> lck(_exposed_write_queue_m);
181+
_exposed_write_queue.push(v);
182+
}
164183

165-
// If the queue is not empty and the writing work is not started, we start
166-
// it.
167-
if (!_writing) {
168-
_writing = true;
169-
// The strand is useful because of the flush() method.
170-
_strand.context().post(std::bind(&tcp_connection::writing, ptr()));
171-
}
184+
// If the queue is not empty and the writing work is not started, we start
185+
// it.
186+
if (!_writing) {
187+
_writing = true;
188+
// The strand is useful because of the flush() method.
189+
_strand.context().post(std::bind(&tcp_connection::writing, ptr()));
172190
}
173191

174192
int32_t retval = _acks;
175193
/* Do not set it to zero directly, maybe it has already been incremented by
176194
* another operation */
177195
_acks -= retval;
196+
178197
return retval;
179198
}
180199

@@ -188,11 +207,13 @@ int32_t tcp_connection::write(const std::vector<char>& v) {
188207
* * Launches the async_write.
189208
*/
190209
void tcp_connection::writing() {
191-
if (_write_queue.empty()) {
210+
if (!_write_queue_has_events) {
192211
std::lock_guard<std::mutex> lck(_exposed_write_queue_m);
193212
std::swap(_write_queue, _exposed_write_queue);
213+
_write_queue_has_events = !_write_queue.empty();
194214
}
195-
if (_write_queue.empty()) {
215+
if (!_write_queue_has_events) {
216+
196217
_writing = false;
197218
return;
198219
}
@@ -211,14 +232,14 @@ void tcp_connection::writing() {
211232
void tcp_connection::handle_write(const asio::error_code& ec) {
212233
if (ec) {
213234
log_v2::tcp()->error("Error while writing on tcp socket: {}", ec.message());
214-
std::lock_guard<std::mutex> lck(_data_m);
235+
std::lock_guard<std::mutex> lck(_error_m);
215236
_current_error = ec;
216237
_writing = false;
217238
} else {
218-
std::lock_guard<std::mutex> lck(_data_m);
219239
++_acks;
220240
_write_queue.pop();
221-
if (!_write_queue.empty()) {
241+
_write_queue_has_events = !_write_queue.empty();
242+
if (_write_queue_has_events) {
222243
// The strand is useful because of the flush() method.
223244
asio::async_write(_socket, asio::buffer(_write_queue.front()),
224245
_strand.wrap(std::bind(&tcp_connection::handle_write,
@@ -291,7 +312,7 @@ void tcp_connection::close() {
291312
*/
292313
std::vector<char> tcp_connection::read(time_t timeout_time, bool* timeout) {
293314
{
294-
std::lock_guard<std::mutex> lck(_data_m);
315+
std::lock_guard<std::mutex> lck(_error_m);
295316
if (_current_error) {
296317
std::string msg{std::move(_current_error.message())};
297318
_current_error.clear();

tcp/test/acceptor.cc

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,9 @@ TEST(TcpAcceptor, Nominal) {
111111
}
112112
}
113113
s_centengine->write(data_write);
114-
s_centengine->flush();
114+
int retry = 10;
115+
while (retry-- && s_centengine->flush() == 0)
116+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
115117
});
116118

117119
centengine.join();
@@ -158,7 +160,9 @@ TEST(TcpAcceptor, QuestionAnswer) {
158160
}
159161
s_cbd->write(data_write);
160162
}
161-
s_cbd->flush();
163+
int retry = 10;
164+
while (retry-- && s_cbd->flush() == 0)
165+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
162166
});
163167

164168
std::thread centengine([] {
@@ -303,7 +307,9 @@ TEST(TcpAcceptor, MultiNominal) {
303307
}
304308
}
305309
s_centengine->write(data_write);
306-
s_centengine->flush();
310+
int retry = 10;
311+
while (retry-- && s_centengine->flush() == 0)
312+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
307313
std::unique_lock<std::mutex> lock(cbd_m);
308314
cbd_cv.wait(lock, [&cbd_finished] { return cbd_finished; });
309315
});
@@ -348,7 +354,9 @@ TEST(TcpAcceptor, NominalReversed) {
348354
}
349355
}
350356
s_centengine->write(data_write);
351-
s_centengine->flush();
357+
int retry = 10;
358+
while (retry-- && s_centengine->flush() == 0)
359+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
352360
});
353361

354362
std::this_thread::sleep_for(std::chrono::milliseconds(500));
@@ -406,7 +414,9 @@ TEST(TcpAcceptor, OnePeer) {
406414
}
407415
}
408416
s_centengine->write(data_write);
409-
s_centengine->flush();
417+
int retry = 10;
418+
while (retry-- && s_centengine->flush() == 0)
419+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
410420
});
411421

412422
std::thread cbd([] {
@@ -500,7 +510,9 @@ TEST(TcpAcceptor, OnePeerReversed) {
500510
}
501511
}
502512
s_centengine->write(data_write);
503-
s_centengine->flush();
513+
int retry = 10;
514+
while (retry-- && s_centengine->flush() == 0)
515+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
504516
});
505517

506518
centengine.join();
@@ -537,7 +549,9 @@ TEST(TcpAcceptor, MultiOnePeer) {
537549
std::shared_ptr<io::raw> data_write = std::make_shared<io::raw>();
538550
data_write->append(std::string("Hello2!"));
539551
s_centengine->write(data_write);
540-
s_centengine->flush();
552+
int retry = 10;
553+
while (retry-- && s_centengine->flush() == 0)
554+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
541555
s_centengine.reset();
542556
i++;
543557
} else
@@ -621,7 +635,9 @@ TEST(TcpAcceptor, NominalRepeated) {
621635
std::cout << "engine 4 " << i << "\n";
622636
i++;
623637
}
624-
s_centengine->flush();
638+
int retry = 10;
639+
while (retry-- && s_centengine->flush() == 0)
640+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
625641
});
626642

627643
/* We start nb_steps instances of cbd one after the other. Each time, it
@@ -706,7 +722,9 @@ TEST(TcpAcceptor, Simple) {
706722
std::shared_ptr<io::data> data_read;
707723
data->append(std::string("TEST\n"));
708724
str->write(data);
709-
str->flush();
725+
int retry = 10;
726+
while (retry-- && str->flush() == 0)
727+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
710728
std::unique_lock<std::mutex> lock(m);
711729
cv.wait(lock, [&finish] { return finish; });
712730
});
@@ -856,7 +874,9 @@ TEST(TcpAcceptor, CloseRead) {
856874
std::shared_ptr<io::data> data_read;
857875
data->append(std::string("0"));
858876
str->write(data);
859-
str->flush();
877+
int retry = 10;
878+
while (retry-- && str->flush() == 0)
879+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
860880
}
861881
}};
862882
std::shared_ptr<io::stream> io;
@@ -936,7 +956,9 @@ TEST(TcpAcceptor, QuestionAnswerMultiple) {
936956
}
937957
s_cbd->write(data_write);
938958
}
939-
s_cbd->flush();
959+
int retry = 10;
960+
while (retry-- && s_cbd->flush() == 0)
961+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
940962
});
941963

942964
centengine.emplace_back([i] {

0 commit comments

Comments
 (0)