Skip to content

Commit

Permalink
Significantly refactored trace aggregator
Browse files Browse the repository at this point in the history
Previously we were leaking `ThreadTracer` objects in the
`TraceAggregator` as creating new threads means `ThreadTracer` gets
pushed into `TraceAggregator` but it is never removed. This causes a
memory leak and also makes the `TraceAggregator` slower.

This refactors the entire code to make this work. Some of the highlights
are:

- Removed the feature of dynamically adding in trace `Sink`s. `Sink` can
  now only be specified when the trace session is started. Since we
  don't really have any use case of dynamically hooking in sinks, this
  feature is a lot of complexity for no reason.
  - This also removed the sticky packet feature within the
    `TraceAggregator` which saved additional complexity.
- Remove the feature of the `TraceAggregator` mirroring data to multiple
  `Sink`s. This is also unnecessary. Could create a `MultiSink` feature
  if necessary to emulate this. So now, when you start a trace session,
  you must give a single sink for the data to be pushed to.
- `TraceAggregator` is now a permanent object (as a `shared_ptr`) on the
  `App` instead of being dynamically created and deleted when the trace
  session is started and stopped. This skips the need of having App
  cache the list of `ThreadTracer`s and pass it to the `TraceAggregator`
  during its construction when the trace session starts.
  - Instead, the `TraceAggregator` internally has a `SessionData` object
    (`session_`). This object is recreated and deleted when the trace
    session starts and stops.
  - Since the `TraceAggregator` is permanent now, the `Thread` directly
    register the `ThreadTracer` with the `TraceAggregator` during its
    start up procedure. This replaces registering through the `App`. The
    `Thread` are also now holding a `weak_ptr` to the `TraceAggregator`.
- When a thread shuts down, the `ThreadTracer` is marked as "done". The
  `TraceAggregator` will check for this "done" status if no more events
  are available from the queue. If it is done, the `ThreadTracer` will
  be removed from the `TraceAggregator`.
