Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trace aggregator decouple #90

Merged
merged 4 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .clang-tidy
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ cert-*,
-readability-identifier-length,
-readability-isolate-declaration,
-readability-magic-numbers,
-readability-redundant-inline-specifier'
-readability-redundant-inline-specifier,
-readability-use-anyofallof'
# TODO: Re-enable bugprone-exception-escape when no longer throwing
# https://github.com/isocpp/CppCoreGuidelines/issues/1589
WarningsAsErrors: '*'
Expand Down
2 changes: 1 addition & 1 deletion docs/imgs/trace-architecture.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
4 changes: 4 additions & 0 deletions docs/imgs/tracing-ownership-structure.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
49 changes: 26 additions & 23 deletions docs/tracing.md
Original file line number Diff line number Diff line change
Expand Up @@ -191,45 +191,45 @@ reading a single global atomic boolean variable. This variable controls all
traces from all threads within the process.

Upon enabling tracing via `App::StartTraceSession`, `cactus_rt` also creates and
starts the `TraceAggregator` threads and registers the appropriate sinks. The
`App` object caches a list of known `ThreadTracers` from all the threads that
currently exists and this is passed to the newly created `TraceAggregator`.
Perfetto's file format specification indicates that the track descriptor packets
must be first written before the actual trace event packets. Thus, after the
creation of the `TraceAggregator` and `Sink` registration, a
starts the `TraceAggregator` thread. Perfetto's file format specification
indicates that the track descriptor packets must be first written before the
actual trace event packets. Thus, after starting the `TraceAggregator`, a
[`ProcessDescriptor`](https://perfetto.dev/docs/reference/trace-packet-proto#ProcessDescriptor)
packet is first written. Upon the registration of each of the cached
`ThreadTracers` as passed through by `App`, a
packet is first written. A
[`ThreadDescriptor`](https://perfetto.dev/docs/reference/trace-packet-proto#ThreadDescriptor)
packet is emitted for each thread. Then, the main loop of the `TraceAggregator`
can run which will write track event packets to the sinks.
packet is emitted for each thread that is known to the `TraceAggregator`. Then,
the main loop of the `TraceAggregator` can run which will write track event
packets to the sinks.

When tracing is disabled via `App::StopTraceSession`, the tracing enabled atomic
bool will be set to false. The system will request the `TraceAggregator` thread
to drain all data from the existing `ThreadTracers` and stop. Once this is done,
the file is closed and the `TraceAggregator` is destroyed to save resources.
the file is closed and the `TraceAggregator` states (interned data, sequence
states) are reset so they can be launched again.

#### Dynamic thread creation

Each `Thread` owns a `ThreadTracer`. However, when a thread starts, it must
notify the `App` and `TraceAggregator` (if tracing is enabled and it exists) of
its existence and thread id so a
notify the `TraceAggregator` (if tracing is enabled and it exists) of its
existence and thread id so a
[`ThreadDescriptor`](https://perfetto.dev/docs/reference/trace-packet-proto#ThreadDescriptor)
packet can be written to the data stream before any trace event data is written.
If tracing is not enabled and thus `TraceAggregator` is not present, the `App`
will cache the `ThreadTracers` and will pass it onto the `TraceAggregator`
if/when tracing is enabled.
If tracing is not enabled right now, the `TraceAggregator` will cache the
`ThreadTracers` so that once tracing is enabled, the `ThreadDescriptor` packet
is written out.

The `Thread` is able to communicate with the `App` by storing a non-owning
pointer to the `App`. This pointer is setup during `App::RegisterThread` so
there's no explicit dependency between `Thread` and `App` during construction.
This decision may be revisited in the future.
The `Thread` is able to communicate with the `TraceAggregator` by storing a
`weak_ptr` to the `TraceAggregator`. This pointer is set up during
`App::RegisterThread` so there's no explicit dependency between `Thread` and
`App` during construction. This decision may be revisited in the future.

#### Cleanup after thread shutdown
#### Ownership structure

TODO...
The structure is not ideal and has some problems, but works for the most part.

#### Dynamic sink registration
![Trace architecture](imgs/tracing-ownership-structure.svg)

#### Cleanup after thread shutdown

TODO...

Expand All @@ -255,6 +255,9 @@ noting:
(i.e. thus `(trusted_packet_sequence_id, iid)` is sufficient to identify an
interned string). This, along with (1), implies we have to intern strings on
a per-thread interner.
3. When a tracing session stops, the string interner states for all known thread
tracers are reset. This means a subsequent session will not have the same
string iids.

### Other notes

Expand Down
8 changes: 2 additions & 6 deletions examples/tracing_example_no_rt/main.cc
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#include <cactus_rt/tracing.h>

#include <list>
#include <memory>
#include <thread>

Expand All @@ -25,18 +24,15 @@ void StartTracing(const char* app_name, const char* filename) {

// Create the file sink so the data aggregated by the TraceAggregator will be written to somewhere.
auto file_sink = std::make_shared<FileSink>(filename);
trace_aggregator->RegisterSink(file_sink);

quill::start();
trace_aggregator->Start();
trace_aggregator->Start(file_sink);
}

void StopTracing() {
cactus_rt::tracing::DisableTracing();

trace_aggregator->RequestStop();
trace_aggregator->Join();
trace_aggregator = nullptr; // Destroy the trace aggregator and free all resources.
trace_aggregator->Stop();
}

int main() {
Expand Down
27 changes: 1 addition & 26 deletions include/cactus_rt/app.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

#include <gtest/gtest_prod.h>

#include <list>
#include <memory>
#include <string>
#include <vector>
Expand Down Expand Up @@ -36,14 +35,7 @@ class App {

std::vector<std::shared_ptr<Thread>> threads_;

// We need to cache the list thread tracers here because the trace_aggregator
// can be dynamically created and stopped. When a new trace aggregator is
// created, it needs to know about all the thread tracers.
//
// TODO: investigate into a weak pointer.
std::list<std::shared_ptr<tracing::ThreadTracer>> thread_tracers_;
std::unique_ptr<tracing::TraceAggregator> trace_aggregator_ = nullptr;
std::mutex aggregator_mutex_;
std::shared_ptr<tracing::TraceAggregator> trace_aggregator_;

void SetDefaultLogFormat(quill::Config& cfg) {
// Create a handler of stdout
Expand Down Expand Up @@ -117,11 +109,6 @@ class App {
*/
bool StartTraceSession(std::shared_ptr<tracing::Sink> sink) noexcept;

/**
* @brief Register a custom trace sink after starting the trace session
*/
void RegisterTraceSink(std::shared_ptr<tracing::Sink> sink) noexcept;

/**
* @brief Stops the tracing session for the process. Will be no-op if tracing
* is not enabled. This function is not real-time safe.
Expand All @@ -148,18 +135,6 @@ class App {
void StartQuill();

private:
/**
* @brief Register a thread tracer. Should only be called from Thread::RunThread.
*/
void RegisterThreadTracer(std::shared_ptr<tracing::ThreadTracer> thread_tracer) noexcept;

/**
* @brief Remove a thread tracer. Should only be called from Thread::~Thread().
*/
void DeregisterThreadTracer(const std::shared_ptr<tracing::ThreadTracer>& thread_tracer) noexcept;

void CreateAndStartTraceAggregator(std::shared_ptr<tracing::Sink> sink) noexcept;

void StopTraceAggregator() noexcept;
};
} // namespace cactus_rt
Expand Down
28 changes: 13 additions & 15 deletions include/cactus_rt/thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,25 @@
#include <cstdint>
#include <memory>
#include <string>
#include <vector>

#include "config.h"
#include "quill/Quill.h"
#include "tracing/thread_tracer.h"
#include "tracing/trace_aggregator.h"

namespace cactus_rt {

/// @private
constexpr size_t kDefaultStackSize = 8 * 1024 * 1024; // 8MB default stack space should be plenty

// Necessary forward declaration
class App;

class Thread {
ThreadConfig config_;
std::string name_;
std::vector<size_t> cpu_affinity_;
size_t stack_size_;

quill::Logger* logger_;
std::shared_ptr<tracing::ThreadTracer> tracer_;
std::shared_ptr<tracing::ThreadTracer> tracer_ = nullptr;

std::atomic_bool stop_requested_ = false;

Expand All @@ -41,12 +38,10 @@ class Thread {
*/
static void* RunThread(void* data);

friend class App;

// Non-owning App pointer. Used only for notifying that the thread has
// started/stopped for tracing purposes. Set by Thread::Start and read at
// Non-owning TraceAggregator pointer. Used only for notifying that the thread
// has started/stopped for tracing purposes. Set by Thread::Start and read at
// the beginning of Thread::RunThread.
App* app_ = nullptr;
std::weak_ptr<tracing::TraceAggregator> trace_aggregator_;

public:
/**
Expand All @@ -60,8 +55,7 @@ class Thread {
name_(name),
cpu_affinity_(config_.cpu_affinity),
stack_size_(static_cast<size_t>(PTHREAD_STACK_MIN) + config_.stack_size),
logger_(quill::create_logger(name_)),
tracer_(std::make_shared<tracing::ThreadTracer>(name, config_.tracer_config.queue_size)) {
logger_(quill::create_logger(name_)) {
if (!config.scheduler) {
throw std::runtime_error("ThreadConfig::scheduler cannot be nullptr");
}
Expand Down Expand Up @@ -123,12 +117,16 @@ class Thread {
*
* @private
*/
inline void SetApp(App* app) {
app_ = app;
inline void SetTraceAggregator(std::weak_ptr<tracing::TraceAggregator> trace_aggregator) {
trace_aggregator_ = trace_aggregator;
}

protected:
inline quill::Logger* Logger() const { return logger_; }
inline quill::Logger* Logger() const { return logger_; }

/**
* Gets the current tracer object. Should only ever be called from within the thread itself.
*/
inline tracing::ThreadTracer& Tracer() { return *tracer_; }
inline int64_t StartMonotonicTimeNs() const { return start_monotonic_time_ns_; }
inline const ThreadConfig& Config() const noexcept { return config_; }
Expand Down
4 changes: 4 additions & 0 deletions include/cactus_rt/tracing/thread_tracer.disabled.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ class ThreadTracer {

void SetTid() noexcept {}

void MarkDone() noexcept {}

void IsDone() noexcept {}

private:
template <typename... Args>
bool Emit(Args&&... /* args */) noexcept {
Expand Down
23 changes: 23 additions & 0 deletions include/cactus_rt/tracing/thread_tracer.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <readerwriterqueue.h>
#include <unistd.h>

#include <atomic>
#include <cstdint>

#include "../experimental/lockless/atomic_message.h"
Expand Down Expand Up @@ -36,6 +37,8 @@ class ThreadTracer {

moodycamel::ReaderWriterQueue<TrackEventInternal> queue_;

std::atomic_bool thread_done_;

// The event name interning must be done per thread (per sequence). Thus it is
// stored here. However, this class must NEVER call functions here (other
// than maybe .Size), as the memory allocation can occur. This variable is
Expand Down Expand Up @@ -84,6 +87,26 @@ class ThreadTracer {
*/
void SetTid() noexcept { tid_ = gettid(); }

/**
* @brief This marks this thread tracer as "done" and thus the trace
* aggregator will try to remove it after flushing the data.
*
* @private
*/
void MarkDone() noexcept {
thread_done_.store(true, std::memory_order_release);
}

/**
* @brief Checks if this thread tracer is done. Should only be called from
* TraceAggregator.
*
* @private
*/
bool IsDone() noexcept {
return thread_done_.load(std::memory_order_acquire);
}

private:
template <typename... Args>
bool Emit(Args&&... args) noexcept;
Expand Down
8 changes: 3 additions & 5 deletions include/cactus_rt/tracing/trace_aggregator.disabled.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
namespace cactus_rt::tracing {
class TraceAggregator {
public:
explicit TraceAggregator(std::string /* name */, std::vector<size_t> /* cpu_affinity */) {}
explicit TraceAggregator(std::string /* name */) {}

TraceAggregator(const TraceAggregator&) = delete;
TraceAggregator& operator=(const TraceAggregator&) = delete;
Expand All @@ -24,11 +24,9 @@ class TraceAggregator {

void DeregisterThreadTracer(const std::shared_ptr<ThreadTracer>& /* tracer */) {}

void Start() {};
void Start(std::shared_ptr<Sink> /* sink */, std::vector<size_t> /* cpu_affinity */ = {}){};

void RequestStop() noexcept {}

void Join() noexcept {}
void Stop() noexcept {}
};
} // namespace cactus_rt::tracing
#endif
Loading
Loading