Skip to content

Commit

Permalink
Merge pull request #39 from wazuh/enhancement/24-design-and-developme…
Browse files Browse the repository at this point in the history
…nt-of-the-concurrency-control-module-for-the-new-agent

Development of the concurrency control module for the new agent
  • Loading branch information
TomasTurina authored Jul 17, 2024
2 parents 824bb1a + f832d54 commit 604de06
Show file tree
Hide file tree
Showing 17 changed files with 427 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
build/

*.so
*.so.*
*.o
Expand Down
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[submodule "src/vcpkg"]
path = src/vcpkg
url = https://github.com/microsoft/vcpkg
53 changes: 53 additions & 0 deletions BUILD.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# Build Instructions

1. **Clone the Repository**

First, clone the repository using the following command:

```bash
git clone https://github.com/wazuh/wazuh-agent.git
```

2. **Initialize Submodules**

The project uses submodules, so you need to initialize and update them. Run the following commands:

```bash
cd wazuh-agent
git submodule update --init --recursive
```

3. **Build the Project**

Navigate to the `src` folder and create a `build` directory:

```bash
cd src
mkdir build
cd build
```

Run `cmake` to configure the project:

```bash
cmake ..
```

If you want to include tests, configure the project with the following command:

```bash
cmake .. -DBUILD_TESTS=1
```

4. **Build the Project**

Finally, build the project using `make`:

```bash
make
```

## Notes

- The project uses `vcpkg` as a submodule to manage dependencies. By initializing the submodules, `vcpkg` will automatically fetch the necessary dependencies when running CMake.
- Ensure you have `cmake` and `make` installed on your system. You can install them via your package manager.
9 changes: 9 additions & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
cmake_minimum_required(VERSION 3.22)

set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_TOOLCHAIN_FILE "${CMAKE_SOURCE_DIR}/vcpkg/scripts/buildsystems/vcpkg.cmake")

project(Wazuh-Agent)

add_subdirectory(agent)
34 changes: 34 additions & 0 deletions src/agent/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
cmake_minimum_required(VERSION 3.22)

set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

set(CMAKE_TOOLCHAIN_FILE "${CMAKE_SOURCE_DIR}/../vcpkg/scripts/buildsystems/vcpkg.cmake")
set(VCPKG_MANIFEST_DIR ${CMAKE_SOURCE_DIR}/../)

project(Agent)

include_directories(include)

set(SOURCES
src/agent.cpp
src/message_task.cpp
src/task_manager.cpp
)

set(HEADERS
include/agent.hpp
include/itask_manager.hpp
include/task_manager.hpp
include/message_task.hpp
)

add_library(agent ${SOURCES} ${HEADERS})

find_package(Boost REQUIRED COMPONENTS asio)

target_link_libraries(agent PRIVATE ${Boost_LIBRARIES})

if(BUILD_TESTS)
add_subdirectory(tests)
endif()
18 changes: 18 additions & 0 deletions src/agent/include/agent.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#pragma once

#include <task_manager.hpp>

#include <queue>
#include <string>

class Agent
{
public:
Agent();
~Agent();

private:
std::queue<std::string> m_messageQueue;

TaskManager m_taskManager;
};
16 changes: 16 additions & 0 deletions src/agent/include/itask_manager.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#pragma once

#include <functional>

template<typename CoroutineTaskType>
class ITaskManager
{
public:
virtual ~ITaskManager() = default;

virtual void start(size_t numThreads) = 0;
virtual void stop() = 0;

virtual void enqueueTask(std::function<void()> task) = 0;
virtual void enqueueTask(CoroutineTaskType task) = 0;
};
11 changes: 11 additions & 0 deletions src/agent/include/message_task.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#pragma once

#include <boost/asio.hpp>
#include <boost/beast.hpp>

#include <queue>
#include <string>

boost::asio::awaitable<void> StatefulMessageProcessingTask(std::queue<std::string>& messageQueue);

boost::asio::awaitable<void> StatelessMessageProcessingTask(std::queue<std::string>& messageQueue);
27 changes: 27 additions & 0 deletions src/agent/include/task_manager.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#pragma once

#include <itask_manager.hpp>

#include <boost/asio/awaitable.hpp>
#include <boost/asio/io_context.hpp>

#include <functional>
#include <thread>
#include <vector>

class TaskManager : public ITaskManager<boost::asio::awaitable<void>>
{
public:
TaskManager();

void start(size_t numThreads) override;
void stop() override;

void enqueueTask(std::function<void()> task) override;
void enqueueTask(boost::asio::awaitable<void> task) override;

private:
boost::asio::io_context m_ioContext;
boost::asio::io_context::work m_work;
std::vector<std::thread> m_threads;
};
20 changes: 20 additions & 0 deletions src/agent/src/agent.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#include "agent.hpp"

