Skip to content

Commit

Permalink
feat: Added DispatchCommand function
Browse files Browse the repository at this point in the history
  • Loading branch information
aritosteles committed Aug 16, 2024
1 parent e2d0ae7 commit ddd4c8a
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 3 deletions.
6 changes: 4 additions & 2 deletions src/agent/command_manager/include/command_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ namespace command_manager
~CommandManager() {};

template<typename T>
boost::asio::awaitable<void> ProcessCommandsFromQueue(const std::function<std::optional<T>()> GetCommand)
boost::asio::awaitable<void> ProcessCommandsFromQueue(const std::function<std::optional<T>()> GetCommand,
const std::function<int(T&)> DispatchMessage)
{
using namespace std::chrono_literals;
const auto executor = co_await boost::asio::this_coro::executor;
Expand All @@ -29,7 +30,8 @@ namespace command_manager
std::cout << "Queue is empty - WAITING" << std::endl;
continue;
}
// DispatchMessage();

DispatchMessage(cmd.value());
}
}
};
Expand Down
50 changes: 49 additions & 1 deletion src/agent/src/agent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,17 @@ Agent::~Agent()

void Agent::Run()
{
// add some test command to the queue
// const nlohmann::json dataContent = {{"command", {{"name", "001"}, {"type", "stateless"}}},
// {"origin", {{"serverName", "node01"}, {"moduleName", "upgradeModule"}}},
// {"parameters", {{"error", 0}, {"data", "Command received"}}},
// {"status", "Pending"}};
//

const nlohmann::json dataContent = {{"command", "upgradeModule"}, {"status", "Pending"}};
const Message messageToSend {MessageType::COMMAND, dataContent, "CommandManager"};
m_agentQueue.push(messageToSend);

m_taskManager.EnqueueTask(m_communicator.WaitForTokenExpirationAndAuthenticate());

m_taskManager.EnqueueTask(m_communicator.GetCommandsFromManager(
Expand All @@ -42,7 +53,44 @@ void Agent::Run()
{
return std::nullopt;
}
return m_agentQueue.getNext(MessageType::COMMAND);
Message m = m_agentQueue.getNext(MessageType::COMMAND);

std::cout << "COMMAND retrieved from Queue:" << std::endl;

// pop message from queue
m_agentQueue.pop(MessageType::COMMAND);

nlohmann::json jdata = m.data.at(0);
for (auto& [key, value] : jdata.items())
{
std::cout << key << ": " << value << std::endl;
}

std::cout << "Data inside data:" << std::endl;

nlohmann::json jdataData = m.data.at(0).at("data");
for (auto& [key, value] : jdataData.items())
{
std::cout << key << ": " << value << std::endl;
}

// change status and push again
// if (jdataData.at("status") == "Pending")
//{
// std::cout << "Message status is Pending. Updating status and pushing back." << std::endl;
// jdataData["status"] = "InProcess";
// Message newMessage(MessageType::COMMAND, jdataData, "CommandManager");
// // m_agentQueue.push(newMessage);
// // return m;
// }

return m;
},
[this](Message& cmd) -> int
{
std::cout << "Dispatching command" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
return 1;
}));

m_signalHandler.WaitForSignal();
Expand Down

0 comments on commit ddd4c8a

Please sign in to comment.