Design and development of the concurrency control module for the new agent #24
Comments
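Update
I've written some initial code exploring the use of coroutines for the concurrency control module. The code demonstrates how to handle HTTP requests using Boost.Asio and C++20 coroutines. Below is the code snippet:
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <chrono>
#include <iostream>
#include <queue>
#include <string>
#include <thread>

boost::asio::awaitable<void> send_http_request(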
    const std::string host,
    const std::string port,
    const std::string target,
    std::queue<std::string>& messageQueue)
{
    auto executor = co_await boost::asio::this_coro::executor;
    boost::asio::steady_timer timer(executor);
    boost::asio::ip::tcp::resolver resolver(executor);
    while (true)
    {
        boost::beast::error_code ec;
        boost::asio::ip::tcp::socket socket(executor);
        auto const results = co_await resolver.async_resolve(host, port, boost::asio::use_awaitable);
        co_await boost::asio::async_connect(socket, results, boost::asio::use_awaitable);
        while (true)
        {
            std::string message;
            {
                if (messageQueue.empty())
                {
                    // Nothing to send: wait without blocking the thread, then check again
                    timer.expires_after(std::chrono::milliseconds(100));
                    co_await timer.async_wait(boost::asio::use_awaitable);
                    continue;
                }
                message = std::move(messageQueue.front());
                messageQueue.pop();
            }
            // print message and thread id
            std::cout << "Message: " << message << " Thread ID: " << std::this_thread::get_id() << std::endl;
            // HTTP request
            boost::beast::http::request<boost::beast::http::string_body> req {
                boost::beast::http::verb::post, target, 11
            };
            req.set(boost::beast::http::field::host, host);
            req.set(boost::beast::http::field::user_agent, BOOST_BEAST_VERSION_STRING);
            req.body() = message;
            req.prepare_payload();
            co_await boost::beast::http::async_write(
                socket,
                req,
                boost::asio::redirect_error(boost::asio::use_awaitable, ec)
            );
            if (ec)
            {
                // Leave the inner loop so the outer loop reconnects
                socket.close();
                break;
            }
            // HTTP response
            boost::beast::flat_buffer buffer;
            boost::beast::http::response<boost::beast::http::dynamic_body> res;
            co_await boost::beast::http::async_read(
                socket,
                buffer,
                res,
                boost::asio::redirect_error(boost::asio::use_awaitable, ec)
            );
            if (ec)
            {
                socket.close();
                break;
            }
            // TODO: pop the message from the queue only when there is no error;
            // it is currently popped before sending, so it is lost on failure
        }
    }
}
int main()
{
    std::queue<std::string> statelessQueue;
    std::queue<std::string> statefulQueue;
    for (int i = 0; i < 100; i++)
    {
        statelessQueue.push("Stateless message " + std::to_string(i));
    }
    for (int i = 0; i < 100; i++)
    {
        statefulQueue.push("Stateful message " + std::to_string(i));
    }
    // Spawn one coroutine per message type.
    // Only one thread calls io_context.run(), so both coroutines
    // run concurrently on that same thread.
    boost::asio::io_context io_context;
    boost::asio::co_spawn(
        io_context,
        send_http_request("localhost", "8080", "/stateless", statelessQueue),
        boost::asio::detached
    );
    boost::asio::co_spawn(
        io_context,
        send_http_request("localhost", "8080", "/stateful", statefulQueue),
        boost::asio::detached
    );
    io_context.run();
    return 0;
}
Conclusions
The coroutines run concurrently on the same thread, so they do not block each other while waiting for a response from the server. This approach avoids parallelism (multithreading) while maintaining concurrency.
Next Steps
Derive a class design that encapsulates the coroutine-based implementation, making it easier to use and maintain, or to replace with a thread-based one.
Ensure the class design hides implementation specifics, providing a clean interface for scheduling and managing tasks.
Make it possible to use more than one thread if needed.
(This example is just illustrative and is still missing key aspects.)
Update
We've made further progress on the design of the new concurrency control module for the agent. Here are the tasks and components identified so far, and whether each needs to run on its own thread as an independent process:
AgentTasks:
All of the message processing subtasks from the |
Update
I investigated how to extend libcurl to work with coroutines. Libcurl’s curl_multi interface is designed for handling multiple transfers simultaneously in a non-blocking way. To integrate it with coroutines, you need to manage file descriptors and ensure that network operations do not block. This involves setting up an event loop, such as epoll on Linux, to monitor these file descriptors and resume the appropriate coroutine when data is ready. However, epoll is platform-specific, which adds another layer of complexity for cross-platform support. Additionally, avoiding polling for results requires a setup in which the event loop waits for events instead of repeatedly checking the state of each transfer. Given these challenges, Boost.Beast/Asio provides a more straightforward and integrated solution for coroutine support, as it is designed to work with C++20 coroutines out of the box. This reflects my current understanding of the problem, but feel free to provide any feedback or suggestions on this.
Update
I'm working on an idea for a class that will act as a task scheduler capable of handling both regular tasks and coroutine tasks. The class is templated to allow flexibility in specifying the coroutine task type (the example below uses Boost.Asio). The goal is to create a generic interface (IAgentTaskManager) that can be implemented with different libraries if needed. Modules and other Agent processes could be enqueued as regular tasks, together with other coroutines. In the Boost implementation, Asio's io_context manages the scheduling of the tasks among the available threads.
template <typename CoroutineTaskType>
class IAgentTaskManager {
public:
    virtual ~IAgentTaskManager() = default;
    virtual void start(size_t numThreads) = 0;
    virtual void stop() = 0;
    virtual void enqueueTask(std::function<void()> task) = 0;
    virtual void enqueueTask(CoroutineTaskType task) = 0;
};
An example implementation using Boost:
class AgentTaskManager : public IAgentTaskManager<boost::asio::awaitable<void>>
{
public:
    // The work guard keeps io_context::run() from returning while no tasks are queued
    AgentTaskManager() : m_work(boost::asio::make_work_guard(m_ioContext))
    {
    }
    // Join the worker threads before the members are destroyed
    ~AgentTaskManager() override
    {
        stop();
    }
    void start(size_t numThreads) override
    {
        stop();
        for (size_t i = 0; i < numThreads; ++i)
        {
            m_threads.emplace_back(
                [this] ()
                {
                    m_ioContext.run();
                }
            );
        }
    }
    void stop() override
    {
        m_ioContext.stop();
        for (std::thread &thread : m_threads)
        {
            if (thread.joinable())
            {
                thread.join();
            }
        }
        m_threads.clear();
        // restart() replaces the deprecated reset() and allows run() to be called again
        m_ioContext.restart();
    }
    void enqueueTask(std::function<void()> task) override
    {
        boost::asio::post(m_ioContext, std::move(task));
    }
    void enqueueTask(boost::asio::awaitable<void> task) override
    {
        boost::asio::co_spawn(
            m_ioContext,
            std::move(task),
            boost::asio::detached
        );
    }
private:
    boost::asio::io_context m_ioContext;
    // executor_work_guard replaces the deprecated io_context::work
    boost::asio::executor_work_guard<boost::asio::io_context::executor_type> m_work;
    std::vector<std::thread> m_threads;
};
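Since the stated goal is an interface that can be implemented with different libraries, or swapped for a thread-based design, the following sketch shows one way a standard-library-only implementation might look. It is an illustration under assumptions, not part of the proposed module. `ThreadPoolTaskManager`, `BlockingTask`, and `run_demo` are hypothetical names, and `BlockingTask` is a made-up stand-in for the coroutine task type in a build without coroutines. The interface is repeated so that the block compiles on its own.

```cpp
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

template <typename CoroutineTaskType>
class IAgentTaskManager {
public:
    virtual ~IAgentTaskManager() = default;
    virtual void start(size_t numThreads) = 0;
    virtual void stop() = 0;
    virtual void enqueueTask(std::function<void()> task) = 0;
    virtual void enqueueTask(CoroutineTaskType task) = 0;
};

// Hypothetical stand-in for a coroutine task: it simply runs to completion.
struct BlockingTask {
    std::function<void()> body;
};

class ThreadPoolTaskManager : public IAgentTaskManager<BlockingTask> {
public:
    ~ThreadPoolTaskManager() override { stop(); }

    void start(size_t numThreads) override {
        stop();
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_stopping = false;
        }
        for (size_t i = 0; i < numThreads; ++i) {
            m_threads.emplace_back([this] { workerLoop(); });
        }
    }

    // Unlike io_context::stop(), this lets the workers drain the tasks
    // that are already queued before joining them.
    void stop() override {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_stopping = true;
        }
        m_cv.notify_all();
        for (std::thread& thread : m_threads) {
            if (thread.joinable()) thread.join();
        }
        m_threads.clear();
    }

    void enqueueTask(std::function<void()> task) override {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_tasks.push(std::move(task));
        }
        m_cv.notify_one();
    }

    void enqueueTask(BlockingTask task) override {
        enqueueTask(std::move(task.body));
    }

private:
    void workerLoop() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(m_mutex);
                m_cv.wait(lock, [this] { return m_stopping || !m_tasks.empty(); });
                if (m_tasks.empty()) return;  // stopping and nothing left to run
                task = std::move(m_tasks.front());
                m_tasks.pop();
            }
            task();
        }
    }

    std::mutex m_mutex;
    std::condition_variable m_cv;
    std::queue<std::function<void()>> m_tasks;
    std::vector<std::thread> m_threads;
    bool m_stopping = false;  // guarded by m_mutex
};

// Enqueues ten regular tasks and one BlockingTask, then returns the total.
int run_demo() {
    ThreadPoolTaskManager manager;
    std::atomic<int> counter{0};
    manager.start(2);
    for (int i = 0; i < 10; ++i) {
        manager.enqueueTask([&counter] { ++counter; });
    }
    manager.enqueueTask(BlockingTask{[&counter] { counter += 100; }});
    manager.stop();
    return counter.load();
}
```

The trade-off relative to the Boost version is that a task which waits here occupies a whole worker thread, whereas a suspended `awaitable` releases its thread to other work.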
Update
Created a branch with #33 as the starting point (linked to this issue). I brought in the code from above. I'm currently working on adding basic tests for the implemented methods, and for the stateless and stateful tasks as enqueued coroutines.
Description
As part of the development of the new agent MVP, it is necessary to develop a new concurrency control module.
Take into account the following: