Skip to content

Commit 9dbda99

Browse files
authored
Merge pull request #90 from cactusdynamics/trace-aggregator-decouple
Trace aggregator decouple
2 parents 1cd42c3 + 089cbbb commit 9dbda99

File tree

17 files changed

+264
-414
lines changed

17 files changed

+264
-414
lines changed

.clang-tidy

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ cert-*,
2121
-readability-identifier-length,
2222
-readability-isolate-declaration,
2323
-readability-magic-numbers,
24-
-readability-redundant-inline-specifier'
24+
-readability-redundant-inline-specifier,
25+
-readability-use-anyofallof'
2526
# TODO: Re-enable bugprone-exception-escape when no longer throwing
2627
# https://github.com/isocpp/CppCoreGuidelines/issues/1589
2728
WarningsAsErrors: '*'

docs/imgs/trace-architecture.svg

Lines changed: 1 addition & 1 deletion
Loading

docs/imgs/tracing-ownership-structure.svg

Lines changed: 4 additions & 0 deletions
Loading

docs/tracing.md

Lines changed: 26 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -191,45 +191,45 @@ reading a single global atomic boolean variable. This variable controls all
191191
traces from all threads within the process.
192192

193193
Upon enabling tracing via `App::StartTraceSession`, `cactus_rt` also creates and
194-
starts the `TraceAggregator` threads and registers the appropriate sinks. The
195-
`App` object caches a list of known `ThreadTracers` from all the threads that
196-
currently exists and this is passed to the newly created `TraceAggregator`.
197-
Perfetto's file format specification indicates that the track descriptor packets
198-
must be first written before the actual trace event packets. Thus, after the
199-
creation of the `TraceAggregator` and `Sink` registration, a
194+
starts the `TraceAggregator` thread. Perfetto's file format specification
195+
indicates that the track descriptor packets must be first written before the
196+
actual trace event packets. Thus, after starting the `TraceAggregator`, a
200197
[`ProcessDescriptor`](https://perfetto.dev/docs/reference/trace-packet-proto#ProcessDescriptor)
201-
packet is first written. Upon the registration of each of the cached
202-
`ThreadTracers` as passed through by `App`, a
198+
packet is first written. A
203199
[`ThreadDescriptor`](https://perfetto.dev/docs/reference/trace-packet-proto#ThreadDescriptor)
204-
packet is emitted for each thread. Then, the main loop of the `TraceAggregator`
205-
can run which will write track event packets to the sinks.
200+
packet is emitted for each thread that is known to the `TraceAggregator`. Then,
201+
the main loop of the `TraceAggregator` can run which will write track event
202+
packets to the sinks.
206203

207204
When tracing is disabled via `App::StopTraceSession`, the tracing enabled atomic
208205
bool will be set to false. The system will request the `TraceAggregator` thread
209206
to drain all data from the existing `ThreadTracers` and stop. Once this is done,
210-
the file is closed and the `TraceAggregator` is destroyed to save resources.
207+
the file is closed and the `TraceAggregator` states (interned data, sequence
208+
states) are reset so they can be launched again.
211209

212210
#### Dynamic thread creation
213211

214212
Each `Thread` owns a `ThreadTracer`. However, when a thread starts, it must
215-
notify the `App` and `TraceAggregator` (if tracing is enabled and it exists) of
216-
its existence and thread id so a
213+
notify the `TraceAggregator` (if tracing is enabled and it exists) of its
214+
existence and thread id so a
217215
[`ThreadDescriptor`](https://perfetto.dev/docs/reference/trace-packet-proto#ThreadDescriptor)
218216
packet can be written to the data stream before any trace event data is written.
219-
If tracing is not enabled and thus `TraceAggregator` is not present, the `App`
220-
will cache the `ThreadTracers` and will pass it onto the `TraceAggregator`
221-
if/when tracing is enabled.
217+
If tracing is not enabled right now, the `TraceAggregator` will cache the
218+
`ThreadTracers` so that once tracing is enabled, the `ThreadDescriptor` packet
219+
is written out.
222220

223-
The `Thread` is able to communicate with the `App` by storing a non-owning
224-
pointer to the `App`. This pointer is setup during `App::RegisterThread` so
225-
there's no explicit dependency between `Thread` and `App` during construction.
226-
This decision may be revisited in the future.
221+
The `Thread` is able to communicate with the `TraceAggregator` by storing a
222+
`weak_ptr` to the `TraceAggregator`. This pointer is set up during
223+
`App::RegisterThread` so there's no explicit dependency between `Thread` and
224+
`App` during construction. This decision may be revisited in the future.
227225

228-
#### Cleanup after thread shutdown
226+
#### Ownership structure
229227

230-
TODO...
228+
The structure is not ideal and has some problems, but works for the most part.
231229

232-
#### Dynamic sink registration
230+
![Trace architecture](imgs/tracing-ownership-structure.svg)
231+
232+
#### Cleanup after thread shutdown
233233

234234
TODO...
235235

@@ -255,6 +255,9 @@ noting:
255255
(i.e. thus `(trusted_packet_sequence_id, iid)` is sufficient to identify an
256256
interned string). This, along with (1), implies we have to intern strings on
257257
a per-thread interner.
258+
3. When a tracing session stops, the string interner states for all known thread
259+
tracers are reset. This means a subsequent session will not have the same
260+
string iids.
258261

259262
### Other notes
260263

examples/tracing_example_no_rt/main.cc

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
#include <cactus_rt/tracing.h>
22

3-
#include <list>
43
#include <memory>
54
#include <thread>
65

@@ -25,18 +24,15 @@ void StartTracing(const char* app_name, const char* filename) {
2524

2625
// Create the file sink so the data aggregated by the TraceAggregator will be written to somewhere.
2726
auto file_sink = std::make_shared<FileSink>(filename);
28-
trace_aggregator->RegisterSink(file_sink);
2927

3028
quill::start();
31-
trace_aggregator->Start();
29+
trace_aggregator->Start(file_sink);
3230
}
3331

3432
void StopTracing() {
3533
cactus_rt::tracing::DisableTracing();
3634

37-
trace_aggregator->RequestStop();
38-
trace_aggregator->Join();
39-
trace_aggregator = nullptr; // Destroy the trace aggregator and free all resources.
35+
trace_aggregator->Stop();
4036
}
4137

4238
int main() {

include/cactus_rt/app.h

Lines changed: 1 addition & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33

44
#include <gtest/gtest_prod.h>
55

6-
#include <list>
76
#include <memory>
87
#include <string>
98
#include <vector>
@@ -36,14 +35,7 @@ class App {
3635

3736
std::vector<std::shared_ptr<Thread>> threads_;
3837

39-
// We need to cache the list thread tracers here because the trace_aggregator
40-
// can be dynamically created and stopped. When a new trace aggregator is
41-
// created, it needs to know about all the thread tracers.
42-
//
43-
// TODO: investigate into a weak pointer.
44-
std::list<std::shared_ptr<tracing::ThreadTracer>> thread_tracers_;
45-
std::unique_ptr<tracing::TraceAggregator> trace_aggregator_ = nullptr;
46-
std::mutex aggregator_mutex_;
38+
std::shared_ptr<tracing::TraceAggregator> trace_aggregator_;
4739

4840
void SetDefaultLogFormat(quill::Config& cfg) {
4941
// Create a handler of stdout
@@ -117,11 +109,6 @@ class App {
117109
*/
118110
bool StartTraceSession(std::shared_ptr<tracing::Sink> sink) noexcept;
119111

120-
/**
121-
* @brief Register a custom trace sink after starting the trace session
122-
*/
123-
void RegisterTraceSink(std::shared_ptr<tracing::Sink> sink) noexcept;
124-
125112
/**
126113
* @brief Stops the tracing session for the process. Will be no-op if tracing
127114
* is not enabled. This function is not real-time safe.
@@ -148,18 +135,6 @@ class App {
148135
void StartQuill();
149136

150137
private:
151-
/**
152-
* @brief Register a thread tracer. Should only be called from Thread::RunThread.
153-
*/
154-
void RegisterThreadTracer(std::shared_ptr<tracing::ThreadTracer> thread_tracer) noexcept;
155-
156-
/**
157-
* @brief Remove a thread tracer. Should only be called from Thread::~Thread().
158-
*/
159-
void DeregisterThreadTracer(const std::shared_ptr<tracing::ThreadTracer>& thread_tracer) noexcept;
160-
161-
void CreateAndStartTraceAggregator(std::shared_ptr<tracing::Sink> sink) noexcept;
162-
163138
void StopTraceAggregator() noexcept;
164139
};
165140
} // namespace cactus_rt

include/cactus_rt/thread.h

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,28 +7,25 @@
77
#include <cstdint>
88
#include <memory>
99
#include <string>
10-
#include <vector>
1110

1211
#include "config.h"
1312
#include "quill/Quill.h"
1413
#include "tracing/thread_tracer.h"
14+
#include "tracing/trace_aggregator.h"
1515

1616
namespace cactus_rt {
1717

1818
/// @private
1919
constexpr size_t kDefaultStackSize = 8 * 1024 * 1024; // 8MB default stack space should be plenty
2020

21-
// Necessary forward declaration
22-
class App;
23-
2421
class Thread {
2522
ThreadConfig config_;
2623
std::string name_;
2724
std::vector<size_t> cpu_affinity_;
2825
size_t stack_size_;
2926

3027
quill::Logger* logger_;
31-
std::shared_ptr<tracing::ThreadTracer> tracer_;
28+
std::shared_ptr<tracing::ThreadTracer> tracer_ = nullptr;
3229

3330
std::atomic_bool stop_requested_ = false;
3431

@@ -41,12 +38,10 @@ class Thread {
4138
*/
4239
static void* RunThread(void* data);
4340

44-
friend class App;
45-
46-
// Non-owning App pointer. Used only for notifying that the thread has
47-
// started/stopped for tracing purposes. Set by Thread::Start and read at
41+
// Non-owning TraceAggregator pointer. Used only for notifying that the thread
42+
// has started/stopped for tracing purposes. Set by Thread::Start and read at
4843
// the beginning of Thread::RunThread.
49-
App* app_ = nullptr;
44+
std::weak_ptr<tracing::TraceAggregator> trace_aggregator_;
5045

5146
public:
5247
/**
@@ -60,8 +55,7 @@ class Thread {
6055
name_(name),
6156
cpu_affinity_(config_.cpu_affinity),
6257
stack_size_(static_cast<size_t>(PTHREAD_STACK_MIN) + config_.stack_size),
63-
logger_(quill::create_logger(name_)),
64-
tracer_(std::make_shared<tracing::ThreadTracer>(name, config_.tracer_config.queue_size)) {
58+
logger_(quill::create_logger(name_)) {
6559
if (!config.scheduler) {
6660
throw std::runtime_error("ThreadConfig::scheduler cannot be nullptr");
6761
}
@@ -123,12 +117,16 @@ class Thread {
123117
*
124118
* @private
125119
*/
126-
inline void SetApp(App* app) {
127-
app_ = app;
120+
inline void SetTraceAggregator(std::weak_ptr<tracing::TraceAggregator> trace_aggregator) {
121+
trace_aggregator_ = trace_aggregator;
128122
}
129123

130124
protected:
131-
inline quill::Logger* Logger() const { return logger_; }
125+
inline quill::Logger* Logger() const { return logger_; }
126+
127+
/**
128+
* Gets the current tracer object. Should only ever be called from within the thread itself.
129+
*/
132130
inline tracing::ThreadTracer& Tracer() { return *tracer_; }
133131
inline int64_t StartMonotonicTimeNs() const { return start_monotonic_time_ns_; }
134132
inline const ThreadConfig& Config() const noexcept { return config_; }

include/cactus_rt/tracing/thread_tracer.disabled.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,10 @@ class ThreadTracer {
7777

7878
void SetTid() noexcept {}
7979

80+
void MarkDone() noexcept {}
81+
82+
void IsDone() noexcept {}
83+
8084
private:
8185
template <typename... Args>
8286
bool Emit(Args&&... /* args */) noexcept {

include/cactus_rt/tracing/thread_tracer.h

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include <readerwriterqueue.h>
99
#include <unistd.h>
1010

11+
#include <atomic>
1112
#include <cstdint>
1213

1314
#include "../experimental/lockless/atomic_message.h"
@@ -36,6 +37,8 @@ class ThreadTracer {
3637

3738
moodycamel::ReaderWriterQueue<TrackEventInternal> queue_;
3839

40+
std::atomic_bool thread_done_;
41+
3942
// The event name interning must be done per thread (per sequence). Thus it is
4043
// stored here. However, this class must NEVER call functions here (other
4144
// than maybe .Size), as the memory allocation can occur. This variable is
@@ -84,6 +87,26 @@ class ThreadTracer {
8487
*/
8588
void SetTid() noexcept { tid_ = gettid(); }
8689

90+
/**
91+
* @brief This marks this thread tracer as "done" and thus the trace
92+
* aggregator will try to remove it after flushing the data.
93+
*
94+
* @private
95+
*/
96+
void MarkDone() noexcept {
97+
thread_done_.store(true, std::memory_order_release);
98+
}
99+
100+
/**
101+
* @brief Checks if this thread tracer is done. Should only be called from
102+
* TraceAggregator.
103+
*
104+
* @private
105+
*/
106+
bool IsDone() noexcept {
107+
return thread_done_.load(std::memory_order_acquire);
108+
}
109+
87110
private:
88111
template <typename... Args>
89112
bool Emit(Args&&... args) noexcept;

include/cactus_rt/tracing/trace_aggregator.disabled.h

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
namespace cactus_rt::tracing {
1212
class TraceAggregator {
1313
public:
14-
explicit TraceAggregator(std::string /* name */, std::vector<size_t> /* cpu_affinity */) {}
14+
explicit TraceAggregator(std::string /* name */) {}
1515

1616
TraceAggregator(const TraceAggregator&) = delete;
1717
TraceAggregator& operator=(const TraceAggregator&) = delete;
@@ -24,11 +24,9 @@ class TraceAggregator {
2424

2525
void DeregisterThreadTracer(const std::shared_ptr<ThreadTracer>& /* tracer */) {}
2626

27-
void Start() {};
27+
void Start(std::shared_ptr<Sink> /* sink */, std::vector<size_t> /* cpu_affinity */ = {}){};
2828

29-
void RequestStop() noexcept {}
30-
31-
void Join() noexcept {}
29+
void Stop() noexcept {}
3230
};
3331
} // namespace cactus_rt::tracing
3432
#endif

0 commit comments

Comments
 (0)