Skip to content

Commit

Permalink
DO NOT MERGE: Basically fix "Unexpected EOF" log errors
Browse files Browse the repository at this point in the history
But everything is in the wrong place.
Not particularly practical to test these locally.

In principle, fixes:

CORE-8134
CORE-8059
CORE-7883
CORE-7324
CORE-6895
  • Loading branch information
oleiman committed Nov 13, 2024
1 parent 0ef6b9b commit 2d606eb
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 1 deletion.
2 changes: 1 addition & 1 deletion src/v/kafka/server/protocol_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ parse_v1_header(ss::input_stream<char>& src) {

if (unlikely(client_id_size < 0)) {
// header parsing error, force connection shutdown
throw std::runtime_error(
throw malformed_header_exception(
fmt::format("Invalid client_id size {}", client_id_size));
}

Expand Down
6 changes: 6 additions & 0 deletions src/v/kafka/server/protocol_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,10 @@ ss::future<std::optional<request_header>> parse_header(ss::input_stream<char>&);

ss::scattered_message<char> response_as_scattered(response_ptr response);

class malformed_header_exception : public std::runtime_error {
public:
explicit malformed_header_exception(const std::string& m)
: std::runtime_error(m) {}
};

} // namespace kafka
1 change: 1 addition & 0 deletions src/v/kafka/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
#include "kafka/server/handlers/sasl_handshake.h"
#include "kafka/server/handlers/sync_group.h"
#include "kafka/server/logger.h"
#include "kafka/server/protocol_utils.h"
#include "kafka/server/quota_manager.h"
#include "kafka/server/request_context.h"
#include "kafka/server/response.h"
Expand Down
11 changes: 11 additions & 0 deletions src/v/net/connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "net/connection.h"

#include "base/seastarx.h"
#include "kafka/server/protocol_utils.h"
#include "net/exceptions.h"
#include "net/types.h"
#include "ssx/abort_source.h"
Expand Down Expand Up @@ -130,6 +131,16 @@ bool is_auth_error(std::exception_ptr e) {
__builtin_unreachable();
}

bool is_message_error(std::exception_ptr e) {
try {
std::rethrow_exception(e);
} catch (const kafka::malformed_header_exception& e) {
return true;
} catch (...) {
return false;
}
}

connection::connection(
boost::intrusive::list<connection>& hook,
ss::sstring name,
Expand Down
1 change: 1 addition & 0 deletions src/v/net/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ bool is_reconnect_error(const std::system_error& e);
std::optional<ss::sstring> is_disconnect_exception(std::exception_ptr);

bool is_auth_error(std::exception_ptr);
bool is_message_error(std::exception_ptr);

class connection : public boost::intrusive::list_base_hook<> {
public:
Expand Down

0 comments on commit 2d606eb

Please sign in to comment.