#include <message_task.hpp>

#include <chrono>
#include <thread>

Agent::Agent()
{
m_taskManager.start(std::thread::hardware_concurrency());
m_taskManager.enqueueTask(StatefulMessageProcessingTask(m_messageQueue));
m_taskManager.enqueueTask(StatelessMessageProcessingTask(m_messageQueue));
}

Agent::~Agent()
{
// Sleep for 2 seconds
std::this_thread::sleep_for(std::chrono::seconds(2));
m_taskManager.stop();
}
83 changes: 83 additions & 0 deletions src/agent/src/message_task.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
#include <message_task.hpp>

#include <chrono>
#include <iostream>

namespace
{

boost::asio::awaitable<void> send_http_request(const std::string host,
const std::string port,
const std::string target,
std::queue<std::string>& messageQueue)
{
using namespace std::chrono_literals;

auto executor = co_await boost::asio::this_coro::executor;
boost::asio::steady_timer timer(executor);
boost::asio::ip::tcp::resolver resolver(executor);

while (true)
{
boost::beast::error_code ec;
boost::asio::ip::tcp::socket socket(executor);

auto const results = co_await resolver.async_resolve(host, port, boost::asio::use_awaitable);
co_await boost::asio::async_connect(socket, results, boost::asio::use_awaitable);

while (true)
{
std::string message;
{
if (messageQueue.empty())
{
timer.expires_after(100ms);
co_await timer.async_wait(boost::asio::use_awaitable);
continue;
}
message = std::move(messageQueue.front());
messageQueue.pop();
}

// HTTP request
boost::beast::http::request<boost::beast::http::string_body> req {
boost::beast::http::verb::post, target, 11};
req.set(boost::beast::http::field::host, host);
req.set(boost::beast::http::field::user_agent, BOOST_BEAST_VERSION_STRING);
req.body() = message;
req.prepare_payload();
co_await boost::beast::http::async_write(
socket, req, boost::asio::redirect_error(boost::asio::use_awaitable, ec));

if (ec)
{
socket.close();
break;
}

// HTTP response
boost::beast::flat_buffer buffer;
boost::beast::http::response<boost::beast::http::dynamic_body> res;
co_await boost::beast::http::async_read(
socket, buffer, res, boost::asio::redirect_error(boost::asio::use_awaitable, ec));

if (ec)
{
socket.close();
break;
}
}
}
}

} // namespace

boost::asio::awaitable<void> StatefulMessageProcessingTask(std::queue<std::string>& messageQueue)
{
co_await send_http_request("localhost", "8080", "/stateless", messageQueue);
}

boost::asio::awaitable<void> StatelessMessageProcessingTask(std::queue<std::string>& messageQueue)
{
co_await send_http_request("localhost", "8080", "/stateful", messageQueue);
}
45 changes: 45 additions & 0 deletions src/agent/src/task_manager.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#include <task_manager.hpp>

#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/post.hpp>

TaskManager::TaskManager()
: m_work(m_ioContext)
{
}

void TaskManager::start(size_t numThreads)
{
stop();

for (size_t i = 0; i < numThreads; ++i)
{
m_threads.emplace_back([this]() { m_ioContext.run(); });
}
}

void TaskManager::stop()
{
m_ioContext.stop();

for (std::thread& thread : m_threads)
{
if (thread.joinable())
{
thread.join();
}
}
m_threads.clear();
m_ioContext.reset();
}

void TaskManager::enqueueTask(std::function<void()> task)
{
boost::asio::post(m_ioContext, std::move(task));
}

void TaskManager::enqueueTask(boost::asio::awaitable<void> task)
{
boost::asio::co_spawn(m_ioContext, std::move(task), boost::asio::detached);
}
16 changes: 16 additions & 0 deletions src/agent/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Wazuh Agent tests

find_package(GTest CONFIG REQUIRED)

set(TEST_SOURCES
agent_test.cpp
task_manager_test.cpp
)

add_executable(agent_test agent_test.cpp)
target_link_libraries(agent_test PRIVATE agent GTest::gtest)
add_test(NAME AgentTest COMMAND agent_test)

add_executable(task_manager_test task_manager_test.cpp)
target_link_libraries(task_manager_test PRIVATE agent GTest::gtest)
add_test(NAME TaskManagerTest COMMAND task_manager_test)
14 changes: 14 additions & 0 deletions src/agent/tests/agent_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#include <agent.hpp>

#include <gtest/gtest.h>

TEST(AgentTests, AgentConstruction)
{
EXPECT_NO_THROW(Agent {});
}

int main(int argc, char** argv)
{
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
Loading

0 comments on commit 604de06

Please sign in to comment.