Skip to content

Commit 0b94d71

Browse files
authored
[EXPORTER] Optimize OTLP HTTP compression (open-telemetry#3178)
1 parent 2b9bff9 commit 0b94d71

File tree

4 files changed

+232
-29
lines changed

4 files changed

+232
-29
lines changed

exporters/otlp/src/otlp_http_client.cc

+1-1
Original file line numberDiff line numberDiff line change
@@ -890,7 +890,7 @@ OtlpHttpClient::createSession(
890890
// Parse uri and store it to cache
891891
if (http_uri_.empty())
892892
{
893-
auto parse_url = opentelemetry::ext::http::common::UrlParser(std::string(options_.url));
893+
const auto parse_url = opentelemetry::ext::http::common::UrlParser(options_.url);
894894
if (!parse_url.success_)
895895
{
896896
std::string error_message = "[OTLP HTTP Client] Export failed, invalid url: " + options_.url;

ext/src/http/client/curl/http_client_curl.cc

+104-24
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,17 @@
33

44
#include <curl/curl.h>
55
#include <curl/curlver.h>
6+
#include <algorithm>
7+
#include <array>
68
#include <atomic>
79
#include <chrono>
8-
#include <cstddef>
910
#include <cstdint>
11+
#include <cstring>
1012
#include <list>
1113
#include <mutex>
1214
#include <string>
1315
#include <thread>
16+
#include <type_traits>
1417
#include <unordered_map>
1518
#include <unordered_set>
1619
#include <utility>
@@ -57,11 +60,85 @@ nostd::shared_ptr<HttpCurlGlobalInitializer> HttpCurlGlobalInitializer::GetInsta
5760
return shared_initializer;
5861
}
5962

63+
#ifdef ENABLE_OTLP_COMPRESSION_PREVIEW
64+
// Original source:
65+
// https://stackoverflow.com/questions/12398377/is-it-possible-to-have-zlib-read-from-and-write-to-the-same-memory-buffer/12412863#12412863
66+
int deflateInPlace(z_stream *strm, unsigned char *buf, uint32_t len, uint32_t *max_len)
67+
{
68+
// must be large enough to hold zlib or gzip header (if any) and one more byte -- 11 works for the
69+
// worst case here, but if gzip encoding is used and a deflateSetHeader() call is inserted in this
70+
// code after the deflateReset(), then the 11 needs to be increased to accommodate the resulting
71+
// gzip header size plus one
72+
std::array<unsigned char, 11> temp{};
73+
74+
// kick start the process with a temporary output buffer -- this allows deflate to consume a large
75+
// chunk of input data in order to make room for output data there
76+
strm->next_in = buf;
77+
strm->avail_in = len;
78+
if (*max_len < len)
79+
{
80+
*max_len = len;
81+
}
82+
strm->next_out = temp.data();
83+
strm->avail_out = (std::min)(static_cast<decltype(z_stream::avail_out)>(temp.size()), *max_len);
84+
auto ret = deflate(strm, Z_FINISH);
85+
if (ret == Z_STREAM_ERROR)
86+
{
87+
return ret;
88+
}
89+
90+
// if we can, copy the temporary output data to the consumed portion of the input buffer, and then
91+
// continue to write up to the start of the consumed input for as long as possible
92+
auto have = strm->next_out - temp.data(); // number of bytes in temp[]
93+
if (have <= static_cast<decltype(have)>(strm->avail_in ? len - strm->avail_in : *max_len))
94+
{
95+
std::memcpy(buf, temp.data(), have);
96+
strm->next_out = buf + have;
97+
have = 0;
98+
while (ret == Z_OK)
99+
{
100+
strm->avail_out =
101+
strm->avail_in ? strm->next_in - strm->next_out : (buf + *max_len) - strm->next_out;
102+
ret = deflate(strm, Z_FINISH);
103+
}
104+
if (ret != Z_BUF_ERROR || strm->avail_in == 0)
105+
{
106+
*max_len = strm->next_out - buf;
107+
return ret == Z_STREAM_END ? Z_OK : ret;
108+
}
109+
}
110+
111+
// the output caught up with the input due to insufficiently compressible data -- copy the
112+
// remaining input data into an allocated buffer and complete the compression from there to the
113+
// now empty input buffer (this will only occur for long incompressible streams, more than ~20 MB
114+
// for the default deflate memLevel of 8, or when *max_len is too small and less than the length
115+
// of the header plus one byte)
116+
auto hold = static_cast<std::remove_const_t<decltype(z_stream::next_in)>>(
117+
strm->zalloc(strm->opaque, strm->avail_in, 1)); // allocated buffer to hold input data
118+
if (hold == Z_NULL)
119+
{
120+
return Z_MEM_ERROR;
121+
}
122+
std::memcpy(hold, strm->next_in, strm->avail_in);
123+
strm->next_in = hold;
124+
if (have)
125+
{
126+
std::memcpy(buf, temp.data(), have);
127+
strm->next_out = buf + have;
128+
}
129+
strm->avail_out = (buf + *max_len) - strm->next_out;
130+
ret = deflate(strm, Z_FINISH);
131+
strm->zfree(strm->opaque, hold);
132+
*max_len = strm->next_out - buf;
133+
return ret == Z_OK ? Z_BUF_ERROR : (ret == Z_STREAM_END ? Z_OK : ret);
134+
}
135+
#endif // ENABLE_OTLP_COMPRESSION_PREVIEW
136+
60137
void Session::SendRequest(
61138
std::shared_ptr<opentelemetry::ext::http::client::EventHandler> callback) noexcept
62139
{
63140
is_session_active_.store(true, std::memory_order_release);
64-
std::string url = host_ + std::string(http_request_->uri_);
141+
const auto &url = host_ + http_request_->uri_;
65142
auto callback_ptr = callback.get();
66143
bool reuse_connection = false;
67144

@@ -76,44 +153,47 @@ void Session::SendRequest(
76153
if (http_request_->compression_ == opentelemetry::ext::http::client::Compression::kGzip)
77154
{
78155
#ifdef ENABLE_OTLP_COMPRESSION_PREVIEW
79-
http_request_->AddHeader("Content-Encoding", "gzip");
80-
81-
opentelemetry::ext::http::client::Body compressed_body(http_request_->body_.size());
82-
z_stream zs;
83-
zs.zalloc = Z_NULL;
84-
zs.zfree = Z_NULL;
85-
zs.opaque = Z_NULL;
86-
zs.avail_in = static_cast<uInt>(http_request_->body_.size());
87-
zs.next_in = http_request_->body_.data();
88-
zs.avail_out = static_cast<uInt>(compressed_body.size());
89-
zs.next_out = compressed_body.data();
156+
z_stream zs{};
157+
zs.zalloc = Z_NULL;
158+
zs.zfree = Z_NULL;
159+
zs.opaque = Z_NULL;
90160

91161
// ZLIB: Have to maually specify 16 bits for the Gzip headers
92-
const int window_bits = 15 + 16;
162+
static constexpr int kWindowBits = MAX_WBITS + 16;
163+
static constexpr int kMemLevel = MAX_MEM_LEVEL;
93164

94-
int stream =
95-
deflateInit2(&zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED, window_bits, 8, Z_DEFAULT_STRATEGY);
165+
auto stream = deflateInit2(&zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED, kWindowBits, kMemLevel,
166+
Z_DEFAULT_STRATEGY);
96167

97168
if (stream == Z_OK)
98169
{
99-
deflate(&zs, Z_FINISH);
100-
deflateEnd(&zs);
101-
compressed_body.resize(zs.total_out);
102-
http_request_->SetBody(compressed_body);
170+
auto size = static_cast<uInt>(http_request_->body_.size());
171+
auto max_size = size;
172+
stream = deflateInPlace(&zs, http_request_->body_.data(), size, &max_size);
173+
174+
if (stream == Z_OK)
175+
{
176+
http_request_->AddHeader("Content-Encoding", "gzip");
177+
http_request_->body_.resize(max_size);
178+
}
103179
}
104-
else
180+
181+
if (stream != Z_OK)
105182
{
106183
if (callback)
107184
{
108-
callback->OnEvent(opentelemetry::ext::http::client::SessionState::CreateFailed, "");
185+
callback->OnEvent(opentelemetry::ext::http::client::SessionState::CreateFailed,
186+
zs.msg ? zs.msg : "");
109187
}
110188
is_session_active_.store(false, std::memory_order_release);
111189
}
190+
191+
deflateEnd(&zs);
112192
#else
113193
OTEL_INTERNAL_LOG_ERROR(
114194
"[HTTP Client Curl] Set WITH_OTLP_HTTP_COMPRESSION=ON to use gzip compression with the "
115195
"OTLP HTTP Exporter");
116-
#endif
196+
#endif // ENABLE_OTLP_COMPRESSION_PREVIEW
117197
}
118198

119199
curl_operation_.reset(new HttpOperation(
@@ -226,7 +306,7 @@ HttpClient::~HttpClient()
226306
std::shared_ptr<opentelemetry::ext::http::client::Session> HttpClient::CreateSession(
227307
nostd::string_view url) noexcept
228308
{
229-
auto parsedUrl = common::UrlParser(std::string(url));
309+
const auto parsedUrl = common::UrlParser(std::string(url));
230310
if (!parsedUrl.success_)
231311
{
232312
return std::make_shared<Session>(*this);

ext/src/http/client/curl/http_operation_curl.cc

+1-3
Original file line numberDiff line numberDiff line change
@@ -301,9 +301,7 @@ HttpOperation::HttpOperation(opentelemetry::ext::http::client::Method method,
301301
{
302302
for (auto &kv : this->request_headers_)
303303
{
304-
std::string header = std::string(kv.first);
305-
header += ": ";
306-
header += std::string(kv.second);
304+
const auto header = std::string(kv.first).append(": ").append(kv.second);
307305
curl_resource_.headers_chunk =
308306
curl_slist_append(curl_resource_.headers_chunk, header.c_str());
309307
}

ext/test/http/curl_http_test.cc

+126-1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#include <map>
1212
#include <memory>
1313
#include <mutex>
14+
#include <numeric>
1415
#include <string>
1516
#include <thread>
1617
#include <utility>
@@ -558,7 +559,6 @@ TEST_F(BasicCurlHttpTests, ElegantQuitQuick)
558559
ASSERT_TRUE(handler->is_called_);
559560
ASSERT_TRUE(handler->got_response_);
560561
}
561-
562562
TEST_F(BasicCurlHttpTests, BackgroundThreadWaitMore)
563563
{
564564
{
@@ -581,3 +581,128 @@ TEST_F(BasicCurlHttpTests, BackgroundThreadWaitMore)
581581
ASSERT_TRUE(http_client.MaybeSpawnBackgroundThread());
582582
}
583583
}
584+
585+
#ifdef ENABLE_OTLP_COMPRESSION_PREVIEW
586+
struct GzipEventHandler : public CustomEventHandler
587+
{
588+
~GzipEventHandler() override = default;
589+
590+
void OnResponse(http_client::Response & /* response */) noexcept override {}
591+
592+
void OnEvent(http_client::SessionState state, nostd::string_view reason) noexcept override
593+
{
594+
is_called_ = true;
595+
state_ = state;
596+
reason_ = std::string{reason};
597+
}
598+
599+
bool is_called_ = false;
600+
http_client::SessionState state_ = static_cast<http_client::SessionState>(-1);
601+
std::string reason_;
602+
};
603+
604+
TEST_F(BasicCurlHttpTests, GzipCompressibleData)
605+
{
606+
received_requests_.clear();
607+
auto session_manager = http_client::HttpClientFactory::Create();
608+
EXPECT_TRUE(session_manager != nullptr);
609+
610+
auto session = session_manager->CreateSession("http://127.0.0.1:19000");
611+
auto request = session->CreateRequest();
612+
request->SetUri("post/");
613+
request->SetMethod(http_client::Method::Post);
614+
615+
const auto original_size = 500UL;
616+
http_client::Body body(original_size);
617+
std::iota(body.begin(), body.end(), 0);
618+
request->SetBody(body);
619+
request->AddHeader("Content-Type", "text/plain");
620+
request->SetCompression(opentelemetry::ext::http::client::Compression::kGzip);
621+
auto handler = std::make_shared<GzipEventHandler>();
622+
session->SendRequest(handler);
623+
ASSERT_TRUE(waitForRequests(30, 1));
624+
session->FinishSession();
625+
ASSERT_TRUE(handler->is_called_);
626+
ASSERT_EQ(handler->state_, http_client::SessionState::Response);
627+
ASSERT_TRUE(handler->reason_.empty());
628+
629+
auto http_request =
630+
dynamic_cast<opentelemetry::ext::http::client::curl::Request *>(request.get());
631+
ASSERT_TRUE(http_request != nullptr);
632+
ASSERT_LT(http_request->body_.size(), original_size);
633+
634+
session_manager->CancelAllSessions();
635+
session_manager->FinishAllSessions();
636+
}
637+
638+
TEST_F(BasicCurlHttpTests, GzipIncompressibleData)
639+
{
640+
received_requests_.clear();
641+
auto session_manager = http_client::HttpClientFactory::Create();
642+
EXPECT_TRUE(session_manager != nullptr);
643+
644+
auto session = session_manager->CreateSession("http://127.0.0.1:19000");
645+
auto request = session->CreateRequest();
646+
request->SetUri("post/");
647+
request->SetMethod(http_client::Method::Post);
648+
649+
// Random data generated using code snippet below.
650+
// const auto original_size = 500UL;
651+
// http_client::Body body(original_size);
652+
// std::random_device rd;
653+
// std::mt19937 gen(rd());
654+
// std::uniform_int_distribution<> uid(1, 255);
655+
// std::generate(body.begin(), body.end(), [&]() { return uid(gen); });
656+
657+
// The input values are fixed to make the test repeatable in the event that some distributions
658+
// might yield results that are, in fact, compressible.
659+
http_client::Body body = {
660+
140, 198, 12, 56, 165, 185, 173, 20, 13, 83, 127, 223, 77, 38, 224, 43, 236, 10, 178,
661+
75, 169, 157, 136, 199, 74, 30, 148, 195, 51, 30, 225, 21, 121, 219, 7, 155, 198, 121,
662+
205, 102, 80, 38, 132, 202, 45, 229, 206, 90, 150, 202, 53, 221, 54, 37, 172, 90, 238,
663+
248, 191, 240, 109, 227, 248, 41, 251, 121, 35, 226, 107, 122, 15, 242, 203, 45, 64, 195,
664+
186, 23, 1, 158, 61, 196, 182, 26, 201, 47, 211, 241, 251, 209, 255, 170, 181, 192, 89,
665+
133, 176, 60, 178, 97, 168, 223, 152, 9, 118, 98, 169, 240, 170, 15, 13, 161, 24, 57,
666+
123, 117, 230, 30, 244, 117, 238, 255, 198, 232, 95, 148, 37, 61, 67, 103, 31, 240, 52,
667+
21, 145, 175, 201, 86, 19, 61, 228, 76, 131, 185, 111, 149, 203, 143, 16, 142, 95, 173,
668+
42, 106, 39, 203, 116, 235, 20, 162, 112, 173, 112, 70, 126, 191, 210, 219, 90, 145, 126,
669+
118, 43, 241, 101, 66, 175, 179, 5, 233, 208, 164, 180, 83, 214, 194, 173, 29, 179, 149,
670+
75, 202, 17, 152, 139, 130, 94, 247, 142, 249, 159, 224, 205, 131, 93, 82, 186, 226, 210,
671+
84, 17, 212, 155, 61, 226, 103, 152, 37, 3, 193, 216, 219, 203, 101, 99, 33, 59, 38,
672+
106, 62, 232, 127, 44, 125, 90, 169, 148, 238, 34, 106, 12, 221, 90, 173, 67, 122, 232,
673+
161, 89, 198, 43, 241, 195, 248, 219, 35, 47, 200, 11, 227, 168, 246, 243, 103, 38, 17,
674+
203, 237, 203, 158, 204, 89, 231, 19, 24, 25, 199, 160, 233, 43, 117, 144, 196, 117, 152,
675+
42, 121, 189, 217, 202, 221, 250, 157, 237, 47, 29, 64, 32, 10, 32, 243, 28, 114, 158,
676+
228, 102, 36, 191, 139, 217, 161, 162, 186, 19, 141, 212, 49, 1, 239, 153, 107, 249, 31,
677+
235, 138, 73, 80, 58, 152, 15, 149, 50, 42, 84, 75, 95, 82, 56, 86, 143, 45, 214,
678+
11, 184, 164, 181, 249, 74, 184, 26, 207, 165, 162, 240, 154, 90, 56, 175, 72, 4, 166,
679+
188, 78, 232, 87, 243, 50, 59, 62, 175, 213, 210, 182, 31, 123, 91, 118, 98, 249, 23,
680+
170, 240, 228, 236, 121, 87, 132, 129, 250, 41, 227, 204, 250, 147, 145, 109, 149, 210, 21,
681+
174, 165, 127, 234, 64, 211, 52, 93, 126, 117, 231, 216, 210, 15, 16, 2, 167, 215, 178,
682+
104, 245, 119, 211, 235, 120, 135, 202, 117, 150, 101, 94, 201, 136, 179, 205, 167, 212, 236,
683+
7, 178, 132, 228, 65, 230, 90, 171, 109, 31, 83, 31, 210, 123, 136, 76, 186, 81, 205,
684+
63, 35, 21, 121, 152, 22, 242, 199, 106, 217, 199, 211, 206, 165, 88, 77, 112, 108, 193,
685+
122, 8, 193, 74, 91, 50, 6, 156, 185, 165, 15, 92, 116, 3, 18, 244, 165, 191, 2,
686+
183, 9, 164, 116, 75, 127};
687+
const auto original_size = body.size();
688+
689+
request->SetBody(body);
690+
request->AddHeader("Content-Type", "text/plain");
691+
request->SetCompression(opentelemetry::ext::http::client::Compression::kGzip);
692+
auto handler = std::make_shared<GzipEventHandler>();
693+
session->SendRequest(handler);
694+
ASSERT_TRUE(waitForRequests(30, 1));
695+
session->FinishSession();
696+
ASSERT_TRUE(handler->is_called_);
697+
ASSERT_EQ(handler->state_, http_client::SessionState::Response);
698+
ASSERT_TRUE(handler->reason_.empty());
699+
700+
auto http_request =
701+
dynamic_cast<opentelemetry::ext::http::client::curl::Request *>(request.get());
702+
ASSERT_TRUE(http_request != nullptr);
703+
ASSERT_EQ(http_request->body_.size(), original_size);
704+
705+
session_manager->CancelAllSessions();
706+
session_manager->FinishAllSessions();
707+
}
708+
#endif // ENABLE_OTLP_COMPRESSION_PREVIEW

0 commit comments

Comments
 (0)