Skip to content

Commit 558494d

Browse files
author
Francisco Facioni
committed
Adds support for compressing websockets
1 parent 49edf89 commit 558494d

File tree

3 files changed

+165
-8
lines changed

3 files changed

+165
-8
lines changed

include/crow/settings.h

+3
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@
1111
/* #ifdef - enables ssl */
1212
//#define CROW_ENABLE_SSL
1313

14+
/* #ifdef - enables websocket compression */
15+
//#define CROW_ENABLE_WEBSOCKET_COMPRESSION
16+
1417
/* #define - specifies log level */
1518
/*
1619
Debug = 0

include/crow/websocket.h

+45-8
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
#pragma once
22
#include <boost/algorithm/string/predicate.hpp>
33
#include <boost/array.hpp>
4+
#include <boost/algorithm/string/split.hpp>
45
#include "crow/socket_adaptors.h"
56
#include "crow/http_request.h"
67
#include "crow/TinySHA1.hpp"
8+
#include "crow/zlib.hpp"
79

810
namespace crow
911
{
@@ -61,7 +63,18 @@ namespace crow
6163
return;
6264
}
6365
}
64-
66+
#ifdef CROW_ENABLE_WEBSOCKET_COMPRESSION
67+
std::string extensionsHeader = req.get_header_value("Sec-WebSocket-Extensions");
68+
std::vector<std::string> extensions;
69+
boost::split(extensions, extensionsHeader, boost::is_any_of(";"));
70+
if (std::find(extensions.begin(), extensions.end(), "permessage-deflate") != extensions.end())
71+
{
72+
bool reset_compressor_on_send_ = std::find(extensions.begin(), extensions.end(), "server_no_context_takeover") != extensions.end();
73+
compressor_.reset(new zlib_compressor(reset_compressor_on_send_, true, 15, Z_BEST_COMPRESSION, 8, Z_DEFAULT_STRATEGY));
74+
bool reset_decompressor_on_send_ = std::find(extensions.begin(), extensions.end(), "client_no_context_takeover") != extensions.end();
75+
decompressor_.reset(new zlib_decompressor(reset_decompressor_on_send_, true, 15));
76+
}
77+
#endif
6578
// Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
6679
// Sec-WebSocket-Version: 13
6780
std::string magic = req.get_header_value("Sec-WebSocket-Key") + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
@@ -98,19 +111,23 @@ namespace crow
98111
void send_binary(const std::string& msg) override
99112
{
100113
dispatch([this, msg]{
101-
auto header = build_header(2, msg.size());
114+
std::string msg_ = compressor_ ? compressor_->compress(msg) : msg;
115+
auto header = build_header(2, msg_.size());
116+
if (compressor_) header[0] += 0x40;
102117
write_buffers_.emplace_back(std::move(header));
103-
write_buffers_.emplace_back(msg);
118+
write_buffers_.emplace_back(msg_);
104119
do_write();
105120
});
106121
}
107122

108123
void send_text(const std::string& msg) override
109124
{
110125
dispatch([this, msg]{
111-
auto header = build_header(1, msg.size());
126+
std::string msg_ = compressor_ ? compressor_->compress(msg) : msg;
127+
auto header = build_header(1, msg_.size());
128+
if (compressor_) header[0] += 0x40;
112129
write_buffers_.emplace_back(std::move(header));
113-
write_buffers_.emplace_back(msg);
130+
write_buffers_.emplace_back(msg_);
114131
do_write();
115132
});
116133
}
@@ -167,6 +184,16 @@ namespace crow
167184
write_buffers_.emplace_back(header);
168185
write_buffers_.emplace_back(std::move(hello));
169186
write_buffers_.emplace_back(crlf);
187+
if (compressor_ && decompressor_) {
188+
write_buffers_.emplace_back(
189+
"Sec-WebSocket-Extensions: permessage-deflate"
190+
"; server_max_window_bits=" + std::to_string(decompressor_->window_bits) +
191+
"; client_max_window_bits=" + std::to_string(compressor_->window_bits) +
192+
(compressor_->reset_before_compress ? "; server_no_context_takeover": "") +
193+
(decompressor_->reset_before_decompress ? "; client_no_context_takeover": "")
194+
);
195+
write_buffers_.emplace_back(crlf);
196+
}
170197
write_buffers_.emplace_back(crlf);
171198
do_write();
172199
if (open_handler_)
@@ -368,6 +395,11 @@ namespace crow
368395
return mini_header_ & 0x8000;
369396
}
370397

398+
bool is_compressed()
399+
{
400+
return mini_header_ & 0x4000;
401+
}
402+
371403
int opcode()
372404
{
373405
return (mini_header_ & 0x0f00) >> 8;
@@ -387,7 +419,7 @@ namespace crow
387419
if (is_FIN())
388420
{
389421
if (message_handler_)
390-
message_handler_(*this, message_, is_binary_);
422+
message_handler_(*this, (is_compressed() && decompressor_) ? decompressor_->decompress(message_) : message_, is_binary_);
391423
message_.clear();
392424
}
393425
}
@@ -398,7 +430,7 @@ namespace crow
398430
if (is_FIN())
399431
{
400432
if (message_handler_)
401-
message_handler_(*this, message_, is_binary_);
433+
message_handler_(*this, (is_compressed() && decompressor_) ? decompressor_->decompress(message_) : message_, is_binary_);
402434
message_.clear();
403435
}
404436
}
@@ -410,7 +442,7 @@ namespace crow
410442
if (is_FIN())
411443
{
412444
if (message_handler_)
413-
message_handler_(*this, message_, is_binary_);
445+
message_handler_(*this, (is_compressed() && decompressor_) ? decompressor_->decompress(message_) : message_, is_binary_);
414446
message_.clear();
415447
}
416448
}
@@ -514,6 +546,11 @@ namespace crow
514546
bool pong_received_{false};
515547
bool is_close_handler_called_{false};
516548

549+
bool reset_compressor_on_send_{false};
550+
bool reset_decompressor_on_send_{false};
551+
std::unique_ptr<zlib_compressor> compressor_;
552+
std::unique_ptr<zlib_decompressor> decompressor_;
553+
517554
std::function<void(crow::websocket::connection&)> open_handler_;
518555
std::function<void(crow::websocket::connection&, const std::string&, bool)> message_handler_;
519556
std::function<void(crow::websocket::connection&, const std::string&)> close_handler_;

include/crow/zlib.hpp

+117
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
#pragma once
2+
#include "crow/settings.h"
3+
#include <boost/asio/streambuf.hpp>
4+
#include <zlib.h>
5+
#include <string>
6+
7+
namespace crow {
8+
class zlib_compressor {
9+
public:
10+
zlib_compressor(bool reset_before_compress, bool noheader, int window_bits, int level, int mem_level, int strategy)
11+
: reset_before_compress(reset_before_compress)
12+
, window_bits(window_bits) {
13+
stream = std::make_unique<z_stream>();
14+
stream->zalloc = 0;
15+
stream->zfree = 0;
16+
stream->opaque = 0;
17+
18+
::deflateInit2(stream.get(),
19+
level,
20+
Z_DEFLATED,
21+
(noheader ? -window_bits : window_bits),
22+
mem_level,
23+
strategy
24+
);
25+
}
26+
27+
~zlib_compressor() {
28+
::deflateEnd(stream.get());
29+
}
30+
31+
std::string compress(const std::string& src) {
32+
if(reset_before_compress)
33+
::deflateReset(stream.get());
34+
35+
stream->next_in = reinterpret_cast<uint8_t*>(const_cast<char*>(src.c_str()));
36+
stream->avail_in = src.size();
37+
38+
const uint64_t bufferSize = 256;
39+
boost::asio::streambuf buffer;
40+
do {
41+
boost::asio::streambuf::mutable_buffers_type chunk = buffer.prepare(bufferSize);
42+
43+
uint8_t* next_out = boost::asio::buffer_cast<uint8_t*>(chunk);
44+
45+
stream->next_out = next_out;
46+
stream->avail_out = bufferSize;
47+
48+
::deflate(stream.get(), reset_before_compress ? Z_FINISH : Z_SYNC_FLUSH);
49+
50+
uint64_t outputSize = stream->next_out - next_out;
51+
buffer.commit(outputSize);
52+
} while(stream->avail_out == 0);
53+
54+
uint64_t buffer_size = buffer.size();
55+
if(reset_before_compress) buffer_size -= 4;
56+
57+
return std::string(boost::asio::buffer_cast<const char*>(buffer.data()), buffer_size);
58+
}
59+
60+
std::unique_ptr<z_stream> stream;
61+
62+
bool reset_before_compress;
63+
int window_bits;
64+
};
65+
66+
class zlib_decompressor {
67+
public:
68+
zlib_decompressor(bool reset_before_decompress, bool noheader, int window_bits)
69+
: reset_before_decompress(reset_before_decompress)
70+
, window_bits(window_bits) {
71+
stream = std::make_unique<z_stream>();
72+
stream->zalloc = 0;
73+
stream->zfree = 0;
74+
stream->opaque = 0;
75+
76+
::inflateInit2(stream.get(), (noheader ? -window_bits : window_bits));
77+
}
78+
79+
~zlib_decompressor() {
80+
inflateEnd(stream.get());
81+
}
82+
83+
std::string decompress(std::string src) {
84+
if(reset_before_decompress)
85+
inflateReset(stream.get());
86+
87+
src.push_back('\x00');
88+
src.push_back('\x00');
89+
src.push_back('\xff');
90+
src.push_back('\xff');
91+
92+
stream->next_in = reinterpret_cast<uint8_t*>(const_cast<char*>(src.c_str()));
93+
stream->avail_in = src.size();
94+
95+
const uint64_t bufferSize = 256;
96+
boost::asio::streambuf buffer;
97+
do {
98+
boost::asio::streambuf::mutable_buffers_type chunk = buffer.prepare(bufferSize);
99+
100+
uint8_t* next_out = boost::asio::buffer_cast<uint8_t*>(chunk);
101+
102+
stream->next_out = next_out;
103+
stream->avail_out = bufferSize;
104+
105+
int ret = ::inflate(stream.get(), reset_before_decompress ? Z_FINISH : Z_SYNC_FLUSH);
106+
buffer.commit(stream->next_out - next_out);
107+
} while(stream->avail_out == 0);
108+
109+
return std::string(boost::asio::buffer_cast<const char*>(buffer.data()), buffer.size());
110+
}
111+
112+
std::unique_ptr<z_stream> stream;
113+
114+
bool reset_before_decompress;
115+
int window_bits;
116+
};
117+
}

0 commit comments

Comments
 (0)