- `TraceAggregator` no longer supports `RequestStop` and `Join`. This is
  replaced with a single `Stop` call which will wait for the
  `TraceAggregator` to fully stop and reset the state of the
  TraceAggregator (and any registered `ThreadTracer`'s string interner).
  - Right now, there is a potential data race during the
    `TraceAggregator.Stop`, as we access `session_` without a lock. This
    is most likely OK as we don't expect `TraceAggregator.Stop` to be
    called from multiple threads or rapidly recreated for now. **Should
    probably fix it in the future.**
- String interner states are now reset when a trace session stops. This
  means if another trace session is started, the strings it remembers
  are reset. The id starting positions are also reset.
  - Since we no longer have sticky packets, we also no longer emit a
    trace event packet that contains interned data from a previous trace
    session.
  • Loading branch information
shuhaowu committed Jul 24, 2024
1 parent 2f1cb13 commit 86cf4b6
Show file tree
Hide file tree
Showing 17 changed files with 264 additions and 414 deletions.
3 changes: 2 additions & 1 deletion .clang-tidy
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ cert-*,
-readability-identifier-length,
-readability-isolate-declaration,
-readability-magic-numbers,
-readability-redundant-inline-specifier'
-readability-redundant-inline-specifier,
-readability-use-anyofallof'
# TODO: Re-enable bugprone-exception-escape when no longer throwing
# https://github.com/isocpp/CppCoreGuidelines/issues/1589
WarningsAsErrors: '*'
Expand Down
2 changes: 1 addition & 1 deletion docs/imgs/trace-architecture.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
4 changes: 4 additions & 0 deletions docs/imgs/tracing-ownership-structure.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
49 changes: 26 additions & 23 deletions docs/tracing.md
Original file line number Diff line number Diff line change
Expand Up @@ -191,45 +191,45 @@ reading a single global atomic boolean variable. This variable controls all
traces from all threads within the process.

Upon enabling tracing via `App::StartTraceSession`, `cactus_rt` also creates and
starts the `TraceAggregator` threads and registers the appropriate sinks. The
`App` object caches a list of known `ThreadTracers` from all the threads that
currently exists and this is passed to the newly created `TraceAggregator`.
Perfetto's file format specification indicates that the track descriptor packets
must be first written before the actual trace event packets. Thus, after the
creation of the `TraceAggregator` and `Sink` registration, a
starts the `TraceAggregator` thread. Perfetto's file format specification
indicates that the track descriptor packets must be first written before the
actual trace event packets. Thus, after starting the `TraceAggregator`, a
[`ProcessDescriptor`](https://perfetto.dev/docs/reference/trace-packet-proto#ProcessDescriptor)
packet is first written. Upon the registration of each of the cached
`ThreadTracers` as passed through by `App`, a
packet is first written. A
[`ThreadDescriptor`](https://perfetto.dev/docs/reference/trace-packet-proto#ThreadDescriptor)
packet is emitted for each thread. Then, the main loop of the `TraceAggregator`
can run which will write track event packets to the sinks.
packet is emitted for each thread that is known to the `TraceAggregator`. Then,
the main loop of the `TraceAggregator` can run which will write track event
packets to the sinks.

When tracing is disabled via `App::StopTraceSession`, the tracing enabled atomic
bool will be set to false. The system will request the `TraceAggregator` thread
to drain all data from the existing `ThreadTracers` and stop. Once this is done,
the file is closed and the `TraceAggregator` is destroyed to save resources.
the file is closed and the `TraceAggregator` states (interned data, sequence
states) are reset so they can be launched again.

#### Dynamic thread creation

Each `Thread` owns a `ThreadTracer`. However, when a thread starts, it must
notify the `App` and `TraceAggregator` (if tracing is enabled and it exists) of
its existence and thread id so a
notify the `TraceAggregator` (if tracing is enabled and it exists) of its
existence and thread id so a
[`ThreadDescriptor`](https://perfetto.dev/docs/reference/trace-packet-proto#ThreadDescriptor)
packet can be written to the data stream before any trace event data is written.
If tracing is not enabled and thus `TraceAggregator` is not present, the `App`
will cache the `ThreadTracers` and will pass it onto the `TraceAggregator`
if/when tracing is enabled.
If tracing is not enabled right now, the `TraceAggregator` will cache the
`ThreadTracers` so that once tracing is enabled, the `ThreadDescriptor` packet
is written out.

The `Thread` is able to communicate with the `App` by storing a non-owning
pointer to the `App`. This pointer is setup during `App::RegisterThread` so
there's no explicit dependency between `Thread` and `App` during construction.
This decision may be revisited in the future.
The `Thread` is able to communicate with the `TraceAggregator` by storing a
`weak_ptr` to the `TraceAggregator`. This pointer is setup during
`App::RegisterThread` so there's no explicit dependency between `Thread` and
`App` during construction. This decision may be revisited in the future.

#### Cleanup after thread shutdown
#### Ownership structure

TODO...
The structure is not ideal and has some problems, but works for the most part.

#### Dynamic sink registration
![Trace architecture](imgs/tracing-ownership-structure.svg)

#### Cleanup after thread shutdown

TODO...

Expand All @@ -255,6 +255,9 @@ noting:
(i.e. thus `(trusted_packet_sequence_id, iid)` is sufficient to identify an
interned string). This, along with (1), implies we have to intern strings on
a per-thread interner.
3. When a tracing session stop, the string interner states for all known thread
tracers are reset. This means a subsequent session will not have the same
string iids.

### Other notes

Expand Down
8 changes: 2 additions & 6 deletions examples/tracing_example_no_rt/main.cc
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#include <cactus_rt/tracing.h>

#include <list>
#include <memory>
#include <thread>

Expand All @@ -25,18 +24,15 @@ void StartTracing(const char* app_name, const char* filename) {

// Create the file sink so the data aggregated by the TraceAggregator will be written to somewhere.
auto file_sink = std::make_shared<FileSink>(filename);
trace_aggregator->RegisterSink(file_sink);

quill::start();
trace_aggregator->Start();
trace_aggregator->Start(file_sink);
}

void StopTracing() {
cactus_rt::tracing::DisableTracing();

trace_aggregator->RequestStop();
trace_aggregator->Join();
trace_aggregator = nullptr; // Destroy the trace aggregator and free all resources.
trace_aggregator->Stop();
}

int main() {
Expand Down
27 changes: 1 addition & 26 deletions include/cactus_rt/app.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

#include <gtest/gtest_prod.h>

#include <list>
#include <memory>
#include <string>
#include <vector>
Expand Down Expand Up @@ -36,14 +35,7 @@ class App {

std::vector<std::shared_ptr<Thread>> threads_;

// We need to cache the list thread tracers here because the trace_aggregator
// can be dynamically created and stopped. When a new trace aggregator is
// created, it needs to know about all the thread tracers.
//
// TODO: investigate into a weak pointer.
std::list<std::shared_ptr<tracing::ThreadTracer>> thread_tracers_;
std::unique_ptr<tracing::TraceAggregator> trace_aggregator_ = nullptr;
std::mutex aggregator_mutex_;
std::shared_ptr<tracing::TraceAggregator> trace_aggregator_;

void SetDefaultLogFormat(quill::Config& cfg) {
// Create a handler of stdout
Expand Down Expand Up @@ -117,11 +109,6 @@ class App {
*/
bool StartTraceSession(std::shared_ptr<tracing::Sink> sink) noexcept;

/**
* @brief Register a custom trace sink after starting the trace session
*/
void RegisterTraceSink(std::shared_ptr<tracing::Sink> sink) noexcept;

/**
* @brief Stops the tracing session for the process. Will be no-op if tracing
* is not enabled. This function is not real-time safe.
Expand All @@ -148,18 +135,6 @@ class App {
void StartQuill();

private:
/**
* @brief Register a thread tracer. Should only be called from Thread::RunThread.
*/
void RegisterThreadTracer(std::shared_ptr<tracing::ThreadTracer> thread_tracer) noexcept;

/**
* @brief Remove a thread tracer. Should only be called from Thread::~Thread().
*/
void DeregisterThreadTracer(const std::shared_ptr<tracing::ThreadTracer>& thread_tracer) noexcept;

void CreateAndStartTraceAggregator(std::shared_ptr<tracing::Sink> sink) noexcept;

void StopTraceAggregator() noexcept;
};
} // namespace cactus_rt
Expand Down
28 changes: 13 additions & 15 deletions include/cactus_rt/thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,25 @@
#include <cstdint>
#include <memory>
#include <string>
#include <vector>

#include "config.h"
#include "quill/Quill.h"
#include "tracing/thread_tracer.h"
#include "tracing/trace_aggregator.h"

namespace cactus_rt {

/// @private
constexpr size_t kDefaultStackSize = 8 * 1024 * 1024; // 8MB default stack space should be plenty

// Necessary forward declaration
class App;

class Thread {
ThreadConfig config_;
std::string name_;
std::vector<size_t> cpu_affinity_;
size_t stack_size_;

quill::Logger* logger_;
std::shared_ptr<tracing::ThreadTracer> tracer_;
std::shared_ptr<tracing::ThreadTracer> tracer_ = nullptr;

std::atomic_bool stop_requested_ = false;

Expand All @@ -41,12 +38,10 @@ class Thread {
*/
static void* RunThread(void* data);

friend class App;

// Non-owning App pointer. Used only for notifying that the thread has
// started/stopped for tracing purposes. Set by Thread::Start and read at
// Non-owning TraceAggregator pointer. Used only for notifying that the thread
// has started/stopped for tracing purposes. Set by Thread::Start and read at
// the beginning of Thread::RunThread.
App* app_ = nullptr;
std::weak_ptr<tracing::TraceAggregator> trace_aggregator_;

public:
/**
Expand All @@ -60,8 +55,7 @@ class Thread {
name_(name),
cpu_affinity_(config_.cpu_affinity),
stack_size_(static_cast<size_t>(PTHREAD_STACK_MIN) + config_.stack_size),
logger_(quill::create_logger(name_)),
tracer_(std::make_shared<tracing::ThreadTracer>(name, config_.tracer_config.queue_size)) {
logger_(quill::create_logger(name_)) {
if (!config.scheduler) {
throw std::runtime_error("ThreadConfig::scheduler cannot be nullptr");
}
Expand Down Expand Up @@ -123,12 +117,16 @@ class Thread {
*
* @private
*/
inline void SetApp(App* app) {
app_ = app;
inline void SetTraceAggregator(std::weak_ptr<tracing::TraceAggregator> trace_aggregator) {
trace_aggregator_ = trace_aggregator;
}

protected:
inline quill::Logger* Logger() const { return logger_; }
inline quill::Logger* Logger() const { return logger_; }

/**
* Gets the current tracer object. Should only ever be called from within the thread itself.
*/
inline tracing::ThreadTracer& Tracer() { return *tracer_; }
inline int64_t StartMonotonicTimeNs() const { return start_monotonic_time_ns_; }
inline const ThreadConfig& Config() const noexcept { return config_; }
Expand Down
4 changes: 4 additions & 0 deletions include/cactus_rt/tracing/thread_tracer.disabled.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ class ThreadTracer {

void SetTid() noexcept {}

void MarkDone() noexcept {}

void IsDone() noexcept {}

private:
template <typename... Args>
bool Emit(Args&&... /* args */) noexcept {
Expand Down
23 changes: 23 additions & 0 deletions include/cactus_rt/tracing/thread_tracer.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#ifndef CACTUS_TRACING_THREAD_TRACER_H_
#define CACTUS_TRACING_THREAD_TRACER_H_

#include <atomic>
#ifndef CACTUS_RT_TRACING_ENABLED
#include "thread_tracer.disabled.h"
#else
Expand Down Expand Up @@ -36,6 +37,8 @@ class ThreadTracer {

moodycamel::ReaderWriterQueue<TrackEventInternal> queue_;

std::atomic_bool thread_done_;

// The event name interning must be done per thread (per sequence). Thus it is
// stored here. However, this class must NEVER call functions here (other
// than maybe .Size), as the memory allocation can occur. This variable is
Expand Down Expand Up @@ -84,6 +87,26 @@ class ThreadTracer {
*/
void SetTid() noexcept { tid_ = gettid(); }

/**
* @brief This marks this thread tracer as "done" and thus the trace
* aggregator will try to remove it after flushing the data.
*
* @private
*/
void MarkDone() noexcept {
thread_done_.store(true, std::memory_order_release);
}

/**
* @brief Checks if this thread tracer is done. Should only be called from
* TraceAggregator.
*
* @private
*/
bool IsDone() noexcept {
return thread_done_.load(std::memory_order_acquire);
}

private:
template <typename... Args>
bool Emit(Args&&... args) noexcept;
Expand Down
8 changes: 3 additions & 5 deletions include/cactus_rt/tracing/trace_aggregator.disabled.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
namespace cactus_rt::tracing {
class TraceAggregator {
public:
explicit TraceAggregator(std::string /* name */, std::vector<size_t> /* cpu_affinity */) {}
explicit TraceAggregator(std::string /* name */) {}

TraceAggregator(const TraceAggregator&) = delete;
TraceAggregator& operator=(const TraceAggregator&) = delete;
Expand All @@ -24,11 +24,9 @@ class TraceAggregator {

void DeregisterThreadTracer(const std::shared_ptr<ThreadTracer>& /* tracer */) {}

void Start() {};
void Start(std::shared_ptr<Sink> /* sink */, std::vector<size_t> cpu_affinity = {}) {};

void RequestStop() noexcept {}

void Join() noexcept {}
void Stop() noexcept {}
};
} // namespace cactus_rt::tracing
#endif
Loading

0 comments on commit 86cf4b6

Please sign in to comment.