Skip to content

Commit

Permalink
Adds support for compressing websockets
Browse files Browse the repository at this point in the history
  • Loading branch information
Francisco Facioni committed Nov 15, 2018
1 parent 49edf89 commit 5234ebf
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 8 deletions.
3 changes: 3 additions & 0 deletions include/crow/settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
/* #ifdef - enables ssl */
//#define CROW_ENABLE_SSL

/* #ifdef - enables websocket compression */
//#define CROW_ENABLE_WEBSOCKET_COMPRESSION

/* #define - specifies log level */
/*
Debug = 0
Expand Down
53 changes: 45 additions & 8 deletions include/crow/websocket.h
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
#pragma once
#include <boost/algorithm/string/predicate.hpp>
#include <boost/array.hpp>
#include <boost/algorithm/string/split.hpp>
#include "crow/socket_adaptors.h"
#include "crow/http_request.h"
#include "crow/TinySHA1.hpp"
#include "crow/zlib.hpp"

namespace crow
{
Expand Down Expand Up @@ -61,7 +63,18 @@ namespace crow
return;
}
}

#ifdef CROW_ENABLE_WEBSOCKET_COMPRESSION
std::string extensionsHeader = req.get_header_value("Sec-WebSocket-Extensions");
std::vector<std::string> extensions;
boost::split(extensions, extensionsHeader, boost::is_any_of(";"));
if (std::find(extensions.begin(), extensions.end(), "permessage-deflate") != extensions.end())
{
bool reset_compressor_on_send_ = std::find(extensions.begin(), extensions.end(), "server_no_context_takeover") != extensions.end();
compressor_.reset(new zlib_compressor(reset_compressor_on_send_, true, 15, Z_BEST_COMPRESSION, 8, Z_DEFAULT_STRATEGY));
bool reset_decompressor_on_send_ = std::find(extensions.begin(), extensions.end(), "client_no_context_takeover") != extensions.end();
decompressor_.reset(new zlib_decompressor(reset_decompressor_on_send_, true, 15));
}
#endif
// Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
// Sec-WebSocket-Version: 13
std::string magic = req.get_header_value("Sec-WebSocket-Key") + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
Expand Down Expand Up @@ -98,19 +111,23 @@ namespace crow
void send_binary(const std::string& msg) override
{
dispatch([this, msg]{
auto header = build_header(2, msg.size());
std::string msg_ = compressor_ ? compressor_->compress(msg) : msg;
auto header = build_header(2, msg_.size());
if (compressor_) header[0] += 0x40;
write_buffers_.emplace_back(std::move(header));
write_buffers_.emplace_back(msg);
write_buffers_.emplace_back(msg_);
do_write();
});
}

void send_text(const std::string& msg) override
{
dispatch([this, msg]{
auto header = build_header(1, msg.size());
std::string msg_ = compressor_ ? compressor_->compress(msg) : msg;
auto header = build_header(1, msg_.size());
if (compressor_) header[0] += 0x40;
write_buffers_.emplace_back(std::move(header));
write_buffers_.emplace_back(msg);
write_buffers_.emplace_back(msg_);
do_write();
});
}
Expand Down Expand Up @@ -167,6 +184,16 @@ namespace crow
write_buffers_.emplace_back(header);
write_buffers_.emplace_back(std::move(hello));
write_buffers_.emplace_back(crlf);
if (compressor_ && decompressor_) {
write_buffers_.emplace_back(
"Sec-WebSocket-Extensions: permessage-deflate"
"; server_max_window_bits=" + std::to_string(decompressor_->window_bits) +
"; client_max_window_bits=" + std::to_string(compressor_->window_bits) +
(compressor_->reset_before_compress ? "; server_no_context_takeover": "") +
(decompressor_->reset_before_decompress ? "; client_no_context_takeover": "")
);
write_buffers_.emplace_back(crlf);
}
write_buffers_.emplace_back(crlf);
do_write();
if (open_handler_)
Expand Down Expand Up @@ -368,6 +395,11 @@ namespace crow
return mini_header_ & 0x8000;
}

bool is_compressed()
{
return mini_header_ & 0x4000;
}

int opcode()
{
return (mini_header_ & 0x0f00) >> 8;
Expand All @@ -387,7 +419,7 @@ namespace crow
if (is_FIN())
{
if (message_handler_)
message_handler_(*this, message_, is_binary_);
message_handler_(*this, (is_compressed() && decompressor_) ? decompressor_->decompress(message_) : message_, is_binary_);
message_.clear();
}
}
Expand All @@ -398,7 +430,7 @@ namespace crow
if (is_FIN())
{
if (message_handler_)
message_handler_(*this, message_, is_binary_);
message_handler_(*this, (is_compressed() && decompressor_) ? decompressor_->decompress(message_) : message_, is_binary_);
message_.clear();
}
}
Expand All @@ -410,7 +442,7 @@ namespace crow
if (is_FIN())
{
if (message_handler_)
message_handler_(*this, message_, is_binary_);
message_handler_(*this, (is_compressed() && decompressor_) ? decompressor_->decompress(message_) : message_, is_binary_);
message_.clear();
}
}
Expand Down Expand Up @@ -514,6 +546,11 @@ namespace crow
bool pong_received_{false};
bool is_close_handler_called_{false};

bool reset_compressor_on_send_{false};
bool reset_decompressor_on_send_{false};
std::unique_ptr<zlib_compressor> compressor_;
std::unique_ptr<zlib_decompressor> decompressor_;

std::function<void(crow::websocket::connection&)> open_handler_;
std::function<void(crow::websocket::connection&, const std::string&, bool)> message_handler_;
std::function<void(crow::websocket::connection&, const std::string&)> close_handler_;
Expand Down
117 changes: 117 additions & 0 deletions include/crow/zlib.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
#pragma once
#include "crow/settings.h"
#include <boost/asio/streambuf.hpp>
#include <zlib.h>
#include <string>

namespace crow {
class zlib_compressor {
public:
zlib_compressor(bool reset_before_compress, bool noheader, int window_bits, int level, int mem_level, int strategy)
: reset_before_compress(reset_before_compress)
, window_bits(window_bits) {
stream = std::make_unique<z_stream>();
stream->zalloc = 0;
stream->zfree = 0;
stream->opaque = 0;

::deflateInit2(stream.get(),
level,
Z_DEFLATED,
(noheader ? -window_bits : window_bits),
mem_level,
strategy
);
}

~zlib_compressor() {
::deflateEnd(stream.get());
}

std::string compress(const std::string& src) {
if(reset_before_compress)
::deflateReset(stream.get());

stream->next_in = reinterpret_cast<uint8_t*>(const_cast<char*>(src.c_str()));
stream->avail_in = src.size();

const uint64_t bufferSize = 256;
boost::asio::streambuf buffer;
do {
boost::asio::streambuf::mutable_buffers_type chunk = buffer.prepare(bufferSize);

uint8_t* next_out = boost::asio::buffer_cast<uint8_t*>(chunk);

stream->next_out = next_out;
stream->avail_out = bufferSize;

::deflate(stream.get(), reset_before_compress ? Z_FINISH : Z_SYNC_FLUSH);

uint64_t outputSize = stream->next_out - next_out;
buffer.commit(outputSize);
} while(stream->avail_out == 0);

uint64_t buffer_size = buffer.size();
if(!reset_before_compress) buffer_size -= 4;

return std::string(boost::asio::buffer_cast<const char*>(buffer.data()), buffer_size);
}

std::unique_ptr<z_stream> stream;

bool reset_before_compress;
int window_bits;
};

class zlib_decompressor {
public:
zlib_decompressor(bool reset_before_decompress, bool noheader, int window_bits)
: reset_before_decompress(reset_before_decompress)
, window_bits(window_bits) {
stream = std::make_unique<z_stream>();
stream->zalloc = 0;
stream->zfree = 0;
stream->opaque = 0;

::inflateInit2(stream.get(), (noheader ? -window_bits : window_bits));
}

~zlib_decompressor() {
inflateEnd(stream.get());
}

std::string decompress(std::string src) {
if(reset_before_decompress)
inflateReset(stream.get());

src.push_back('\x00');
src.push_back('\x00');
src.push_back('\xff');
src.push_back('\xff');

stream->next_in = reinterpret_cast<uint8_t*>(const_cast<char*>(src.c_str()));
stream->avail_in = src.size();

const uint64_t bufferSize = 256;
boost::asio::streambuf buffer;
do {
boost::asio::streambuf::mutable_buffers_type chunk = buffer.prepare(bufferSize);

uint8_t* next_out = boost::asio::buffer_cast<uint8_t*>(chunk);

stream->next_out = next_out;
stream->avail_out = bufferSize;

int ret = ::inflate(stream.get(), reset_before_decompress ? Z_FINISH : Z_SYNC_FLUSH);
buffer.commit(stream->next_out - next_out);
} while(stream->avail_out == 0);

return std::string(boost::asio::buffer_cast<const char*>(buffer.data()), buffer.size());
}

std::unique_ptr<z_stream> stream;

bool reset_before_decompress;
int window_bits;
};
}

0 comments on commit 5234ebf

Please sign in to comment.