From 2d606eb2c5ccca03205637b49f60079251cada97 Mon Sep 17 00:00:00 2001 From: Oren Leiman Date: Tue, 12 Nov 2024 19:20:21 -0800 Subject: [PATCH] DO NOT MERGE: Basically fix "Unexpected EOF" log errors But everything is in the wrong place. Not particularly practical to test these locally. In principle, fixes: CORE-8134 CORE-8059 CORE-7883 CORE-7324 CORE-6895 --- src/v/kafka/server/protocol_utils.cc | 2 +- src/v/kafka/server/protocol_utils.h | 6 ++++++ src/v/kafka/server/server.cc | 1 + src/v/net/connection.cc | 11 +++++++++++ src/v/net/connection.h | 1 + 5 files changed, 20 insertions(+), 1 deletion(-) diff --git a/src/v/kafka/server/protocol_utils.cc b/src/v/kafka/server/protocol_utils.cc index a35b9ab4e680..913a8bb37f2c 100644 --- a/src/v/kafka/server/protocol_utils.cc +++ b/src/v/kafka/server/protocol_utils.cc @@ -56,7 +56,7 @@ parse_v1_header(ss::input_stream& src) { if (unlikely(client_id_size < 0)) { // header parsing error, force connection shutdown - throw std::runtime_error( + throw malformed_header_exception( fmt::format("Invalid client_id size {}", client_id_size)); } diff --git a/src/v/kafka/server/protocol_utils.h b/src/v/kafka/server/protocol_utils.h index 5ff17862a487..920f6bcbf2d7 100644 --- a/src/v/kafka/server/protocol_utils.h +++ b/src/v/kafka/server/protocol_utils.h @@ -26,4 +26,10 @@ ss::future> parse_header(ss::input_stream&); ss::scattered_message response_as_scattered(response_ptr response); +class malformed_header_exception : public std::runtime_error { +public: + explicit malformed_header_exception(const std::string& m) + : std::runtime_error(m) {} +}; + } // namespace kafka diff --git a/src/v/kafka/server/server.cc b/src/v/kafka/server/server.cc index 48d52947cdda..94a0dcee3b9d 100644 --- a/src/v/kafka/server/server.cc +++ b/src/v/kafka/server/server.cc @@ -49,6 +49,7 @@ #include "kafka/server/handlers/sasl_handshake.h" #include "kafka/server/handlers/sync_group.h" #include "kafka/server/logger.h" +#include "kafka/server/protocol_utils.h" #include "kafka/server/quota_manager.h" #include "kafka/server/request_context.h" #include "kafka/server/response.h" diff --git a/src/v/net/connection.cc b/src/v/net/connection.cc index 945aaa72eb10..69a932c6f11b 100644 --- a/src/v/net/connection.cc +++ b/src/v/net/connection.cc @@ -10,6 +10,7 @@ #include "net/connection.h" #include "base/seastarx.h" +#include "kafka/server/protocol_utils.h" #include "net/exceptions.h" #include "net/types.h" #include "ssx/abort_source.h" @@ -130,6 +131,16 @@ bool is_auth_error(std::exception_ptr e) { __builtin_unreachable(); } +bool is_message_error(std::exception_ptr e) { + try { + std::rethrow_exception(e); + } catch (const kafka::malformed_header_exception& e) { + return true; + } catch (...) { + return false; + } +} + connection::connection( boost::intrusive::list& hook, ss::sstring name, diff --git a/src/v/net/connection.h b/src/v/net/connection.h index 6979b6cd308e..d1e3a6624215 100644 --- a/src/v/net/connection.h +++ b/src/v/net/connection.h @@ -33,6 +33,7 @@ bool is_reconnect_error(const std::system_error& e); std::optional is_disconnect_exception(std::exception_ptr); bool is_auth_error(std::exception_ptr); +bool is_message_error(std::exception_ptr); class connection : public boost::intrusive::list_base_hook<> { public: