Skip to content

Commit eeeb643

Browse files
authored
[ISSUE #928] [CPP] Fix some cpp client bug and make logs cleaner (#929)
1 parent a60bec6 commit eeeb643

22 files changed

+62
-63
lines changed

.github/workflows/cpp_build.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ jobs:
1111
# Disable VS 2022 before https://github.com/bazelbuild/bazel/issues/18592 issue is solved
1212
# Remove macos-11 since there is no such runner available
1313
# os: [ubuntu-20.04, ubuntu-22.04, macos-11, macos-12, windows-2019, windows-2022]
14-
os: [ubuntu-20.04, ubuntu-22.04, macos-12, windows-2019]
14+
os: [ubuntu-20.04, ubuntu-22.04, windows-2019]
1515
steps:
1616
- uses: actions/checkout@v2
1717
- name: Compile On Linux

cpp/.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,4 @@ bazel-rocketmq-client-cpp
1919
/compile_commands.json
2020
/.cache/
2121
.clangd
22+
build

cpp/examples/ExampleFifoProducer.cpp

+2-3
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
#include "rocketmq/FifoProducer.h"
2929
#include "rocketmq/Logger.h"
3030
#include "rocketmq/Message.h"
31-
#include "rocketmq/Producer.h"
3231
#include "rocketmq/SendReceipt.h"
3332

3433
using namespace ROCKETMQ_NAMESPACE;
@@ -93,8 +92,8 @@ std::string randomString(std::string::size_type len) {
9392
return result;
9493
}
9594

96-
DEFINE_string(topic, "standard_topic_sample", "Topic to which messages are published");
97-
DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, provided by your service provider");
95+
DEFINE_string(topic, "FifoTopic", "Topic to which messages are published");
96+
DEFINE_string(access_point, "127.0.0.1:8081", "Service access URL, provided by your service provider");
9897
DEFINE_int32(message_body_size, 4096, "Message body size");
9998
DEFINE_uint32(total, 256, "Number of sample messages to publish");
10099
DEFINE_string(access_key, "", "Your access key ID");

cpp/examples/ExampleProducer.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,8 @@ std::string randomString(std::string::size_type len) {
5151
return result;
5252
}
5353

54-
DEFINE_string(topic, "standard_topic_sample", "Topic to which messages are published");
55-
DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, provided by your service provider");
54+
DEFINE_string(topic, "NormalTopic", "Topic to which messages are published");
55+
DEFINE_string(access_point, "127.0.0.1:8081", "Service access URL, provided by your service provider");
5656
DEFINE_int32(message_body_size, 4096, "Message body size");
5757
DEFINE_uint32(total, 256, "Number of sample messages to publish");
5858
DEFINE_string(access_key, "", "Your access key ID");

cpp/examples/ExampleProducerWithAsync.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,8 @@ std::string randomString(std::string::size_type len) {
8989
return result;
9090
}
9191

92-
DEFINE_string(topic, "standard_topic_sample", "Topic to which messages are published");
93-
DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, provided by your service provider");
92+
DEFINE_string(topic, "NormalTopic", "Topic to which messages are published");
93+
DEFINE_string(access_point, "127.0.0.1:8081", "Service access URL, provided by your service provider");
9494
DEFINE_int32(message_body_size, 4096, "Message body size");
9595
DEFINE_uint32(total, 256, "Number of sample messages to publish");
9696
DEFINE_uint32(concurrency, 128, "Concurrency of async send");

cpp/examples/ExampleProducerWithFifoMessage.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,8 @@ std::string randomString(std::string::size_type len) {
4848
return result;
4949
}
5050

51-
DEFINE_string(topic, "fifo_topic_sample", "Topic to which messages are published");
52-
DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, provided by your service provider");
51+
DEFINE_string(topic, "FifoTopic", "Topic to which messages are published");
52+
DEFINE_string(access_point, "127.0.0.1:8081", "Service access URL, provided by your service provider");
5353
DEFINE_int32(message_body_size, 4096, "Message body size");
5454
DEFINE_uint32(total, 256, "Number of sample messages to publish");
5555
DEFINE_string(access_key, "", "Your access key ID");

cpp/examples/ExampleProducerWithTimedMessage.cpp

+2-3
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
#include <algorithm>
1818
#include <atomic>
1919
#include <chrono>
20-
#include <cstddef>
2120
#include <iostream>
2221
#include <random>
2322
#include <string>
@@ -50,8 +49,8 @@ std::string randomString(std::string::size_type len) {
5049
return result;
5150
}
5251

53-
DEFINE_string(topic, "standard_topic_sample", "Topic to which messages are published");
54-
DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, provided by your service provider");
52+
DEFINE_string(topic, "TimerTopic", "Topic to which messages are published");
53+
DEFINE_string(access_point, "127.0.0.1:8081", "Service access URL, provided by your service provider");
5554
DEFINE_int32(message_body_size, 4096, "Message body size");
5655
DEFINE_uint32(total, 256, "Number of sample messages to publish");
5756
DEFINE_string(access_key, "", "Your access key ID");

cpp/examples/ExampleProducerWithTransactionalMessage.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,8 @@ std::string randomString(std::string::size_type len) {
4848
return result;
4949
}
5050

51-
DEFINE_string(topic, "tx_topic_sample", "Topic to which messages are published");
52-
DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, provided by your service provider");
51+
DEFINE_string(topic, "TransTopic", "Topic to which messages are published");
52+
DEFINE_string(access_point, "127.0.0.1:8081", "Service access URL, provided by your service provider");
5353
DEFINE_int32(message_body_size, 4096, "Message body size");
5454
DEFINE_uint32(total, 256, "Number of sample messages to publish");
5555
DEFINE_string(access_key, "", "Your access key ID");

cpp/examples/ExamplePushConsumer.cpp

+3-3
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@
2424

2525
using namespace ROCKETMQ_NAMESPACE;
2626

27-
DEFINE_string(topic, "standard_topic_sample", "Topic to which messages are published");
28-
DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, provided by your service provider");
29-
DEFINE_string(group, "CID_standard_topic_sample", "GroupId, created through your instance management console");
27+
DEFINE_string(topic, "NormalTopic", "Topic to which messages are published");
28+
DEFINE_string(access_point, "127.0.0.1:8081", "Service access URL, provided by your service provider");
29+
DEFINE_string(group, "PushConsumer", "GroupId, created through your instance management console");
3030
DEFINE_string(access_key, "", "Your access key ID");
3131
DEFINE_string(access_secret, "", "Your access secret");
3232
DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL");

cpp/examples/ExampleSimpleConsumer.cpp

+3-3
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@
2323

2424
using namespace ROCKETMQ_NAMESPACE;
2525

26-
DEFINE_string(topic, "standard_topic_sample", "Topic to which messages are published");
27-
DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, provided by your service provider");
28-
DEFINE_string(group, "CID_standard_topic_sample", "GroupId, created through your instance management console");
26+
DEFINE_string(topic, "NormalTopic", "Topic to which messages are published");
27+
DEFINE_string(access_point, "127.0.0.1:8081", "Service access URL, provided by your service provider");
28+
DEFINE_string(group, "SimpleConsumer", "GroupId, created through your instance management console");
2929
DEFINE_string(access_key, "", "Your access key ID");
3030
DEFINE_string(access_secret, "", "Your access secret");
3131
DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL");

cpp/source/base/include/InvocationContext.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,8 @@ struct InvocationContext : public BaseInvocationContext {
8181

8282
if (!status.ok() && grpc::StatusCode::DEADLINE_EXCEEDED == status.error_code()) {
8383
auto diff =
84-
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() - context.deadline())
85-
.count();
84+
std::chrono::duration_cast<std::chrono::milliseconds>(
85+
std::chrono::system_clock::now() - context.deadline()).count();
8686
SPDLOG_WARN("Asynchronous RPC[{}.{}] timed out, elapsing {}ms, deadline-over-due: {}ms",
8787
absl::FormatTime(created_time, absl::UTCTimeZone()), elapsed, diff);
8888
}

cpp/source/client/ClientManagerImpl.cpp

+6-5
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ ClientManagerImpl::ClientManagerImpl(std::string resource_namespace, bool with_s
4848
state_(State::CREATED),
4949
callback_thread_pool_(absl::make_unique<ThreadPoolImpl>(std::thread::hardware_concurrency())),
5050
with_ssl_(with_ssl) {
51+
5152
certificate_verifier_ = grpc::experimental::ExternalCertificateVerifier::Create<InsecureCertificateVerifier>();
5253
tls_channel_credential_options_.set_verify_server_certs(false);
5354
tls_channel_credential_options_.set_check_call_host(false);
@@ -78,7 +79,7 @@ ClientManagerImpl::ClientManagerImpl(std::string resource_namespace, bool with_s
7879
*/
7980
channel_arguments_.SetInt(GRPC_ARG_ENABLE_RETRIES, 0);
8081

81-
channel_arguments_.SetSslTargetNameOverride("localhost");
82+
// channel_arguments_.SetSslTargetNameOverride("localhost");
8283

8384
SPDLOG_INFO("ClientManager[ResourceNamespace={}] created", resource_namespace_);
8485
}
@@ -282,7 +283,7 @@ bool ClientManagerImpl::send(const std::string& target_host,
282283
SendMessageRequest& request,
283284
SendResultCallback cb) {
284285
assert(cb);
285-
SPDLOG_DEBUG("Prepare to send message to {} asynchronously. Request: {}", target_host, request.DebugString());
286+
SPDLOG_DEBUG("Prepare to send message to {} asynchronously. Request: {}", target_host, request.ShortDebugString());
286287
RpcClientSharedPtr client = getRpcClient(target_host);
287288
// Invocation context will be deleted in its onComplete() method.
288289
auto invocation_context = new InvocationContext<SendMessageResponse>();
@@ -440,7 +441,7 @@ bool ClientManagerImpl::send(const std::string& target_host,
440441

441442
case rmq::Code::MESSAGE_PROPERTY_CONFLICT_WITH_TYPE: {
442443
SPDLOG_WARN("Message-property-conflict-with-type: Host={}, Response={}", invocation_context->remote_address,
443-
invocation_context->response.DebugString());
444+
invocation_context->response.ShortDebugString());
444445
send_result.ec = ErrorCode::MessagePropertyConflictWithType;
445446
break;
446447
}
@@ -482,7 +483,7 @@ RpcClientSharedPtr ClientManagerImpl::getRpcClient(const std::string& target_hos
482483
auto search = rpc_clients_.find(target_host);
483484
if (search == rpc_clients_.end() || !search->second->ok()) {
484485
if (search == rpc_clients_.end()) {
485-
SPDLOG_INFO("Create a RPC client to {}", target_host.data());
486+
SPDLOG_INFO("Create a RPC client to [{}]", target_host.data());
486487
} else if (!search->second->ok()) {
487488
SPDLOG_INFO("Prior RPC client to {} is not OK. Re-create one", target_host);
488489
}
@@ -549,7 +550,7 @@ void ClientManagerImpl::resolveRoute(const std::string& target_host,
549550
std::chrono::milliseconds timeout,
550551
const std::function<void(const std::error_code&, const TopicRouteDataPtr&)>& cb) {
551552
SPDLOG_DEBUG("Name server connection URL: {}", target_host);
552-
SPDLOG_DEBUG("Query route request: {}", request.DebugString());
553+
SPDLOG_DEBUG("Query route request: {}", request.ShortDebugString());
553554
RpcClientSharedPtr client = getRpcClient(target_host, false);
554555
if (!client) {
555556
SPDLOG_WARN("Failed to create RPC client for name server[host={}]", target_host);

cpp/source/client/LogInterceptor.cpp

+7-7
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,8 @@ void LogInterceptor::Intercept(grpc::experimental::InterceptorBatchMethods* meth
5252
if (methods->QueryInterceptionHookPoint(grpc::experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA)) {
5353
std::multimap<std::string, std::string>* metadata = methods->GetSendInitialMetadata();
5454
if (metadata) {
55-
SPDLOG_DEBUG("[Outbound]Headers of {}: \n{}", client_rpc_info_->method(),
56-
absl::StrJoin(*metadata, "\n", absl::PairFormatter(" --> ")));
55+
SPDLOG_DEBUG("[Outbound]Headers of {}: {}", client_rpc_info_->method(),
56+
absl::StrJoin(*metadata, " ", absl::PairFormatter(" --> ")));
5757
}
5858
}
5959

@@ -73,8 +73,8 @@ void LogInterceptor::Intercept(grpc::experimental::InterceptorBatchMethods* meth
7373
absl::string_view(it.second.data(), it.second.length())});
7474
}
7575
if (!response_headers.empty()) {
76-
SPDLOG_DEBUG("[Inbound]Response Headers of {}:\n{}", client_rpc_info_->method(),
77-
absl::StrJoin(response_headers, "\n", absl::PairFormatter(" --> ")));
76+
SPDLOG_DEBUG("[Inbound]Response Headers of {}: {}", client_rpc_info_->method(),
77+
absl::StrJoin(response_headers, " ", absl::PairFormatter(" --> ")));
7878
} else {
7979
SPDLOG_DEBUG("[Inbound]Response metadata of {} is empty", client_rpc_info_->method());
8080
}
@@ -85,12 +85,12 @@ void LogInterceptor::Intercept(grpc::experimental::InterceptorBatchMethods* meth
8585
void* message = methods->GetRecvMessage();
8686
if (message) {
8787
auto* response = reinterpret_cast<google::protobuf::Message*>(message);
88-
std::string&& response_text = response->DebugString();
88+
std::string&& response_text = response->ShortDebugString();
8989
std::size_t limit = 1024;
9090
if (response_text.size() <= limit) {
91-
SPDLOG_DEBUG("[Inbound] {}\n{}", client_rpc_info_->method(), response_text);
91+
SPDLOG_DEBUG("[Inbound] {} {}", client_rpc_info_->method(), response_text);
9292
} else {
93-
SPDLOG_DEBUG("[Inbound] {}\n{}...", client_rpc_info_->method(), response_text.substr(0, limit));
93+
SPDLOG_DEBUG("[Inbound] {} {}...", client_rpc_info_->method(), response_text.substr(0, limit));
9494
}
9595
}
9696
}

cpp/source/client/RpcClientImpl.cpp

-2
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
*/
1717
#include "RpcClientImpl.h"
1818

19-
#include <chrono>
2019
#include <functional>
2120
#include <sstream>
2221
#include <thread>
@@ -26,7 +25,6 @@
2625
#include "RpcClient.h"
2726
#include "TelemetryBidiReactor.h"
2827
#include "TlsHelper.h"
29-
#include "absl/time/time.h"
3028

3129
ROCKETMQ_NAMESPACE_BEGIN
3230

cpp/source/client/SessionImpl.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ bool SessionImpl::await() {
3434

3535
void SessionImpl::syncSettings() {
3636
auto ptr = client_.lock();
37-
SPDLOG_INFO("Sync client settings to {}", rpc_client_->remoteAddress());
37+
SPDLOG_INFO("Request client settings to {}", rpc_client_->remoteAddress());
3838
TelemetryCommand command;
3939
command.mutable_settings()->CopyFrom(ptr->clientSettings());
4040
telemetry_->write(command);

cpp/source/client/TelemetryBidiReactor.cpp

+10-12
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,12 @@
1616
*/
1717
#include "TelemetryBidiReactor.h"
1818

19-
#include <atomic>
20-
#include <cstdint>
2119
#include <memory>
2220
#include <utility>
2321

2422
#include "ClientManager.h"
2523
#include "MessageExt.h"
2624
#include "Metadata.h"
27-
#include "RpcClient.h"
2825
#include "Signature.h"
2926
#include "google/protobuf/util/time_util.h"
3027
#include "rocketmq/Logger.h"
@@ -70,7 +67,7 @@ void TelemetryBidiReactor::OnWriteDone(bool ok) {
7067
RemoveHold();
7168

7269
if (!ok) {
73-
SPDLOG_WARN("Failed to write telemetry command {} to {}", writes_.front().DebugString(), peer_address_);
70+
SPDLOG_WARN("Failed to write telemetry command {} to {}", writes_.front().ShortDebugString(), peer_address_);
7471
signalClose();
7572
return;
7673
}
@@ -91,7 +88,7 @@ void TelemetryBidiReactor::OnReadDone(bool ok) {
9188
if (!ok) {
9289
// for read stream
9390
RemoveHold();
94-
SPDLOG_WARN("Failed to read from telemetry stream from {}", peer_address_);
91+
// SPDLOG_WARN("Failed to read from telemetry stream from {}", peer_address_);
9592
signalClose();
9693
return;
9794
}
@@ -103,7 +100,7 @@ void TelemetryBidiReactor::OnReadDone(bool ok) {
103100
}
104101
}
105102

106-
SPDLOG_DEBUG("Read a telemetry command from {}: {}", peer_address_, read_.DebugString());
103+
SPDLOG_DEBUG("Read a telemetry command from {}: {}", peer_address_, read_.ShortDebugString());
107104
auto client = client_.lock();
108105
if (!client) {
109106
SPDLOG_INFO("Client for {} has destructed", peer_address_);
@@ -114,19 +111,20 @@ void TelemetryBidiReactor::OnReadDone(bool ok) {
114111
switch (read_.command_case()) {
115112
case rmq::TelemetryCommand::kSettings: {
116113
auto settings = read_.settings();
117-
SPDLOG_INFO("Received settings from {}: {}", peer_address_, settings.DebugString());
114+
SPDLOG_INFO("Receive settings from {}: {}", peer_address_, settings.ShortDebugString());
118115
applySettings(settings);
119116
sync_settings_promise_.set_value(true);
120117
break;
121118
}
119+
122120
case rmq::TelemetryCommand::kRecoverOrphanedTransactionCommand: {
123-
SPDLOG_DEBUG("Receive orphan transaction command: {}", read_.DebugString());
124-
auto message = client->manager()->wrapMessage(read_.release_verify_message_command()->message());
121+
SPDLOG_INFO("Receive orphan transaction command: {}", read_.ShortDebugString());
122+
auto message = client->manager()->wrapMessage(
123+
read_.recover_orphaned_transaction_command().message());
125124
auto raw = const_cast<Message*>(message.get());
126125
raw->mutableExtension().target_endpoint = peer_address_;
127126
raw->mutableExtension().transaction_id = read_.recover_orphaned_transaction_command().transaction_id();
128127
client->recoverOrphanedTransaction(message);
129-
130128
break;
131129
}
132130

@@ -156,7 +154,7 @@ void TelemetryBidiReactor::OnReadDone(bool ok) {
156154
}
157155

158156
default: {
159-
SPDLOG_WARN("Unsupported command");
157+
SPDLOG_WARN("Telemetry command receive unsupported command");
160158
break;
161159
}
162160
}
@@ -291,7 +289,7 @@ void TelemetryBidiReactor::tryWriteNext() {
291289
}
292290

293291
if (!writes_.empty()) {
294-
SPDLOG_DEBUG("Writing telemetry command to {}: {}", peer_address_, writes_.front().DebugString());
292+
SPDLOG_DEBUG("Writing telemetry command to {}: {}", peer_address_, writes_.front().ShortDebugString());
295293
AddHold();
296294
StartWrite(&(writes_.front()));
297295
}

cpp/source/client/include/TopicRouteData.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ class TopicRouteData {
4343

4444
std::string debugString() const {
4545
return absl::StrJoin(message_queues_.begin(), message_queues_.end(), ",",
46-
[](std::string* out, const rmq::MessageQueue& m) { out->append(m.DebugString()); });
46+
[](std::string* out, const rmq::MessageQueue& m) { out->append(m.ShortDebugString()); });
4747
};
4848

4949
private:

cpp/source/log/LoggerImpl.cpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ Logger& getLogger() {
131131
const std::size_t LoggerImpl::DEFAULT_MAX_LOG_FILE_QUANTITY = 16;
132132
const std::size_t LoggerImpl::DEFAULT_FILE_SIZE = 1048576 * 256;
133133
const char* LoggerImpl::USER_HOME_ENV = "HOME";
134-
const char* LoggerImpl::DEFAULT_PATTERN = "[%Y/%m/%d-%H:%M:%S.%e %z] [%n] [%^---%L---%$] [thread %t] %v %@";
134+
const char* LoggerImpl::DEFAULT_PATTERN = "%Y-%m-%d %H:%M:%S.%e [%^--%L--%$] [%7t] %v %@";
135+
// const char* LoggerImpl::DEFAULT_PATTERN = "[%Y/%m/%d-%H:%M:%S.%e %z] [%n] [%^---%L---%$] [thread %t] %v %@";
135136

136137
ROCKETMQ_NAMESPACE_END

cpp/source/rocketmq/ClientImpl.cpp

+5-8
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,9 @@
1919
#include <algorithm>
2020
#include <atomic>
2121
#include <chrono>
22-
#include <cstdint>
2322
#include <cstdlib>
2423
#include <exception>
2524
#include <functional>
26-
#include <iterator>
2725
#include <memory>
2826
#include <string>
2927
#include <system_error>
@@ -43,9 +41,6 @@
4341
#include "absl/strings/str_split.h"
4442
#include "fmt/format.h"
4543
#include "opencensus/stats/stats.h"
46-
#include "rocketmq/Logger.h"
47-
#include "rocketmq/Message.h"
48-
#include "rocketmq/MessageListener.h"
4944
#include "spdlog/spdlog.h"
5045

5146
ROCKETMQ_NAMESPACE_BEGIN
@@ -175,12 +170,14 @@ void ClientImpl::start() {
175170
auto telemetry_functor = [ptr]() {
176171
std::shared_ptr<ClientImpl> base = ptr.lock();
177172
if (base) {
178-
SPDLOG_INFO("Sync client settings to servers");
173+
SPDLOG_DEBUG("Sync client settings to servers");
179174
base->syncClientSettings();
180175
}
181176
};
182-
telemetry_handle_ = client_manager_->getScheduler()->schedule(telemetry_functor, TELEMETRY_TASK_NAME,
183-
std::chrono::minutes(5), std::chrono::minutes(5));
177+
178+
telemetry_handle_ = client_manager_->getScheduler()->schedule(
179+
telemetry_functor, TELEMETRY_TASK_NAME,
180+
std::chrono::minutes(5), std::chrono::minutes(5));
184181

185182
auto&& metric_service_endpoint = metricServiceEndpoint();
186183
if (!metric_service_endpoint.empty()) {

0 commit comments

Comments
 (0)