Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DO NOT MERGE: Basically fix "Unexpected EOF" log errors #24110

Draft
wants to merge 1 commit into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/v/kafka/server/protocol_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ parse_v1_header(ss::input_stream<char>& src) {

if (unlikely(client_id_size < 0)) {
// header parsing error, force connection shutdown
throw std::runtime_error(
throw malformed_header_exception(
fmt::format("Invalid client_id size {}", client_id_size));
}

Expand Down
6 changes: 6 additions & 0 deletions src/v/kafka/server/protocol_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,10 @@ ss::future<std::optional<request_header>> parse_header(ss::input_stream<char>&);

ss::scattered_message<char> response_as_scattered(response_ptr response);

class malformed_header_exception : public std::runtime_error {
public:
explicit malformed_header_exception(const std::string& m)
: std::runtime_error(m) {}
};

} // namespace kafka
11 changes: 11 additions & 0 deletions src/v/kafka/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
#include "kafka/server/handlers/sasl_handshake.h"
#include "kafka/server/handlers/sync_group.h"
#include "kafka/server/logger.h"
#include "kafka/server/protocol_utils.h"
#include "kafka/server/quota_manager.h"
#include "kafka/server/request_context.h"
#include "kafka/server/response.h"
Expand Down Expand Up @@ -427,6 +428,16 @@ ss::future<> server::apply(ss::lw_shared_ptr<net::connection> conn) {
}
}

bool server::is_message_error(std::exception_ptr e) const {
try {
std::rethrow_exception(e);
} catch (const kafka::malformed_header_exception& e) {
return true;
} catch (...) {
return false;
}
}

template<>
ss::future<response_ptr> heartbeat_handler::handle(
request_context ctx, [[maybe_unused]] ss::smp_service_group g) {
Expand Down
2 changes: 2 additions & 0 deletions src/v/kafka/server/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ class server final
// until the end of the server (container/parent)
ss::future<> apply(ss::lw_shared_ptr<net::connection>) final;

bool is_message_error(std::exception_ptr) const final;

ss::smp_service_group smp_group() const { return _smp_group; }

/**
Expand Down
13 changes: 10 additions & 3 deletions src/v/net/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,17 @@ void server::print_exceptional_future(
ctx,
address,
ex);
} else if (is_message_error(ex)) {
vlog(
_log.warn,
"Message Error[{}] remote address: {} - {}",
ctx,
address,
ex);
} else {
// Authentication exceptions are logged at WARN, not ERROR, because
// they generally point to a misbehaving client rather than a fault
// in the server.
// Authentication and Message exceptions are logged at WARN, not
// ERROR, because they generally point to a misbehaving client
// rather than a fault in the server.
vlog(
_log.error,
"Error[{}] remote address: {} - {}",
Expand Down
9 changes: 9 additions & 0 deletions src/v/net/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,15 @@ class server {
ss::abort_source& abort_source() { return _as; }
bool abort_requested() const { return _as.abort_requested(); }

/**
Called by print_exceptional_future.
When this function returns 'true', the exception will be logged at WARN
level rather than the usual ERROR. This provides a customization point
for derived classes to avoid producing error logs for certain exception
types which may be application specific.
*/
virtual bool is_message_error(std::exception_ptr) const { return false; };

private:
struct listener {
ss::sstring name;
Expand Down