Skip to content

Commit

Permalink
Merge branch '2.18' into backport-11759-to-2.18
Browse files Browse the repository at this point in the history
  • Loading branch information
mabdinur authored Jan 2, 2025
2 parents 15a89f5 + c0bb6bd commit 5bdc811
Show file tree
Hide file tree
Showing 26 changed files with 515 additions and 118 deletions.
22 changes: 13 additions & 9 deletions ddtrace/contrib/internal/asgi/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,18 @@ async def _blocked_asgi_app(scope, receive, send):
await send({"type": "http.response.body", "body": b""})


def _parse_response_cookies(response_headers):
cookies = {}
try:
result = response_headers.get("set-cookie", "").split("=", maxsplit=1)
if len(result) == 2:
cookie_key, cookie_value = result
cookies[cookie_key] = cookie_value
except Exception:
log.debug("failed to extract response cookies", exc_info=True)
return cookies


class TraceMiddleware:
"""
ASGI application middleware that traces the requests.
Expand Down Expand Up @@ -211,7 +223,6 @@ async def __call__(self, scope, receive, send):
peer_ip = client[0]
else:
peer_ip = None

trace_utils.set_http_meta(
span,
self.integration_config,
Expand All @@ -234,15 +245,8 @@ async def wrapped_send(message):
except Exception:
log.warning("failed to extract response headers", exc_info=True)
response_headers = None

if span and message.get("type") == "http.response.start" and "status" in message:
cookies = {}
try:
cookie_key, cookie_value = response_headers.get("set-cookie", "").split("=", maxsplit=1)
cookies[cookie_key] = cookie_value
except Exception:
log.debug("failed to extract response cookies", exc_info=True)

cookies = _parse_response_cookies(response_headers)
status_code = message["status"]
trace_utils.set_http_meta(
span,
Expand Down
11 changes: 0 additions & 11 deletions ddtrace/contrib/internal/celery/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,6 @@ def _traced_apply_async_inner(func, instance, args, kwargs):
if task_span:
task_span.set_exc_info(*sys.exc_info())

prerun_span = core.get_item("prerun_span")
if prerun_span:
prerun_span.set_exc_info(*sys.exc_info())

raise
finally:
task_span = core.get_item("task_span")
Expand All @@ -147,11 +143,4 @@ def _traced_apply_async_inner(func, instance, args, kwargs):
)
task_span.finish()

prerun_span = core.get_item("prerun_span")
if prerun_span:
log.debug(
"The task_postrun signal was not called, so manually closing span: %s", prerun_span._pprint()
)
prerun_span.finish()

return _traced_apply_async_inner
3 changes: 0 additions & 3 deletions ddtrace/contrib/internal/celery/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,6 @@ def trace_prerun(*args, **kwargs):
service = config.celery["worker_service_name"]
span = pin.tracer.trace(c.WORKER_ROOT_SPAN, service=service, resource=task.name, span_type=SpanTypes.WORKER)

# Store an item called "prerun span" in case task_postrun doesn't get called
core.set_item("prerun_span", span)

# set span.kind to the type of request being performed
span.set_tag_str(SPAN_KIND, SpanKind.CONSUMER)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ class UploaderBuilder
static void set_output_filename(std::string_view _output_filename);

static std::variant<Uploader, std::string> build();

static void postfork_child();
};

} // namespace Datadog
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,8 @@ namespace Datadog {
// Re-arm this singleton's state after fork() in the child process.
void
Datadog::CodeProvenance::postfork_child()
{
    // The mutex may have been held at fork time by a thread that no longer
    // exists in the child, so it is in an indeterminate state.  Calling its
    // destructor (or unlock) in that state is undefined behavior, so do NOT
    // destroy it first.
    // NB placement-new to re-init and leak the mutex because doing anything else is UB
    new (&get_instance().mtx) std::mutex();
    get_instance().reset(); // Reset the state
}

void
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ ddup_postfork_child()
Datadog::Uploader::postfork_child();
Datadog::SampleManager::postfork_child();
Datadog::CodeProvenance::postfork_child();
Datadog::UploaderBuilder::postfork_child();
}

void
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,6 @@ Datadog::Profile::collect(const ddog_prof_Sample& sample, int64_t endtime_ns)
// Reset the profile's lock and buffers after fork() in the child process.
void
Datadog::Profile::postfork_child()
{
    // Do not unlock here: the lock may have been held (or not) by a thread
    // that does not exist in the child, and unlocking a mutex the current
    // thread does not own is undefined behavior.
    // NB placement-new to re-init and leak the mutex because doing anything else is UB
    new (&profile_mtx) std::mutex();
    cycle_buffers();
}
Original file line number Diff line number Diff line change
Expand Up @@ -175,5 +175,6 @@ Datadog::Uploader::postfork_parent()
// Restore the uploader's lock after fork() in the child process.
void
Datadog::Uploader::postfork_child()
{
    // Do not call unlock(): the lock may have been held by a pre-fork thread
    // that does not exist in the child, and unlocking a mutex the current
    // thread does not own is undefined behavior.
    // NB placement-new to re-init and leak the mutex because doing anything else is UB
    new (&upload_lock) std::mutex();
}
Original file line number Diff line number Diff line change
Expand Up @@ -186,3 +186,10 @@ Datadog::UploaderBuilder::build()

return Datadog::Uploader{ output_filename, ddog_exporter };
}

// Called in the child after fork() to make the builder's lock usable again.
void
Datadog::UploaderBuilder::postfork_child()
{
    // The lock state inherited from the parent is indeterminate; re-initialize
    // the storage in place and intentionally leak the old object, since running
    // its destructor (or unlocking it) here would be undefined behavior.
    ::new (&tag_mutex) std::mutex{};
}
5 changes: 5 additions & 0 deletions ddtrace/internal/datadog/profiling/stack_v2/src/sampler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ _stack_v2_atfork_child()
// so we don't even reveal this function to the user
_set_pid(getpid());
ThreadSpanLinks::postfork_child();

// `thread_info_map_lock` and `task_link_map_lock` are global locks held in echion
// NB placement-new to re-init and leak the mutex because doing anything else is UB
new (&thread_info_map_lock) std::mutex;
new (&task_link_map_lock) std::mutex;
}

__attribute__((constructor)) void
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,8 @@ ThreadSpanLinks::reset()
// Re-initialize the ThreadSpanLinks singleton after fork() in the child.
void
ThreadSpanLinks::postfork_child()
{
    // The mutex may have been held at fork time by a thread that does not
    // exist in the child; running its destructor in that state is undefined
    // behavior, so skip destruction entirely.
    // NB placement-new to re-init and leak the mutex because doing anything else is UB
    new (&get_instance().mtx) std::mutex();

    get_instance().reset();
}

Expand Down
125 changes: 84 additions & 41 deletions ddtrace/profiling/collector/_memalloc.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,47 +42,95 @@ static PyObject* object_string = NULL;

#define ALLOC_TRACKER_MAX_COUNT UINT64_MAX

// The data coordination primitives in this and related files are related to a crash we started seeing.
// We don't have a precise understanding of the causal factors within the runtime that lead to this condition,
// since the GIL alone was sufficient in the past for preventing this issue.
// We add an option here to _add_ a crash, in order to observe this condition in a future diagnostic iteration.
// **This option is _intended_ to crash the Python process** do not use without a good reason!
static char g_crash_on_mutex_pass_str[] = "_DD_PROFILING_MEMALLOC_CRASH_ON_MUTEX_PASS";
static const char* g_truthy_values[] = { "1", "true", "yes", "on", "enable", "enabled", NULL }; // NB the sentinel NULL
static memlock_t g_memalloc_lock;

static alloc_tracker_t* global_alloc_tracker;

// This is a multiplatform way to define an operation to happen at static initialization time
// Forward declaration so the MSVC .CRT$XCU initializer below can take its address.
static void
memalloc_init(void);

#ifdef _MSC_VER
// MSVC: place a pointer to the function in the CRT's user-initializer segment
// (.CRT$XCU); the C runtime walks that segment before main(), giving the same
// effect as __attribute__((constructor)) on GCC/Clang.
#pragma section(".CRT$XCU", read)
__declspec(allocate(".CRT$XCU")) void (*memalloc_init_func)(void) = memalloc_init;

#elif defined(__GNUC__) || defined(__clang__)
// GCC/Clang: run automatically at load time, before main().
__attribute__((constructor))
#else
#error Unsupported compiler
#endif
static void
memalloc_init()
{
    // Check if we should crash the process on mutex pass.  This is an opt-in
    // diagnostic (see g_crash_on_mutex_pass_str above); matching against the
    // truthy list is exact and case-sensitive ("1", "true", "yes", ...).
    char* crash_on_mutex_pass_str = getenv(g_crash_on_mutex_pass_str);
    bool crash_on_mutex_pass = false;
    if (crash_on_mutex_pass_str) {
        for (int i = 0; g_truthy_values[i]; i++) {
            if (strcmp(crash_on_mutex_pass_str, g_truthy_values[i]) == 0) {
                crash_on_mutex_pass = true;
                break;
            }
        }
    }
    // Initialize the global lock guarding global_alloc_tracker exactly once,
    // before any allocator hooks can run.
    memlock_init(&g_memalloc_lock, crash_on_mutex_pass);
}

static void
memalloc_add_event(memalloc_context_t* ctx, void* ptr, size_t size)
{
/* Do not overflow; just ignore the new events if we ever reach that point */
if (global_alloc_tracker->alloc_count >= ALLOC_TRACKER_MAX_COUNT)
uint64_t alloc_count = atomic_add_clamped(&global_alloc_tracker->alloc_count, 1, ALLOC_TRACKER_MAX_COUNT);

/* Return if we've reached the maximum number of allocations */
if (alloc_count == 0)
return;

global_alloc_tracker->alloc_count++;
// Return if we can't take the guard
if (!memalloc_take_guard()) {
return;
}

/* Avoid loops */
if (memalloc_get_reentrant())
// In this implementation, the `global_alloc_tracker` isn't intrinsically protected. Before we read or modify,
// take the lock. The count of allocations is already forward-attributed elsewhere, so if we can't take the lock
// there's nothing to do.
if (!memlock_trylock(&g_memalloc_lock)) {
return;
}

/* Determine if we can capture or if we need to sample */
if (global_alloc_tracker->allocs.count < ctx->max_events) {
/* set a barrier so we don't loop as getting a traceback allocates memory */
memalloc_set_reentrant(true);
/* Buffer is not full, fill it */
traceback_t* tb = memalloc_get_traceback(ctx->max_nframe, ptr, size, ctx->domain);
memalloc_set_reentrant(false);
if (tb)
if (tb) {
traceback_array_append(&global_alloc_tracker->allocs, tb);
}
} else {
/* Sampling mode using a reservoir sampling algorithm: replace a random
* traceback with this one */
uint64_t r = random_range(global_alloc_tracker->alloc_count);
uint64_t r = random_range(alloc_count);

if (r < ctx->max_events) {
/* set a barrier so we don't loop as getting a traceback allocates memory */
memalloc_set_reentrant(true);
// In addition to event size, need to check that the tab is in a good state
if (r < ctx->max_events && global_alloc_tracker->allocs.tab != NULL) {
/* Replace a random traceback with this one */
traceback_t* tb = memalloc_get_traceback(ctx->max_nframe, ptr, size, ctx->domain);
memalloc_set_reentrant(false);

// Need to check not only that the tb returned
if (tb) {
traceback_free(global_alloc_tracker->allocs.tab[r]);
global_alloc_tracker->allocs.tab[r] = tb;
}
}
}

memlock_unlock(&g_memalloc_lock);
memalloc_yield_guard();
}

static void
Expand All @@ -98,12 +146,6 @@ memalloc_free(void* ctx, void* ptr)
alloc->free(alloc->ctx, ptr);
}

#ifdef _PY37_AND_LATER
Py_tss_t memalloc_reentrant_key = Py_tss_NEEDS_INIT;
#else
int memalloc_reentrant_key = -1;
#endif

static void*
memalloc_alloc(int use_calloc, void* ctx, size_t nelem, size_t elsize)
{
Expand Down Expand Up @@ -233,7 +275,10 @@ memalloc_start(PyObject* Py_UNUSED(module), PyObject* args)

global_memalloc_ctx.domain = PYMEM_DOMAIN_OBJ;

global_alloc_tracker = alloc_tracker_new();
if (memlock_trylock(&g_memalloc_lock)) {
global_alloc_tracker = alloc_tracker_new();
memlock_unlock(&g_memalloc_lock);
}

PyMem_GetAllocator(PYMEM_DOMAIN_OBJ, &global_memalloc_ctx.pymem_allocator_obj);
PyMem_SetAllocator(PYMEM_DOMAIN_OBJ, &alloc);
Expand All @@ -258,8 +303,11 @@ memalloc_stop(PyObject* Py_UNUSED(module), PyObject* Py_UNUSED(args))

PyMem_SetAllocator(PYMEM_DOMAIN_OBJ, &global_memalloc_ctx.pymem_allocator_obj);
memalloc_tb_deinit();
alloc_tracker_free(global_alloc_tracker);
global_alloc_tracker = NULL;
if (memlock_trylock(&g_memalloc_lock)) {
alloc_tracker_free(global_alloc_tracker);
global_alloc_tracker = NULL;
memlock_unlock(&g_memalloc_lock);
}

memalloc_heap_tracker_deinit();

Expand Down Expand Up @@ -310,9 +358,15 @@ iterevents_new(PyTypeObject* type, PyObject* Py_UNUSED(args), PyObject* Py_UNUSE
if (!iestate)
return NULL;

iestate->alloc_tracker = global_alloc_tracker;
/* reset the current traceback list */
global_alloc_tracker = alloc_tracker_new();
if (memlock_trylock(&g_memalloc_lock)) {
iestate->alloc_tracker = global_alloc_tracker;
global_alloc_tracker = alloc_tracker_new();
memlock_unlock(&g_memalloc_lock);
} else {
Py_TYPE(iestate)->tp_free(iestate);
return NULL;
}
iestate->seq_index = 0;

PyObject* iter_and_count = PyTuple_New(3);
Expand All @@ -326,8 +380,11 @@ iterevents_new(PyTypeObject* type, PyObject* Py_UNUSED(args), PyObject* Py_UNUSE
/* tp_dealloc for IterEventsState: release the allocation tracker captured at
 * iterator creation, then free the Python object itself.
 *
 * The tracker must only be freed exactly once and only while holding
 * g_memalloc_lock (it is shared state with the allocator hooks); the previous
 * text freed it (and the object) both outside and inside the lock, which is a
 * double free / use-after-free. */
static void
iterevents_dealloc(IterEventsState* iestate)
{
    if (memlock_trylock(&g_memalloc_lock)) {
        alloc_tracker_free(iestate->alloc_tracker);
        memlock_unlock(&g_memalloc_lock);
    }
    /* Always release the Python object, even when the lock could not be taken
     * (otherwise iestate itself would leak); tp_free touches no tracker state.
     * NOTE(review): on lock contention the tracker is leaked — confirm this
     * best-effort behavior is intended. */
    Py_TYPE(iestate)->tp_free(iestate);
}

static PyObject*
Expand Down Expand Up @@ -442,20 +499,6 @@ PyInit__memalloc(void)
return NULL;
}

#ifdef _PY37_AND_LATER
if (PyThread_tss_create(&memalloc_reentrant_key) != 0) {
#else
memalloc_reentrant_key = PyThread_create_key();
if (memalloc_reentrant_key == -1) {
#endif
#ifdef MS_WINDOWS
PyErr_SetFromWindowsErr(0);
#else
PyErr_SetFromErrno(PyExc_OSError);
#endif
return NULL;
}

if (PyType_Ready(&MemallocIterEvents_Type) < 0)
return NULL;
Py_INCREF((PyObject*)&MemallocIterEvents_Type);
Expand Down
Loading

0 comments on commit 5bdc811

Please sign in to comment.