Skip to content

Commit

Permalink
Merge tag '2.16.2'
Browse files Browse the repository at this point in the history
version 2.16.2
  • Loading branch information
jul-stas committed Feb 9, 2022
2 parents 2f746d5 + 90df2c9 commit 4bf44c7
Show file tree
Hide file tree
Showing 27 changed files with 455 additions and 314 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
2.16.2
===========

Bug Fixes
--------
* [CPP-946] Core dump on unclean event loop shutdown
* [PR #513] Fix SNI events
* [PR #518] Replace deprecated function for OpenSSL >= 3.0

2.16.1
===========

Expand Down
2 changes: 1 addition & 1 deletion include/cassandra.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@

#define CASS_VERSION_MAJOR 2
#define CASS_VERSION_MINOR 16
#define CASS_VERSION_PATCH 1
#define CASS_VERSION_PATCH 2
#define CASS_VERSION_SUFFIX ""

#ifdef __cplusplus
Expand Down
21 changes: 19 additions & 2 deletions src/address_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

using namespace datastax::internal::core;

bool DefaultAddressFactory::create(const Row* peers_row, const Host::Ptr& connected_host,
Address* output) {
bool AddressFactory::create(const Row* peers_row, const Host::Ptr& connected_host,
Address* output) {
Address connected_address = connected_host->address();
const Value* peer_value = peers_row->get_by_name("peer");
const Value* rpc_value = peers_row->get_by_name("rpc_address");
Expand Down Expand Up @@ -59,6 +59,12 @@ bool DefaultAddressFactory::create(const Row* peers_row, const Host::Ptr& connec
return true;
}

bool AddressFactory::is_peer(const Row* peers_row, const Host::Ptr& connected_host,
const Address& expected) {
Address address;
return create(peers_row, connected_host, &address) && address == expected;
}

bool SniAddressFactory::create(const Row* peers_row, const Host::Ptr& connected_host,
Address* output) {
CassUuid host_id;
Expand All @@ -78,3 +84,14 @@ bool SniAddressFactory::create(const Row* peers_row, const Host::Ptr& connected_
connected_host->address().port(), to_string(host_id));
return true;
}

bool SniAddressFactory::is_peer(const Row* peers_row, const Host::Ptr& connected_host,
const Address& expected) {
const Value* value = peers_row->get_by_name("rpc_address");
Address rpc_address;
if (!value ||
!value->decoder().as_inet(value->size(), connected_host->address().port(), &rpc_address)) {
return false;
}
return rpc_address == expected;
}
15 changes: 6 additions & 9 deletions src/address_factory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,15 @@ namespace datastax { namespace internal { namespace core {
class Row;

/**
* An interface for constructing `Address` from `system.local`/`system.peers` row data.
* An address factory that creates `Address` using the `rpc_address` column.
*/
class AddressFactory : public RefCounted<AddressFactory> {
public:
typedef SharedRefPtr<AddressFactory> Ptr;
virtual ~AddressFactory() {}
virtual bool create(const Row* peers_row, const Host::Ptr& connected_host, Address* output) = 0;
};

/**
* An address factory that creates `Address` using the `rpc_address` column.
*/
class DefaultAddressFactory : public AddressFactory {
virtual bool create(const Row* peers_row, const Host::Ptr& connected_host, Address* output);
virtual bool is_peer(const Row* peers_row, const Host::Ptr& connected_host,
const Address& expected);
};

/**
Expand All @@ -48,13 +43,15 @@ class DefaultAddressFactory : public AddressFactory {
*/
class SniAddressFactory : public AddressFactory {
virtual bool create(const Row* peers_row, const Host::Ptr& connected_host, Address* output);
virtual bool is_peer(const Row* peers_row, const Host::Ptr& connected_host,
const Address& expected);
};

inline AddressFactory* create_address_factory_from_config(const Config& config) {
if (config.cloud_secure_connection_config().is_loaded()) {
return new SniAddressFactory();
} else {
return new DefaultAddressFactory();
return new AddressFactory();
}
}

Expand Down
19 changes: 15 additions & 4 deletions src/cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,17 @@ LockedHostMap::LockedHostMap(const HostMap& hosts)
LockedHostMap::~LockedHostMap() { uv_mutex_destroy(&mutex_); }

LockedHostMap::const_iterator LockedHostMap::find(const Address& address) const {
return hosts_.find(address);
HostMap::const_iterator it = hosts_.find(address);
if (it == hosts_.end()) {
// If this is from an event (not SNI) and we're using SNI addresses then fallback to using the
// "rpc_address" to compare.
for (HostMap::const_iterator i = hosts_.begin(), end = hosts_.end(); i != end; ++i) {
if (i->second->rpc_address() == address) {
return i;
}
}
}
return it;
}

Host::Ptr LockedHostMap::get(const Address& address) const {
Expand Down Expand Up @@ -470,9 +480,10 @@ void Cluster::on_reconnect(ControlConnector* connector) {

void Cluster::internal_close() {
is_closing_ = true;
bool was_timer_running = timer_.is_running();
timer_.stop();
monitor_reporting_timer_.stop();
if (timer_.is_running()) {
timer_.stop();
if (was_timer_running) {
handle_close();
} else if (reconnector_) {
reconnector_->cancel();
Expand Down Expand Up @@ -648,7 +659,7 @@ void Cluster::notify_host_remove(const Address& address) {
notify_or_record(ClusterEvent(ClusterEvent::HOST_DOWN, host));
}

hosts_.erase(address);
hosts_.erase(it->first);
for (LoadBalancingPolicy::Vec::const_iterator it = load_balancing_policies_.begin(),
end = load_balancing_policies_.end();
it != end; ++it) {
Expand Down
12 changes: 7 additions & 5 deletions src/collection_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,23 @@ bool CollectionIterator::next() {
}

bool CollectionIterator::decode_value() {
DataType::ConstPtr data_type;
if (collection_->value_type() == CASS_VALUE_TYPE_MAP) {
data_type =
const DataType::ConstPtr& data_type =
(index_ % 2 == 0) ? collection_->primary_data_type() : collection_->secondary_data_type();
value_ = decoder_.decode_value(data_type);
} else {
data_type = collection_->primary_data_type();
value_ = decoder_.decode_value(collection_->primary_data_type());
}

return decoder_.decode_value(data_type, value_, true);
return value_.is_valid();
}

bool TupleIterator::next() {
if (next_ == end_) {
return false;
}
current_ = next_++;
return decoder_.decode_value(*current_, value_);

value_ = decoder_.decode_value(*current_);
return value_.is_valid();
}
17 changes: 8 additions & 9 deletions src/control_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ static NopControlConnectionListener nop_listener__;
ControlConnectionSettings::ControlConnectionSettings()
: use_schema(CASS_DEFAULT_USE_SCHEMA)
, use_token_aware_routing(CASS_DEFAULT_USE_TOKEN_AWARE_ROUTING)
, address_factory(new DefaultAddressFactory()) {}
, address_factory(new AddressFactory()) {}

ControlConnectionSettings::ControlConnectionSettings(const Config& config)
: connection_settings(config)
Expand Down Expand Up @@ -395,17 +395,16 @@ void ControlConnection::handle_refresh_node(RefreshNodeCallback* callback) {
const Row* row = NULL;
ResultIterator rows(callback->result().get());

while (rows.next() && !found_host) {
row = rows.row();
if (callback->is_all_peers) {
Address address;
bool is_valid_address = settings_.address_factory->create(row, connection_->host(), &address);
if (is_valid_address && callback->address == address) {
if (callback->is_all_peers) {
while (!found_host && rows.next()) {
row = rows.row();
if (settings_.address_factory->is_peer(row, connection_->host(), callback->address)) {
found_host = true;
}
} else {
found_host = true;
}
} else if (rows.next()) {
row = rows.row();
found_host = true;
}

if (!found_host) {
Expand Down
44 changes: 25 additions & 19 deletions src/decoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,33 +138,39 @@ bool Decoder::decode_warnings(WarningVec& output) {
return true;
}

bool Decoder::decode_value(const DataType::ConstPtr& data_type, Value& output,
bool is_inside_collection /*= false*/) {
const char* buffer = NULL;
Value Decoder::decode_value(const DataType::ConstPtr& data_type) {
int32_t size = 0;

if (!decode_int32(size)) {
return false;
}
if (!decode_int32(size)) return Value();

if (size >= 0) {
buffer = input_;
Decoder decoder(input_, size, protocol_version_);
input_ += size;
remaining_ -= size;
Decoder decoder(buffer, size, protocol_version_);

if (data_type->is_collection()) {
int32_t count;
if (!decoder.decode_int32(count)) return false;
output = Value(data_type, count, decoder);
} else {
output = Value(data_type, decoder);

int32_t count = 0;
if (!data_type->is_collection()) {
return Value(data_type, decoder);
} else if (decoder.decode_int32(count)) {
return Value(data_type, count, decoder);
}
} else { // null value
output = Value(data_type);
return Value();
}
return Value(data_type);
}

return true;
bool Decoder::update_value(Value& value) {
int32_t size = 0;
if (decode_int32(size)) {
if (size >= 0) {
Decoder decoder(input_, size, protocol_version_);
input_ += size;
remaining_ -= size;
return value.update(decoder);
}
Decoder decoder;
return value.update(decoder);
}
return false;
}

void Decoder::notify_error(const char* detail, size_t bytes) const {
Expand Down
6 changes: 4 additions & 2 deletions src/decoder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -558,8 +558,10 @@ class Decoder {
bool decode_write_type(CassWriteType& output);
bool decode_warnings(WarningVec& output);

bool decode_value(const DataType::ConstPtr& data_type, Value& output,
bool is_inside_collection = false);
Value decode_value(const DataType::ConstPtr& data_type);
bool update_value(Value& value);

bool is_null() const { return input_ == NULL; }

protected:
// Testing only
Expand Down
41 changes: 26 additions & 15 deletions src/host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,6 @@ void Host::set(const Row* row, bool use_tokens) {
if (dse_server_version_ < VersionNumber(6, 7, 0)) {
server_version_ = VersionNumber(3, 11, 0);
}
} else {
LOG_WARN("Invalid DSE version string \"%s\" on host %s", dse_version_str.c_str(),
address().to_string().c_str());
}
}

Expand Down Expand Up @@ -170,6 +167,18 @@ void Host::set(const Row* row, bool use_tokens) {
"If this is incorrect you should configure a specific interface for rpc_address on "
"the server.",
address_string_.c_str());
v = row->get_by_name("listen_address"); // Available in system.local
if (v && !v->is_null()) {
v->decoder().as_inet(v->size(), address_.port(), &rpc_address_);
} else {
v = row->get_by_name("peer"); // Available in system.peers
if (v && !v->is_null()) {
v->decoder().as_inet(v->size(), address_.port(), &rpc_address_);
}
}
if (!rpc_address_.is_valid()) {
LOG_WARN("Unable to set rpc_address from either listen_address or peer");
}
}
} else {
LOG_WARN("No rpc_address for host %s in system.local or system.peers.",
Expand Down Expand Up @@ -213,30 +222,32 @@ void Host::close_unpooled_connections(uv_loop_t *loop) {
}
}

static CassInet to_inet(const Host::Ptr& host) {
CassInet address;
if (host->address().is_resolved()) {
address.address_length = host->address().to_inet(address.address);
} else {
address.address_length = host->rpc_address().to_inet(&address.address);
}
return address;
}

ExternalHostListener::ExternalHostListener(const CassHostListenerCallback callback, void* data)
: callback_(callback)
, data_(data) {}

void ExternalHostListener::on_host_up(const Host::Ptr& host) {
CassInet address;
address.address_length = host->address().to_inet(address.address);
callback_(CASS_HOST_LISTENER_EVENT_UP, address, data_);
callback_(CASS_HOST_LISTENER_EVENT_UP, to_inet(host), data_);
}

void ExternalHostListener::on_host_down(const Host::Ptr& host) {
CassInet address;
address.address_length = host->address().to_inet(address.address);
callback_(CASS_HOST_LISTENER_EVENT_DOWN, address, data_);
callback_(CASS_HOST_LISTENER_EVENT_DOWN, to_inet(host), data_);
}

void ExternalHostListener::on_host_added(const Host::Ptr& host) {
CassInet address;
address.address_length = host->address().to_inet(address.address);
callback_(CASS_HOST_LISTENER_EVENT_ADD, address, data_);
callback_(CASS_HOST_LISTENER_EVENT_ADD, to_inet(host), data_);
}

void ExternalHostListener::on_host_removed(const Host::Ptr& host) {
CassInet address;
address.address_length = host->address().to_inet(address.address);
callback_(CASS_HOST_LISTENER_EVENT_REMOVE, address, data_);
callback_(CASS_HOST_LISTENER_EVENT_REMOVE, to_inet(host), data_);
}
8 changes: 6 additions & 2 deletions src/map_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@
using namespace datastax::internal::core;

bool MapIterator::decode_pair() {
if (!decoder_.decode_value(map_->primary_data_type(), key_, true)) return false;
return decoder_.decode_value(map_->secondary_data_type(), value_, true);
key_ = decoder_.decode_value(map_->primary_data_type());
if (key_.data_type()) {
value_ = decoder_.decode_value(map_->secondary_data_type());
return value_.is_valid();
}
return false;
}

bool MapIterator::next() {
Expand Down
13 changes: 13 additions & 0 deletions src/ref_counted.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,19 @@ class SharedRefPtr {
return *this;
}

#if defined(__cpp_rvalue_references)
SharedRefPtr<T>& operator=(SharedRefPtr<T>&& ref) noexcept {
if (ptr_ != NULL) {
ptr_->dec_ref();
}
ptr_ = ref.ptr_;
ref.ptr_ = NULL;
return *this;
}

SharedRefPtr(SharedRefPtr<T>&& ref) noexcept : ptr_(ref.ptr_) { ref.ptr_ = NULL; }
#endif

~SharedRefPtr() {
if (ptr_ != NULL) {
ptr_->dec_ref();
Expand Down
2 changes: 1 addition & 1 deletion src/request_processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ RequestProcessorSettings::RequestProcessorSettings()
, max_tracing_wait_time_ms(CASS_DEFAULT_MAX_TRACING_DATA_WAIT_TIME_MS)
, retry_tracing_wait_time_ms(CASS_DEFAULT_RETRY_TRACING_DATA_WAIT_TIME_MS)
, tracing_consistency(CASS_DEFAULT_TRACING_CONSISTENCY)
, address_factory(new DefaultAddressFactory()) {
, address_factory(new AddressFactory()) {
profiles.set_empty_key("");
}

Expand Down
Loading

0 comments on commit 4bf44c7

Please sign in to comment.