Skip to content
This repository was archived by the owner on Sep 3, 2024. It is now read-only.

Refactor UDP transport #111

Merged
merged 12 commits into from
Jan 16, 2024
12 changes: 8 additions & 4 deletions include/transport/priority_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ namespace qtransport {
};

public:
~priority_queue() {
std::lock_guard<std::mutex> _(_mutex);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why lock the mutex if there is nothing to do? Is this to ensure there are no threads inside doing work? If that's a risk, there might still be a risk of a thread in the code, but not fully out of a function or perhaps entering the code and not yet reaching a lock.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Destruct starts from another thread, which caused a race condition on a pop/push that was happening at the same time.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume this object is held with a shared pointer. Whatever threads are calling into it should have a valid pointer (i.e., the object's destructor has not been called). Do you know what threads? While this might appear to fix it, it concerns me that we're not first terminating those threads before attempting to destroy the priority queue object.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've removed it for now. This change was a fix back before the shutdown with the picoquic transport. The segfault was caused by the priority queue destructor, but it is a valid question of why was the order not maintained. I've created issue #112 to work this issue.

}

/**
* Construct a priority queue
* @param tick_service Shared pointer to tick_service service
Expand Down Expand Up @@ -154,13 +158,13 @@ namespace qtransport {
/**
* @brief Clear queue
*/
void clear() {
void clear()
{
std::lock_guard<std::mutex> _(_mutex);

for (auto& tqueue: _queue) {
if (tqueue && !tqueue->empty()) {
for (auto& tqueue : _queue) {
if (tqueue && !tqueue->empty())
tqueue->clear();
}
}
}

Expand Down
7 changes: 3 additions & 4 deletions include/transport/time_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <thread>
#include <type_traits>
#include <vector>
#include <sys/select.h>

namespace qtransport {

Expand Down Expand Up @@ -95,7 +96,7 @@ namespace qtransport {

timeval sleep_time = {.tv_sec = 0, .tv_usec = interval_us};
while (!_stop) {
select(1, NULL, NULL, NULL, &sleep_time);
select(0, NULL, NULL, NULL, &sleep_time);
sleep_time.tv_usec = interval_us;
++_ticks;
}
Expand Down Expand Up @@ -314,9 +315,7 @@ namespace qtransport {
}
}

private:


private:
/**
* @brief Based on current time, adjust and move the bucket index with time
* (sliding window)
Expand Down
26 changes: 18 additions & 8 deletions src/transport_picoquic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
#include <cstring>
#include <ctime>
#include <iostream>
#include <netdb.h>
#include <thread>
#include <unistd.h>

Expand Down Expand Up @@ -213,7 +212,7 @@ int pq_event_cb(picoquic_cnx_t* pq_cnx,
transport->logger->info << "Received RESET stream; conn_id: " << data_ctx->conn_id
<< " data_ctx_id: " << data_ctx->data_ctx_id
<< " stream_id: " << stream_id
<< " RX buf drops: " << data_ctx->metrics.tx_buffer_drops << std::flush;
<< " RX buf drops: " << data_ctx->metrics.rx_buffer_drops << std::flush;

if (!data_ctx->is_default_context) {
transport->deleteDataContext(conn_id, data_ctx->data_ctx_id);
Expand Down Expand Up @@ -1035,7 +1034,7 @@ PicoQuicTransport::send_stream_bytes(DataContext* data_ctx, uint8_t* bytes_ctx,

if (max_len < 5) {
// Not enough bytes to send
logger->debug << "Not enough bytes to send stream size header, waiting for next callback. sid: "
logger->debug << "Not enough bytes to send stream size header, waiting for next callback. data_ctx_id: "
<< data_ctx->current_stream_id << std::flush;
return;
}
Expand Down Expand Up @@ -1209,7 +1208,7 @@ void PicoQuicTransport::on_recv_stream_bytes(DataContext* data_ctx, uint64_t str

if (rx_buf.object_hdr_size < 4) {

uint16_t len_to_copy = length >= 4 ? 4 - rx_buf.object_hdr_size : 4;
uint16_t len_to_copy = length > 4 ? 4 - rx_buf.object_hdr_size : length - rx_buf.object_hdr_size;


std::memcpy(&rx_buf.object_size + rx_buf.object_hdr_size,
Expand All @@ -1219,6 +1218,8 @@ void PicoQuicTransport::on_recv_stream_bytes(DataContext* data_ctx, uint64_t str

if (rx_buf.object_hdr_size < 4) {
logger->debug << "Stream header not complete. hdr " << rx_buf.object_hdr_size
<< " conn_id: " << data_ctx->conn_id
<< " data_ctx_id: " << data_ctx->data_ctx_id
<< " len_to_copy: " << len_to_copy
<< " length: " << length
<< std::flush;
Expand All @@ -1236,8 +1237,9 @@ void PicoQuicTransport::on_recv_stream_bytes(DataContext* data_ctx, uint64_t str

if (rx_buf.object_size > 40000000L) { // Safety check
logger->warning << "on_recv_stream_bytes stream_id: " << stream_id
<< " data length is too large: "
<< std::to_string(rx_buf.object_size)
<< " conn_id: " << data_ctx->conn_id
<< " data_ctx_id: " << data_ctx->data_ctx_id
<< " data length is too large: " << rx_buf.object_size
<< std::flush;

rx_buf.reset_buffer();
Expand Down Expand Up @@ -1303,7 +1305,7 @@ void PicoQuicTransport::on_recv_stream_bytes(DataContext* data_ctx, uint64_t str

bool too_many_in_queue = false;
if (cbNotifyQueue.size() > 200) {
logger->warning << "on_recv_stream_bytes sid: " << data_ctx->current_stream_id << "cbNotifyQueue size" << cbNotifyQueue.size()
logger->warning << "on_recv_stream_bytes data_ctx_id: " << data_ctx->current_stream_id << "cbNotifyQueue size" << cbNotifyQueue.size()
<< std::flush;
}

Expand All @@ -1318,7 +1320,15 @@ void PicoQuicTransport::on_recv_stream_bytes(DataContext* data_ctx, uint64_t str
}

if (length > 0) {
logger->debug << "on_stream_bytes has remaining bytes: " << length << std::flush;
logger->debug << "on_recv_stream_bytes has remaining bytes: " << length
<< " conn_id: " << data_ctx->conn_id
<< " data_ctx_id: " << data_ctx->data_ctx_id
<< " stream_id: " << data_ctx->current_stream_id
<< " rx_hdr_sz: " << rx_buf.object_hdr_size
<< " rx_obj_offset: " << rx_buf.object_offset
<< " rx_obj_sz: " << rx_buf.object_size
<< std::flush;

on_recv_stream_bytes(data_ctx, stream_id, bytes_p, length);
}
}
Expand Down
Loading