Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce Endpoint#seconds_{reading_messages,awaiting_semaphore,processing_messages} #10266

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/base/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ set(base_SOURCES
atomic.hpp
atomic-file.cpp atomic-file.hpp
base64.cpp base64.hpp
benchmark.cpp benchmark.hpp
boolean.cpp boolean.hpp boolean-script.cpp
bulker.hpp
configobject.cpp configobject.hpp configobject-ti.hpp configobject-script.cpp
Expand Down
34 changes: 34 additions & 0 deletions lib/base/benchmark.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/* Icinga 2 | (c) 2024 Icinga GmbH | GPLv2+ */

#include "base/benchmark.hpp"

using namespace icinga;

/**
 * Accumulates the given duration into this benchmark.
 *
 * Every call increases the stored sum, so this may be invoked repeatedly.
 *
 * @param elapsedTime The amount of time to add, e.g. the distance between two time points
 *
 * @return This instance for method chaining
 */
Benchmark& Benchmark::operator+=(const Clock::duration& elapsedTime) noexcept
{
	// The sum is stored as a raw tick count of Clock::duration.
	const auto ticks = elapsedTime.count();

	m_Sum.fetch_add(ticks, std::memory_order_relaxed);

	return *this;
}

/**
 * Accumulates the time which has passed since startTime into this benchmark.
 *
 * The current time is taken from Clock::now() at the moment of the call.
 * Every call increases the stored sum, so this may be invoked repeatedly.
 *
 * @param startTime The time point the measured operation started at
 *
 * @return This instance for method chaining
 */
Benchmark& Benchmark::operator+=(const Clock::time_point& startTime) noexcept
{
	const Clock::duration elapsed (Clock::now() - startTime);

	// Delegate to the duration overload which does the actual accumulation.
	return operator+=(elapsed);
}
37 changes: 37 additions & 0 deletions lib/base/benchmark.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/* Icinga 2 | (c) 2024 Icinga GmbH | GPLv2+ */

#pragma once

#include "base/atomic.hpp"
#include <chrono>
#include <type_traits>

namespace icinga
{

/**
 * Benchmark result.
 *
 * Sums up elapsed time as ticks of Clock::duration in an Atomic counter
 * which is read and updated with relaxed memory ordering.
 * The sum is converted to seconds on demand.
 *
 * @ingroup base
 */
class Benchmark
{
public:
	using Clock = std::chrono::steady_clock;

	// Adds a duration, or the time elapsed since a time point, to the sum.
	Benchmark& operator+=(const Clock::duration&) noexcept;
	Benchmark& operator+=(const Clock::time_point&) noexcept;

	/**
	 * Converts the accumulated time to (fractional) seconds.
	 *
	 * Only floating point target types take part in overload resolution.
	 * std::chrono::duration<T> can't be constructed from Clock::duration
	 * without loss of precision for other types, so an unconstrained
	 * conversion would claim convertibility to everything and then fail
	 * with a hard error inside this function.
	 *
	 * @tparam T The floating point type to return, e.g. double
	 *
	 * @return The total accumulated time in seconds
	 */
	template<class T, class = std::enable_if_t<std::is_floating_point<T>::value>>
	operator T() const noexcept
	{
		return std::chrono::duration<T>(Clock::duration(m_Sum.load(std::memory_order_relaxed))).count();
	}

private:
	// Accumulated time in ticks of Clock::duration.
	Atomic<Clock::duration::rep> m_Sum {0};
};

}
11 changes: 10 additions & 1 deletion lib/methods/clusterzonechecktask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ void ClusterZoneCheckTask::ScriptFunc(const Checkable::Ptr& checkable, const Che
double messagesReceivedPerSecond = 0;
double bytesSentPerSecond = 0;
double bytesReceivedPerSecond = 0;
double secondsReadingMessages = 0;
double secondsAwaitingSemaphore = 0;
double secondsProcessingMessages = 0;

{
auto endpoints (zone->GetEndpoints());
Expand All @@ -160,6 +163,9 @@ void ClusterZoneCheckTask::ScriptFunc(const Checkable::Ptr& checkable, const Che
messagesReceivedPerSecond += endpoint->GetMessagesReceivedPerSecond();
bytesSentPerSecond += endpoint->GetBytesSentPerSecond();
bytesReceivedPerSecond += endpoint->GetBytesReceivedPerSecond();
secondsReadingMessages += endpoint->GetSecondsReadingMessages();
secondsAwaitingSemaphore += endpoint->GetSecondsAwaitingSemaphore();
secondsProcessingMessages += endpoint->GetSecondsProcessingMessages();
}

if (!connected && endpoints.size() == 1u && *endpoints.begin() == Endpoint::GetLocalEndpoint()) {
Expand Down Expand Up @@ -210,7 +216,10 @@ void ClusterZoneCheckTask::ScriptFunc(const Checkable::Ptr& checkable, const Che
new PerfdataValue("sum_messages_sent_per_second", messagesSentPerSecond),
new PerfdataValue("sum_messages_received_per_second", messagesReceivedPerSecond),
new PerfdataValue("sum_bytes_sent_per_second", bytesSentPerSecond),
new PerfdataValue("sum_bytes_received_per_second", bytesReceivedPerSecond)
new PerfdataValue("sum_bytes_received_per_second", bytesReceivedPerSecond),
new PerfdataValue("sum_seconds_reading_messages", secondsReadingMessages),
new PerfdataValue("sum_seconds_awaiting_semaphore", secondsAwaitingSemaphore),
new PerfdataValue("sum_seconds_processing_messages", secondsProcessingMessages)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

                        {
                            "counter": false,
                            "crit": null,
                            "label": "sum_seconds_reading_messages",
                            "max": null,
                            "min": null,
                            "type": "PerfdataValue",
                            "unit": "",
                            "value": 0.000787875,
                            "warn": null
                        },
                        {
                            "counter": false,
                            "crit": null,
                            "label": "sum_seconds_processing_messages",
                            "max": null,
                            "min": null,
                            "type": "PerfdataValue",
                            "unit": "",
                            "value": 3.45e-05,
                            "warn": null
                        }

}));

checkable->ProcessCheckResult(cr);
Expand Down
9 changes: 9 additions & 0 deletions lib/methods/icingachecktask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,9 @@ void IcingaCheckTask::ScriptFunc(const Checkable::Ptr& checkable, const CheckRes
double messagesReceivedPerSecond = 0;
double bytesSentPerSecond = 0;
double bytesReceivedPerSecond = 0;
double secondsReadingMessages = 0;
double secondsAwaitingSemaphore = 0;
double secondsProcessingMessages = 0;

for (const Endpoint::Ptr& endpoint : endpoints)
{
Expand All @@ -140,6 +143,9 @@ void IcingaCheckTask::ScriptFunc(const Checkable::Ptr& checkable, const CheckRes
messagesReceivedPerSecond += endpoint->GetMessagesReceivedPerSecond();
bytesSentPerSecond += endpoint->GetBytesSentPerSecond();
bytesReceivedPerSecond += endpoint->GetBytesReceivedPerSecond();
secondsReadingMessages += endpoint->GetSecondsReadingMessages();
secondsAwaitingSemaphore += endpoint->GetSecondsAwaitingSemaphore();
secondsProcessingMessages += endpoint->GetSecondsProcessingMessages();
}

perfdata->Add(new PerfdataValue("last_messages_sent", lastMessageSent));
Expand All @@ -148,6 +154,9 @@ void IcingaCheckTask::ScriptFunc(const Checkable::Ptr& checkable, const CheckRes
perfdata->Add(new PerfdataValue("sum_messages_received_per_second", messagesReceivedPerSecond));
perfdata->Add(new PerfdataValue("sum_bytes_sent_per_second", bytesSentPerSecond));
perfdata->Add(new PerfdataValue("sum_bytes_received_per_second", bytesReceivedPerSecond));
perfdata->Add(new PerfdataValue("sum_seconds_reading_messages", secondsReadingMessages));
perfdata->Add(new PerfdataValue("sum_seconds_awaiting_semaphore", secondsAwaitingSemaphore));
perfdata->Add(new PerfdataValue("sum_seconds_processing_messages", secondsProcessingMessages));
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

                        {
                            "counter": false,
                            "crit": null,
                            "label": "sum_seconds_reading_messages",
                            "max": null,
                            "min": null,
                            "type": "PerfdataValue",
                            "unit": "",
                            "value": 0.000264792,
                            "warn": null
                        },
                        {
                            "counter": false,
                            "crit": null,
                            "label": "sum_seconds_processing_messages",
                            "max": null,
                            "min": null,
                            "type": "PerfdataValue",
                            "unit": "",
                            "value": 5.2666e-05,
                            "warn": null
                        }


cr->SetPerformanceData(perfdata);
ServiceState state = ServiceOK;
Expand Down
15 changes: 15 additions & 0 deletions lib/remote/endpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,18 @@ double Endpoint::GetBytesReceivedPerSecond() const
{
return m_BytesReceived.CalculateRate(Utility::GetTime(), 60);
}

/**
 * @return The seconds accumulated as read time via AddInputTimes()
 */
double Endpoint::GetSecondsReadingMessages() const
{
	return static_cast<double>(m_InputReadTime);
}

/**
 * @return The seconds accumulated as semaphore time via AddInputTimes()
 */
double Endpoint::GetSecondsAwaitingSemaphore() const
{
	return static_cast<double>(m_InputSemaphoreTime);
}

/**
 * @return The seconds accumulated as process time via AddInputTimes()
 */
double Endpoint::GetSecondsProcessingMessages() const
{
	return static_cast<double>(m_InputProcessTime);
}
17 changes: 17 additions & 0 deletions lib/remote/endpoint.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include "remote/i2-remote.hpp"
#include "remote/endpoint-ti.hpp"
#include "base/benchmark.hpp"
#include "base/ringbuffer.hpp"
#include <set>

Expand Down Expand Up @@ -43,12 +44,24 @@ class Endpoint final : public ObjectImpl<Endpoint>
void AddMessageSent(int bytes);
void AddMessageReceived(int bytes);

/**
 * Accumulates the timings of an incoming message into the respective benchmarks.
 *
 * Each argument is passed to Benchmark::operator+= as-is, so it may be
 * either a duration or the time point the respective phase started at.
 *
 * @param readTime Added to the read time benchmark
 * @param semaphoreTime Added to the semaphore time benchmark
 * @param processTime Added to the process time benchmark
 */
template<class ReadT, class SemaphoreT, class ProcessT>
void AddInputTimes(const ReadT& readTime, const SemaphoreT& semaphoreTime, const ProcessT& processTime)
{
	m_InputReadTime += readTime;
	m_InputSemaphoreTime += semaphoreTime;

	// Kept last: if processTime is a time point, the elapsed time is taken during this call.
	m_InputProcessTime += processTime;
}

double GetMessagesSentPerSecond() const override;
double GetMessagesReceivedPerSecond() const override;

double GetBytesSentPerSecond() const override;
double GetBytesReceivedPerSecond() const override;

double GetSecondsReadingMessages() const override;
double GetSecondsAwaitingSemaphore() const override;
double GetSecondsProcessingMessages() const override;

protected:
void OnAllConfigLoaded() override;

Expand All @@ -61,6 +74,10 @@ class Endpoint final : public ObjectImpl<Endpoint>
mutable RingBuffer m_MessagesReceived{60};
mutable RingBuffer m_BytesSent{60};
mutable RingBuffer m_BytesReceived{60};

Benchmark m_InputReadTime;
Benchmark m_InputSemaphoreTime;
Benchmark m_InputProcessTime;
};

}
Expand Down
12 changes: 12 additions & 0 deletions lib/remote/endpoint.ti
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,18 @@ class Endpoint : ConfigObject
[no_user_modify, no_storage] double bytes_received_per_second {
get;
};

[no_user_modify, no_storage] double seconds_reading_messages {
get;
};

[no_user_modify, no_storage] double seconds_awaiting_semaphore {
get;
};

[no_user_modify, no_storage] double seconds_processing_messages {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Colleagues, in addition we could also (just for the API) record seconds_processing_messages PER message type. I.e. there should also be another attribute returning a dict like {"event::CheckResult": 42.0, ...}, saying that this endpoint e.g. spent 42 seconds handling (already read and decoded) event::CheckResult messages since program start.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

@Al2Klimov Al2Klimov Dec 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get;
};
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It works! 👍

< HTTP/1.1 200 OK
< Server: Icinga/v2.14.0-375-gecea52568
< Content-Type: application/json
< Content-Length: 591
<
{
    "results": [
        {
            "attrs": {
                "seconds_processing_messages": 3.1292e-05,
                "seconds_reading_messages": 0.000155709
            },
            "joins": {},
            "meta": {},
            "name": "dummy",
            "type": "Endpoint"
        },
        {
            "attrs": {
                "seconds_processing_messages": 0,
                "seconds_reading_messages": 0
            },
            "joins": {},
            "meta": {},
            "name": "ws-aklimov7777777.local",
            "type": "Endpoint"
        }
    ]
}

};

}
19 changes: 19 additions & 0 deletions lib/remote/jsonrpcconnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "remote/jsonrpcconnection.hpp"
#include "remote/apilistener.hpp"
#include "remote/apifunction.hpp"
#include "base/benchmark.hpp"
#include "remote/jsonrpc.hpp"
#include "base/defer.hpp"
#include "base/configtype.hpp"
Expand Down Expand Up @@ -66,11 +67,17 @@ void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
return ch::duration_cast<ch::milliseconds>(d).count();
});

