Skip to content

Commit a2d44bc

Browse files
committed
Basic implementation using the new collector internal service
1 parent 3fd49b0 commit a2d44bc

17 files changed

+281
-70
lines changed

collector/collector.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include <sys/wait.h>
1313

1414
#include "ConfigLoader.h"
15+
#include "SensorClient.h"
1516

1617
extern "C" {
1718
#include <assert.h>

collector/lib/CollectorService.cpp

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,21 @@ static const char* OPTIONS[] = {"listening_ports", "8080", nullptr};
2626
CollectorService::CollectorService(CollectorConfig& config, std::atomic<ControlValue>* control,
2727
const std::atomic<int>* signum)
2828
: config_(config),
29-
system_inspector_(config_),
3029
control_(control),
3130
signum_(*signum),
3231
server_(OPTIONS),
3332
registry_(std::make_shared<prometheus::Registry>()),
3433
exposer_("9090"),
35-
exporter_(registry_, &config_, &system_inspector_),
3634
config_loader_(config_) {
3735
CLOG(INFO) << "Config: " << config_;
3836

37+
if (config_.grpc_channel != nullptr) {
38+
client_ = std::make_unique<SensorClient>(config_.grpc_channel);
39+
} else {
40+
client_ = std::make_unique<SensorClientStdout>();
41+
}
42+
system_inspector_ = {config_, client_.get()};
43+
3944
// Network tracking
4045
if (!config_.grpc_channel || !config_.DisableNetworkFlows()) {
4146
// In case if no GRPC is used, continue to setup networking infrasturcture
@@ -86,6 +91,7 @@ CollectorService::CollectorService(CollectorConfig& config, std::atomic<ControlV
8691
}
8792

8893
// Prometheus
94+
exporter_ = {registry_, &config_, &system_inspector_};
8995
exposer_.RegisterCollectable(registry_);
9096
}
9197

collector/lib/CollectorService.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ class CollectorService {
3333
bool WaitForGRPCServer();
3434

3535
CollectorConfig& config_;
36+
std::unique_ptr<ISensorClient> client_;
3637
system_inspector::Service system_inspector_;
3738

3839
std::atomic<ControlValue>* control_;

collector/lib/CollectorStatsExporter.h

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#define _COLLECTOR_STATS_EXPORTER_H_
33

44
#include <memory>
5+
#include <utility>
56

67
#include "CollectorConfig.h"
78
#include "CollectorStats.h"
@@ -13,6 +14,32 @@ namespace collector {
1314

1415
class CollectorStatsExporter {
1516
public:
17+
CollectorStatsExporter() = default;
18+
CollectorStatsExporter(const CollectorStatsExporter&) = delete;
19+
CollectorStatsExporter(CollectorStatsExporter&&) = delete;
20+
CollectorStatsExporter& operator=(const CollectorStatsExporter&) = delete;
21+
~CollectorStatsExporter() = default;
22+
23+
CollectorStatsExporter& operator=(CollectorStatsExporter&& other) noexcept {
24+
auto swap_running = other.thread_.running();
25+
26+
if (swap_running) {
27+
other.stop();
28+
}
29+
30+
registry_.swap(other.registry_);
31+
std::swap(config_, other.config_);
32+
std::swap(system_inspector_, other.system_inspector_);
33+
connections_total_reporter_.swap(other.connections_total_reporter_);
34+
connections_rate_reporter_.swap(other.connections_rate_reporter_);
35+
36+
if (swap_running) {
37+
start();
38+
}
39+
40+
return *this;
41+
}
42+
1643
CollectorStatsExporter(std::shared_ptr<prometheus::Registry> registry, const CollectorConfig* config, system_inspector::Service* si);
1744

1845
bool start();
@@ -24,8 +51,8 @@ class CollectorStatsExporter {
2451

2552
private:
2653
std::shared_ptr<prometheus::Registry> registry_;
27-
const CollectorConfig* config_;
28-
system_inspector::Service* system_inspector_;
54+
const CollectorConfig* config_{};
55+
system_inspector::Service* system_inspector_{};
2956
std::shared_ptr<CollectorConnectionStats<unsigned int>> connections_total_reporter_;
3057
std::shared_ptr<CollectorConnectionStats<float>> connections_rate_reporter_;
3158
StoppableThread thread_;

collector/lib/ProcessSignalFormatter.cpp

Lines changed: 24 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
#include <google/protobuf/util/time_util.h>
66

7+
#include "internalapi/sensor/collector_iservice.pb.h"
78
#include "internalapi/sensor/signal_iservice.pb.h"
89

910
#include "CollectorStats.h"
@@ -65,9 +66,9 @@ ProcessSignalFormatter::ProcessSignalFormatter(
6566
event_extractor_->Init(inspector);
6667
}
6768

68-
ProcessSignalFormatter::~ProcessSignalFormatter() {}
69+
ProcessSignalFormatter::~ProcessSignalFormatter() = default;
6970

70-
const SignalStreamMessage* ProcessSignalFormatter::ToProtoMessage(sinsp_evt* event) {
71+
const sensor::MsgFromCollector* ProcessSignalFormatter::ToProtoMessage(sinsp_evt* event) {
7172
if (process_signals[event->get_type()] == ProcessSignalType::UNKNOWN_PROCESS_TYPE) {
7273
return nullptr;
7374
}
@@ -80,38 +81,34 @@ const SignalStreamMessage* ProcessSignalFormatter::ToProtoMessage(sinsp_evt* eve
8081
}
8182

8283
ProcessSignal* process_signal = CreateProcessSignal(event);
83-
if (!process_signal) {
84+
if (process_signal == nullptr) {
8485
return nullptr;
8586
}
8687

87-
Signal* signal = Allocate<Signal>();
88-
signal->set_allocated_process_signal(process_signal);
89-
90-
SignalStreamMessage* signal_stream_message = AllocateRoot();
91-
signal_stream_message->clear_collector_register_request();
92-
signal_stream_message->set_allocated_signal(signal);
93-
return signal_stream_message;
88+
auto* msg = AllocateRoot();
89+
msg->clear_info();
90+
msg->clear_register_();
91+
msg->set_allocated_process_signal(process_signal);
92+
return msg;
9493
}
9594

96-
const SignalStreamMessage* ProcessSignalFormatter::ToProtoMessage(sinsp_threadinfo* tinfo) {
95+
const sensor::MsgFromCollector* ProcessSignalFormatter::ToProtoMessage(sinsp_threadinfo* tinfo) {
9796
Reset();
9897
if (!ValidateProcessDetails(tinfo)) {
9998
CLOG(INFO) << "Dropping process event: " << tinfo;
10099
return nullptr;
101100
}
102101

103-
ProcessSignal* process_signal = CreateProcessSignal(tinfo);
104-
if (!process_signal) {
102+
ProcessSignal* signal = CreateProcessSignal(tinfo);
103+
if (signal == nullptr) {
105104
return nullptr;
106105
}
107106

108-
Signal* signal = Allocate<Signal>();
109-
signal->set_allocated_process_signal(process_signal);
110-
111-
SignalStreamMessage* signal_stream_message = AllocateRoot();
112-
signal_stream_message->clear_collector_register_request();
113-
signal_stream_message->set_allocated_signal(signal);
114-
return signal_stream_message;
107+
auto* msg = AllocateRoot();
108+
msg->clear_register_();
109+
msg->clear_info();
110+
msg->set_allocated_process_signal(signal);
111+
return msg;
115112
}
116113

117114
ProcessSignal* ProcessSignalFormatter::CreateProcessSignal(sinsp_evt* event) {
@@ -173,7 +170,7 @@ ProcessSignal* ProcessSignalFormatter::CreateProcessSignal(sinsp_evt* event) {
173170
// set time
174171
auto timestamp = Allocate<Timestamp>();
175172
*timestamp = TimeUtil::NanosecondsToTimestamp(event->get_ts());
176-
signal->set_allocated_time(timestamp);
173+
signal->set_allocated_creation_time(timestamp);
177174

178175
// set container_id
179176
if (const std::string* container_id = event_extractor_->get_container_id(event)) {
@@ -238,7 +235,7 @@ ProcessSignal* ProcessSignalFormatter::CreateProcessSignal(sinsp_threadinfo* tin
238235
// set time
239236
auto timestamp = Allocate<Timestamp>();
240237
*timestamp = TimeUtil::NanosecondsToTimestamp(tinfo->m_clone_ts);
241-
signal->set_allocated_time(timestamp);
238+
signal->set_allocated_creation_time(timestamp);
242239

243240
// set container_id
244241
signal->set_container_id(tinfo->m_container_id);
@@ -315,20 +312,20 @@ void ProcessSignalFormatter::CountLineage(const std::vector<LineageInfo>& lineag
315312

316313
void ProcessSignalFormatter::GetProcessLineage(sinsp_threadinfo* tinfo,
317314
std::vector<LineageInfo>& lineage) {
318-
if (tinfo == NULL) {
315+
if (tinfo == nullptr) {
319316
return;
320317
}
321-
sinsp_threadinfo* mt = NULL;
318+
sinsp_threadinfo* mt = nullptr;
322319
if (tinfo->is_main_thread()) {
323320
mt = tinfo;
324321
} else {
325322
mt = tinfo->get_main_thread();
326-
if (mt == NULL) {
323+
if (mt == nullptr) {
327324
return;
328325
}
329326
}
330-
sinsp_threadinfo::visitor_func_t visitor = [this, &lineage](sinsp_threadinfo* pt) {
331-
if (pt == NULL) {
327+
sinsp_threadinfo::visitor_func_t visitor = [&lineage](sinsp_threadinfo* pt) {
328+
if (pt == nullptr) {
332329
return false;
333330
}
334331
if (pt->m_pid == 0) {

collector/lib/ProcessSignalFormatter.h

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,37 +6,35 @@
66
#include <gtest/gtest_prod.h>
77

88
#include "api/v1/signal.pb.h"
9-
#include "internalapi/sensor/signal_iservice.pb.h"
10-
#include "storage/process_indicator.pb.h"
9+
#include "internalapi/sensor/collector_iservice.pb.h"
1110

1211
#include "CollectorConfig.h"
13-
#include "CollectorStats.h"
1412
#include "ContainerMetadata.h"
1513
#include "EventNames.h"
1614
#include "ProtoSignalFormatter.h"
1715

1816
// forward definitions
1917
class sinsp;
2018
class sinsp_threadinfo;
21-
namespace collector {
22-
namespace system_inspector {
19+
20+
namespace collector::system_inspector {
2321
class EventExtractor;
2422
}
25-
} // namespace collector
2623

2724
namespace collector {
2825

29-
class ProcessSignalFormatter : public ProtoSignalFormatter<sensor::SignalStreamMessage> {
26+
class ProcessSignalFormatter : public ProtoSignalFormatter<sensor::MsgFromCollector> {
3027
public:
3128
ProcessSignalFormatter(sinsp* inspector, const CollectorConfig& config);
3229
~ProcessSignalFormatter();
3330

3431
using Signal = v1::Signal;
35-
using ProcessSignal = storage::ProcessSignal;
36-
using LineageInfo = storage::ProcessSignal_LineageInfo;
32+
using ProcessSignal = sensor::ProcessSignal;
33+
using LineageInfo = sensor::ProcessSignal_LineageInfo;
34+
using MsgFromCollector = sensor::MsgFromCollector;
3735

38-
const sensor::SignalStreamMessage* ToProtoMessage(sinsp_evt* event) override;
39-
const sensor::SignalStreamMessage* ToProtoMessage(sinsp_threadinfo* tinfo);
36+
const MsgFromCollector* ToProtoMessage(sinsp_evt* event) override;
37+
const MsgFromCollector* ToProtoMessage(sinsp_threadinfo* tinfo);
4038

4139
void GetProcessLineage(sinsp_threadinfo* tinfo, std::vector<LineageInfo>& lineage);
4240

collector/lib/ProcessSignalHandler.cpp

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
namespace collector {
1515

16-
std::string compute_process_key(const ::storage::ProcessSignal& s) {
16+
std::string compute_process_key(const ::sensor::ProcessSignal& s) {
1717
std::stringstream ss;
1818
ss << s.container_id() << " " << s.name() << " ";
1919
if (s.args().length() <= 256) {
@@ -39,21 +39,21 @@ bool ProcessSignalHandler::Stop() {
3939
SignalHandler::Result ProcessSignalHandler::HandleSignal(sinsp_evt* evt) {
4040
const auto* signal_msg = formatter_.ToProtoMessage(evt);
4141

42-
if (!signal_msg) {
42+
if (signal_msg == nullptr) {
4343
++(stats_->nProcessResolutionFailuresByEvt);
4444
return IGNORED;
4545
}
4646

47-
const char* name = signal_msg->signal().process_signal().name().c_str();
48-
const int pid = signal_msg->signal().process_signal().pid();
47+
const char* name = signal_msg->process_signal().name().c_str();
48+
const uint32_t pid = signal_msg->process_signal().pid();
4949
DTRACE_PROBE2(collector, process_signal_handler, name, pid);
5050

51-
if (!rate_limiter_.Allow(compute_process_key(signal_msg->signal().process_signal()))) {
51+
if (!rate_limiter_.Allow(compute_process_key(signal_msg->process_signal()))) {
5252
++(stats_->nProcessRateLimitCount);
5353
return IGNORED;
5454
}
5555

56-
auto result = client_->PushSignals(*signal_msg);
56+
auto result = client_->SendMsg(*signal_msg);
5757
if (result == SignalHandler::PROCESSED) {
5858
++(stats_->nProcessSent);
5959
} else if (result == SignalHandler::ERROR) {
@@ -65,17 +65,17 @@ SignalHandler::Result ProcessSignalHandler::HandleSignal(sinsp_evt* evt) {
6565

6666
SignalHandler::Result ProcessSignalHandler::HandleExistingProcess(sinsp_threadinfo* tinfo) {
6767
const auto* signal_msg = formatter_.ToProtoMessage(tinfo);
68-
if (!signal_msg) {
68+
if (signal_msg == nullptr) {
6969
++(stats_->nProcessResolutionFailuresByTinfo);
7070
return IGNORED;
7171
}
7272

73-
if (!rate_limiter_.Allow(compute_process_key(signal_msg->signal().process_signal()))) {
73+
if (!rate_limiter_.Allow(compute_process_key(signal_msg->process_signal()))) {
7474
++(stats_->nProcessRateLimitCount);
7575
return IGNORED;
7676
}
7777

78-
auto result = client_->PushSignals(*signal_msg);
78+
auto result = client_->SendMsg(*signal_msg);
7979
if (result == SignalHandler::PROCESSED) {
8080
++(stats_->nProcessSent);
8181
} else if (result == SignalHandler::ERROR) {

collector/lib/ProcessSignalHandler.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include "CollectorConfig.h"
99
#include "ProcessSignalFormatter.h"
1010
#include "RateLimit.h"
11+
#include "SensorClient.h"
1112
#include "SignalHandler.h"
1213
#include "system-inspector/Service.h"
1314

@@ -22,7 +23,7 @@ class ProcessSignalHandler : public SignalHandler {
2223
public:
2324
ProcessSignalHandler(
2425
sinsp* inspector,
25-
ISignalServiceClient* client,
26+
ISensorClient* client,
2627
system_inspector::Stats* stats,
2728
const CollectorConfig& config)
2829
: client_(client),
@@ -38,7 +39,7 @@ class ProcessSignalHandler : public SignalHandler {
3839
std::vector<std::string> GetRelevantEvents() override;
3940

4041
private:
41-
ISignalServiceClient* client_;
42+
ISensorClient* client_;
4243
ProcessSignalFormatter formatter_;
4344
system_inspector::Stats* stats_;
4445
RateLimitCache rate_limiter_;

0 commit comments

Comments
 (0)