1
1
#include " Scadup.h"
2
2
3
+ #define LOG_TAG " Broker"
4
+ #include " ../utils/logging.h"
5
+
3
6
#define XMK (x ) #x
4
7
#define GET (x ) XMK(x)
5
8
@@ -16,6 +19,30 @@ void signalCatch(int value)
16
19
LOGI (" caught signal: %d" , value);
17
20
}
18
21
22
+ bool Scadup::makeSocket (SOCKET& socket)
23
+ {
24
+ #ifdef _WIN32
25
+ WSADATA wsaData;
26
+ WORD version = MAKEWORD (2 , 2 );
27
+ int wsResult = WSAStartup (version, &wsaData);
28
+ if (wsResult != 0 ) {
29
+ LOGE (" WSAStartup fail: %s!" , strerror (errno));
30
+ return false ;
31
+ }
32
+ #endif
33
+ bool status = true ;
34
+ socket = ::socket (AF_INET, SOCK_STREAM, 0 );
35
+ if (socket <= 0 ) {
36
+ LOGE (" Generating socket fail(%s)." ,
37
+ (errno != 0 ? strerror (errno) : std::to_string (socket).c_str ()));
38
+ #ifdef _WIN32
39
+ WSACleanup ();
40
+ #endif
41
+ status = false ;
42
+ }
43
+ return status;
44
+ }
45
+
19
46
ssize_t Scadup::writes (SOCKET socket, const uint8_t * data, size_t len)
20
47
{
21
48
if (data == nullptr || len == 0 )
@@ -24,7 +51,7 @@ ssize_t Scadup::writes(SOCKET socket, const uint8_t* data, size_t len)
24
51
return -1 ;
25
52
std::mutex mtxLck = {};
26
53
std::lock_guard<std::mutex> lock (mtxLck);
27
- ssize_t left = (ssize_t )len;
54
+ auto left = (ssize_t )len;
28
55
auto * buff = new (std::nothrow) uint8_t [left];
29
56
if (buff == nullptr ) {
30
57
LOGE (" Socket buffer malloc size %zu failed!" , left);
@@ -57,31 +84,30 @@ ssize_t Scadup::writes(SOCKET socket, const uint8_t* data, size_t len)
57
84
58
85
int Scadup::connect (const char * ip, unsigned short port, unsigned int total)
59
86
{
60
- SOCKET socket = ::socket (AF_INET, SOCK_STREAM, 0 );
61
- if (socket < 0 ) {
62
- LOGE (" Generating socket to connect(%s)." ,
63
- (errno != 0 ? strerror (errno) : std::to_string (socket).c_str ()));
87
+ SOCKET sock = -1 ;
88
+ if (!makeSocket (sock)) {
89
+ LOGE (" Connect to make socket fail!" );
64
90
return -1 ;
65
91
}
66
92
sockaddr_in local{};
67
93
local.sin_family = AF_INET;
68
94
local.sin_port = htons (port);
69
95
local.sin_addr .s_addr = inet_addr (ip);
70
- int flag = 1 ;
71
- setsockopt (socket , SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (char ));
96
+ char flag = 1 ;
97
+ setsockopt (sock , SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (char ));
72
98
LOGI (" ------ connecting to %s:%d ------" , ip, port);
73
99
unsigned int tries = 0 ;
74
- while (::connect (socket , reinterpret_cast <struct sockaddr *>(&local), sizeof (local)) == (-1 )) {
100
+ while (::connect (sock , reinterpret_cast <struct sockaddr *>(&local), sizeof (local)) == (-1 )) {
75
101
if (tries < total) {
76
102
wait (Time100ms * (long )pow (2 , tries));
77
103
tries++;
78
104
} else {
79
105
LOGE (" Retrying to connect (times=%d, %s)." , tries, (errno != 0 ? strerror (errno) : " No error" ));
80
- close (socket );
106
+ Close (sock );
81
107
return -2 ;
82
108
}
83
109
}
84
- return socket ;
110
+ return sock ;
85
111
}
86
112
87
113
SOCKET Scadup::socket2Broker (const char * ip, unsigned short port, uint64_t & ssid, uint32_t timeout)
@@ -92,19 +118,19 @@ SOCKET Scadup::socket2Broker(const char* ip, unsigned short port, uint64_t& ssid
92
118
return -1 ;
93
119
}
94
120
Header head{};
95
- ssize_t size = ::recv (socket, &head, sizeof (head), 0 );
121
+ ssize_t size = ::recv (socket, reinterpret_cast < char *>( &head) , sizeof (head), 0 );
96
122
if (size > 0 ) {
97
123
if (head.size == sizeof (head) && head.flag == BROKER)
98
124
ssid = head.ssid ;
99
125
else
100
126
LOGW (" Mismatch flag %s, size %u." , GET_FLAG (head.flag ), head.size );
101
127
} else {
102
128
if (size == 0 ) {
103
- LOGE (" Connection closed by peer, close %d" , socket);
129
+ LOGE (" Connection closed by peer, close %d: %s " , socket, strerror (errno) );
104
130
} else {
105
131
LOGE (" Recv fail(%ld), close %d: %s" , size, socket, strerror (errno));
106
132
}
107
- close (socket);
133
+ Close (socket);
108
134
return -3 ;
109
135
}
110
136
return socket;
@@ -120,10 +146,9 @@ int Broker::setup(unsigned short port)
120
146
{
121
147
signal (SIGPIPE, signalCatch);
122
148
123
- SOCKET sock = ::socket (AF_INET, SOCK_STREAM, 0 );
124
- if (sock < 0 ) {
125
- LOGE (" Generating socket to setup(%s)." ,
126
- (errno != 0 ? strerror (errno) : std::to_string (sock).c_str ()));
149
+ SOCKET sock = -1 ;
150
+ if (!makeSocket (sock)) {
151
+ LOGE (" Setup to make socket fail!" );
127
152
return -1 ;
128
153
}
129
154
@@ -134,15 +159,15 @@ int Broker::setup(unsigned short port)
134
159
if (::bind (sock, reinterpret_cast <struct sockaddr *>(&local), sizeof (local)) < 0 ) {
135
160
LOGE (" Binding socket (%s)." ,
136
161
(errno != 0 ? strerror (errno) : std::to_string (sock).c_str ()));
137
- close (sock);
162
+ Close (sock);
138
163
return -2 ;
139
164
}
140
165
141
166
const int backlog = 50 ;
142
167
if (listen (sock, backlog) < 0 ) {
143
168
LOGE (" listening socket (%s)." ,
144
169
(errno != 0 ? strerror (errno) : std::to_string (sock).c_str ()));
145
- close (sock);
170
+ Close (sock);
146
171
return -3 ;
147
172
}
148
173
@@ -158,7 +183,7 @@ int Broker::setup(unsigned short port)
158
183
check.detach ();
159
184
}
160
185
161
- socklen_t size = static_cast <socklen_t >(sizeof (local));
186
+ auto size = static_cast <socklen_t >(sizeof (local));
162
187
getsockname (sock, reinterpret_cast <struct sockaddr *>(&local), &size);
163
188
LOGI (" listens localhost [%s:%d]." , inet_ntoa (local.sin_addr ), port);
164
189
@@ -193,7 +218,7 @@ void Broker::taskAllot(Networks& works, const Network& work)
193
218
{
194
219
if (work.head .flag == PUBLISHER) {
195
220
Header head{};
196
- ssize_t len = recv (work.socket , &head, sizeof (head), MSG_WAITALL);
221
+ ssize_t len = recv (work.socket , reinterpret_cast < char *>( &head) , sizeof (head), MSG_WAITALL);
197
222
if (len == 0 || (len < 0 && errno == EPIPE)) {
198
223
setOffline (works, work.socket );
199
224
LOGW (" Socket lost/closing by itself!" );
@@ -212,7 +237,7 @@ void Broker::taskAllot(Networks& works, const Network& work)
212
237
LOGI (" start heart beat task" );
213
238
while (m_active) {
214
239
Header head{};
215
- ssize_t len = ::recv (socket, &head, HEAD_SIZE, 0 );
240
+ ssize_t len = ::recv (socket, reinterpret_cast < char *>( &head) , HEAD_SIZE, 0 );
216
241
if (len == 0 || (len < 0 && errno == EPIPE) || (len > 0 && head.cmd == 0xff )) {
217
242
setOffline (works, socket);
218
243
LOGW (" Socket %d lost/closing by itself!" , socket);
@@ -239,8 +264,8 @@ int Broker::ProxyTask(Networks& works, const Network& work)
239
264
work.IP , work.PORT , work.head .size );
240
265
const size_t sz1 = sizeof (Message::Payload::status);
241
266
size_t left = work.head .size - HEAD_SIZE;
242
- static Message sval {};
243
- Message * msg = new (sval ) Message;
267
+ static Message mval {};
268
+ auto * msg = new (mval ) Message;
244
269
msg->payload .content = new char [left - sz1];
245
270
memset (msg->payload .content , 0 , left - sz1);
246
271
size_t len = 0 ;
@@ -250,7 +275,7 @@ int Broker::ProxyTask(Networks& works, const Network& work)
250
275
ssize_t got = ::recv (work.socket , reinterpret_cast <char *>(payload + len), size, 0 );
251
276
if (got < 0 ) {
252
277
if (errno != EAGAIN && errno != EWOULDBLOCK) {
253
- LOGE (" Call recv failed: %s" , strerror (errno));
278
+ LOGE (" Call recv(%d) failed: %s" , got , strerror (errno));
254
279
delete[] msg->payload .content ;
255
280
return -1 ;
256
281
}
@@ -272,47 +297,47 @@ int Broker::ProxyTask(Networks& works, const Network& work)
272
297
queue_push (&g_msgQue, msg);
273
298
setOffline (works, work.socket );
274
299
std::vector<Network>& vec = works[SUBSCRIBER];// only SUBSCRIBER should be sent
275
- if (vec.size () <= 0 ) {
300
+ if (vec.empty () ) {
276
301
LOGW (" No subscriber to publish!" );
277
302
}
278
303
void * message = queue_front (&g_msgQue);
279
304
if (message != nullptr ) {
280
- Message msg = *reinterpret_cast <Message*>(message);
281
- if (msg .head .flag != PUBLISHER) {
282
- LOGW (" Message invalid(%d), len=%u!" , msg .head .flag , msg .head .size );
305
+ Message val = *reinterpret_cast <Message*>(message);
306
+ if (val .head .flag != PUBLISHER) {
307
+ LOGW (" Message invalid(%d), len=%u!" , val .head .flag , val .head .size );
283
308
return -1 ;
284
309
}
285
- if (msg .head .size > 0 ) {
310
+ if (val .head .size > 0 ) {
286
311
for (auto & sub : vec) {
287
312
if (sub.head .topic == work.head .topic ) {
288
313
if (sub.active && sub.socket > 0 ) {
289
- size_t left = work.head .size ;
290
- size_t size = HEAD_SIZE + sz1;
291
- char * buff = (char *)&msg ;
314
+ left = work.head .size ;
315
+ size = HEAD_SIZE + sz1;
316
+ char * buff = (char *)&val ;
292
317
do {
293
- size_t len = ::send (sub.socket , buff, size, MSG_NOSIGNAL);
294
- if (len == 0 || (len < 0 && errno == EPIPE)) {
318
+ size_t sz = ::send (sub.socket , buff, size, MSG_NOSIGNAL);
319
+ if (sz == 0 || (sz < 0 && errno == EPIPE)) {
295
320
setOffline (works, sub.socket );
296
- LOGE (" Write to sock[%d], size %u failed!" , sub.socket , msg .head .size );
321
+ LOGE (" Write to sock[%d], size %u failed!" , sub.socket , val .head .size );
297
322
break ;
298
323
} else {
299
- if (len == HEAD_SIZE + sz1) {
324
+ if (sz == HEAD_SIZE + sz1) {
300
325
size = work.head .size - HEAD_SIZE - sz1;
301
- buff = msg .payload .content ;
326
+ buff = val .payload .content ;
302
327
}
303
- left -= len ;
328
+ left -= sz ;
304
329
}
305
330
} while (left > 0 );
306
- LOGI (" writes message to subscriber[%s:%u], size %u!" , sub.IP , sub.PORT , msg .head .size );
331
+ LOGI (" writes message to subscriber[%s:%u], size %u!" , sub.IP , sub.PORT , val .head .size );
307
332
} else {
308
333
LOGW (" No valid subscriber of topic %04x!" , sub.head .topic );
309
334
}
310
335
}
311
336
}
312
- Delete (msg .payload .content )
337
+ Delete (val .payload .content )
313
338
queue_pop (&g_msgQue);
314
339
} else {
315
- LOGW (" Message size(%u) invalid!" , msg .head .size );
340
+ LOGW (" Message size(%u) invalid!" , val .head .size );
316
341
}
317
342
} else {
318
343
LOGW (" MsgQue is null!" );
@@ -329,7 +354,7 @@ void Broker::setOffline(Networks& works, SOCKET socket)
329
354
if (wk.socket == socket) {
330
355
wk.active = false ;
331
356
if (wk.socket > 0 ) {
332
- close (wk.socket );
357
+ Close (wk.socket );
333
358
wk.socket = 0 ;
334
359
}
335
360
LOGI (" client %s:%u will delete later soon." , wk.IP , wk.PORT );
@@ -357,7 +382,7 @@ void Broker::checkAlive(Networks& works, bool* active)
357
382
}
358
383
}
359
384
for (auto it = works.begin (); it != works.end (); ) {
360
- if (it->second .size () == 0 ) {
385
+ if (it->second .empty () ) {
361
386
auto next = std::next (it);
362
387
works.erase (it);
363
388
it = next;
@@ -376,7 +401,7 @@ int Broker::broker()
376
401
while (m_active) {
377
402
FD_SET (m_socket, &fdset);
378
403
timeval timeout = { 3 , 0 };
379
- if (select ((int )(m_socket + 1 ), &fdset, NULL , NULL , &timeout) > 0 ) {
404
+ if (select ((int )(m_socket + 1 ), &fdset, nullptr , nullptr , &timeout) > 0 ) {
380
405
if (FD_ISSET (m_socket, &fdset)) {
381
406
struct sockaddr_in peer { };
382
407
auto socklen = static_cast <socklen_t >(sizeof (peer));
@@ -411,7 +436,7 @@ int Broker::broker()
411
436
continue ;
412
437
}
413
438
memset (&head, 0 , sizeof (head));
414
- ssize_t size = recv (sockNew, &head, sizeof (head), MSG_PEEK);
439
+ ssize_t size = recv (sockNew, reinterpret_cast < char *>( &head) , sizeof (head), MSG_PEEK);
415
440
if (size > 0 && ssid == head.ssid ) {
416
441
work.socket = sockNew;
417
442
work.head = head;
@@ -429,7 +454,7 @@ int Broker::broker()
429
454
} else {
430
455
if (0 == size || errno == EINVAL || (size < 0 && errno != EAGAIN)) {
431
456
LOGE (" Recv fail(%ld), ssid=%llu, close %d: %s" , size, head.ssid , sockNew, strerror (errno));
432
- close (sockNew);
457
+ Close (sockNew);
433
458
}
434
459
}
435
460
}
@@ -444,7 +469,7 @@ void Broker::exit()
444
469
{
445
470
m_active = false ;
446
471
if (m_socket > 0 ) {
447
- close (m_socket);
472
+ Close (m_socket);
448
473
m_socket = -1 ;
449
474
}
450
475
queue_deinit (&g_msgQue);
0 commit comments