Benchmark::Clock::time_point readStarted, readFinished, processingStarted;

m_Stream->next_layer().SetSeen(&m_Seen);

while (!m_ShuttingDown) {
String jsonString;

if (m_Endpoint) {
readStarted = Benchmark::Clock::now();
}

try {
jsonString = JsonRpc::ReadMessage(m_Stream, yc, m_Endpoint ? -1 : 1024 * 1024);
} catch (const std::exception& ex) {
Expand All @@ -81,6 +88,10 @@ void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
break;
}

if (m_Endpoint) {
readFinished = Benchmark::Clock::now();
}

m_Seen = Utility::GetTime();
if (m_Endpoint) {
m_Endpoint->AddMessageReceived(jsonString.GetLength());
Expand All @@ -96,13 +107,21 @@ void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
// Cache the elapsed time to acquire a CPU semaphore used to detect extremely heavy workloads.
cpuBoundDuration = ch::steady_clock::now() - start;

if (m_Endpoint) {
processingStarted = Benchmark::Clock::now();
}

Dictionary::Ptr message = JsonRpc::DecodeMessage(jsonString);
if (String method = message->Get("method"); !method.IsEmpty()) {
rpcMethod = std::move(method);
}

MessageHandler(message);

if (m_Endpoint) {
m_Endpoint->AddInputTimes(readFinished - readStarted, cpuBoundDuration, processingStarted);
}

l_TaskStats.InsertValue(Utility::GetTime(), 1);

auto total = ch::steady_clock::now() - start;
Expand Down
4 changes: 4 additions & 0 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ set(base_test_SOURCES
icingaapplication-fixture.cpp
base-array.cpp
base-base64.cpp
base-benchmark.cpp
base-convert.cpp
base-dictionary.cpp
base-fifo.cpp
Expand Down Expand Up @@ -112,6 +113,9 @@ add_boost_test(base
base_array/clone
base_array/json
base_base64/base64
base_benchmark/zero
base_benchmark/one
base_benchmark/two
base_convert/tolong
base_convert/todouble
base_convert/tostring
Expand Down
51 changes: 51 additions & 0 deletions test/base-benchmark.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/* Icinga 2 | (c) 2024 Icinga GmbH | GPLv2+ */

#include "base/benchmark.hpp"
#include "base/utility.hpp"
#include <BoostTestTargetConfig.h>
#include <chrono>
#include <cmath>

using namespace icinga;

/**
 * Checks whether the seconds accumulated in sum deviate by less than 5% from the expected ones.
 *
 * @param sum The benchmark to inspect
 * @param seconds The expected amount of seconds
 *
 * @return Whether the relative deviation is below 0.05
 */
static bool AssertSumSeconds(Benchmark& sum, double seconds)
{
	const auto actual = static_cast<double>(sum);
	const double relativeDeviation = (actual - seconds) / seconds;

	return std::abs(relativeDeviation) < 0.05;
}

BOOST_AUTO_TEST_SUITE(base_benchmark)

BOOST_AUTO_TEST_CASE(zero)
{
	// A benchmark nothing has been added to must report zero seconds.
	Benchmark untouched;

	BOOST_CHECK_EQUAL(static_cast<double>(untouched), 0);
}

BOOST_AUTO_TEST_CASE(one)
{
	// A single measurement: sleep, then add the time elapsed since the start.
	Benchmark sum;
	const auto startedAt = Benchmark::Clock::now();

	Utility::Sleep(0.25);
	sum += startedAt;

	BOOST_CHECK(AssertSumSeconds(sum, 0.25));
}

BOOST_AUTO_TEST_CASE(two)
{
	// Two consecutive measurements must add up in the same benchmark.
	Benchmark sum;

	const auto firstStart = Benchmark::Clock::now();
	Utility::Sleep(0.25);
	sum += firstStart;

	const auto secondStart = Benchmark::Clock::now();
	Utility::Sleep(0.5);
	sum += secondStart;

	BOOST_CHECK(AssertSumSeconds(sum, 0.75));
}

BOOST_AUTO_TEST_SUITE_END()
Loading