Skip to content

Commit be5979d

Browse files
committed
[mod] change files 755 to 644, use threadpool, modify msg_que.
1 parent 3c2f921 commit be5979d

15 files changed

+71
-66
lines changed

CMakeLists.txt

100755100644
File mode changed.

CMakeSettings.json

100755100644
File mode changed.

src/CMakeLists.txt

100755100644
File mode changed.

src/common/Scadup.h

100755100644
+4-4
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
#pragma once
2-
#include <algorithm>
3-
#include <chrono>
4-
#include <cmath>
52
#include <csignal>
63
#include <cstdlib>
74
#include <cstring>
8-
#include <deque>
95
#include <fcntl.h>
6+
#include <chrono>
7+
#include <algorithm>
8+
#include <cmath>
9+
#include <deque>
1010
#include <fstream>
1111
#include <functional>
1212
#include <iostream>

src/scadup/Broker.cpp

100755100644
+5-5
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ int Broker::setup(unsigned short port)
177177
return -3;
178178
}
179179

180-
queue_init(&g_msgQue);
180+
mq_init(&g_msgQue);
181181
m_socket = sock;
182182
m_active = true;
183183

@@ -305,13 +305,13 @@ int Broker::ProxyTask(Networks& works, const Network& work)
305305
}
306306
} while (left > 0);
307307
msg->head = work.head;
308-
queue_push(&g_msgQue, msg);
308+
mq_push(&g_msgQue, msg);
309309
setOffline(works, work.socket);
310310
std::vector<Network>& vec = works[SUBSCRIBER];// only SUBSCRIBER should be sent
311311
if (vec.empty()) {
312312
LOGW("No subscriber to publish!");
313313
}
314-
void* message = queue_front(&g_msgQue);
314+
void* message = mq_front(&g_msgQue);
315315
if (message != nullptr) {
316316
Message val = *reinterpret_cast<Message*>(message);
317317
if (val.head.flag != PUBLISHER) {
@@ -346,7 +346,7 @@ int Broker::ProxyTask(Networks& works, const Network& work)
346346
}
347347
}
348348
Delete(val.payload.content)
349-
queue_pop(&g_msgQue);
349+
mq_pop(&g_msgQue);
350350
} else {
351351
LOGW("Message size(%u) invalid!", val.head.size);
352352
}
@@ -481,5 +481,5 @@ void Broker::exit()
481481
Close(m_socket);
482482
m_socket = -1;
483483
}
484-
queue_deinit(&g_msgQue);
484+
mq_deinit(&g_msgQue);
485485
}

src/scadup/Publisher.cpp

100755100644
+3
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,9 @@ int Publisher::publish(uint32_t topic, const std::string& payload, ...)
5252
msg.head.size = static_cast<unsigned int>(msgLen);
5353
msg.head.topic = topic;
5454
msg.head.flag = PUBLISHER;
55+
msg.payload.status[0] = 'O';
56+
msg.payload.status[1] = 'K';
57+
msg.payload.status[2] = '\0';
5558
memcpy(message, &msg, sizeof(Message));
5659
memcpy(message + HEAD_SIZE + sizeof(Message::Payload::status), payload.c_str(), size);
5760

src/scadup/Subscriber.cpp

100755100644
+10-8
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,19 @@
22

33
#define LOG_TAG "Subscriber"
44
#include "../utils/logging.h"
5+
#include "../utils/threadpool.hpp"
6+
#include "../utils/TaskBase.h"
57

68
using namespace Scadup;
79
extern const char* GET_FLAG(G_ScaFlag x);
810

911
bool Subscriber::m_exit = false;
12+
threadpool g_threadpool{};
1013

1114
void Subscriber::setup(const char* ip, unsigned short port)
1215
{
1316
m_socket = socket2Broker(ip, port, m_ssid, 60);
14-
std::thread task([&](SOCKET sock, bool& exit) -> void {
17+
std::function<void(SOCKET, bool&)> func = [&](SOCKET sock, bool& exit) -> void {
1518
try {
1619
LOGI("start keep-alive task");
1720
keepAlive(sock, exit);
@@ -20,9 +23,9 @@ void Subscriber::setup(const char* ip, unsigned short port)
2023
} catch (...) {
2124
LOGE("Unknown exception in keep-alive task");
2225
}
23-
}, m_socket, std::ref(m_exit));
24-
if (task.joinable())
25-
task.detach();
26+
};
27+
g_threadpool.enqueue(func, m_socket, std::ref(m_exit));
28+
g_threadpool.start(3);
2629
}
2730

2831
ssize_t Subscriber::subscribe(uint32_t topic, RECV_CALLBACK callback)
@@ -85,9 +88,6 @@ ssize_t Subscriber::subscribe(uint32_t topic, RECV_CALLBACK callback)
8588
Delete(body);
8689
return -5;
8790
} else {
88-
msg.payload.status[0] = 'O';
89-
msg.payload.status[1] = 'K';
90-
msg.payload.status[2] = '\0';
9191
auto* message = reinterpret_cast<Message*>(new char[size + len]);
9292
if (message != nullptr) {
9393
memcpy(message, &msg.head, HEAD_SIZE);
@@ -97,7 +97,8 @@ ssize_t Subscriber::subscribe(uint32_t topic, RECV_CALLBACK callback)
9797
message->payload.content[len - 1] = '\0';
9898
}
9999
if (callback != nullptr) {
100-
callback(*message);
100+
// std::function<RECV_CALLBACK> func = callback;
101+
g_threadpool.enqueue(callback, *message);
101102
}
102103
LOGI("message payload = [%s]-[%s]", message->payload.status, message->payload.content);
103104
Delete(message);
@@ -128,6 +129,7 @@ void Subscriber::keepAlive(SOCKET socket, bool& exit)
128129

129130
void Subscriber::quit()
130131
{
132+
g_threadpool.stop();
131133
Header head{};
132134
head.cmd = 0xff;
133135
::send(m_socket, reinterpret_cast<char*>(&head), HEAD_SIZE, 0);

src/utils/FileUtils.cpp

100755100644
File mode changed.

src/utils/logging.h

100755100644
File mode changed.

src/utils/msg_que.c

100755100644
+6-6
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,13 @@
1313

1414
static pthread_mutex_t g_mutex;
1515

16-
void queue_init(struct MsgQue* q)
16+
void mq_init(struct MsgQue* q)
1717
{
1818
pthread_mutex_init(&g_mutex, NULL);
1919
q->head = q->tail = NULL;
2020
}
2121

22-
void queue_deinit(struct MsgQue* q)
22+
void mq_deinit(struct MsgQue* q)
2323
{
2424
if (q == NULL)
2525
return;
@@ -33,7 +33,7 @@ void queue_deinit(struct MsgQue* q)
3333
pthread_mutex_destroy(&g_mutex);
3434
}
3535

36-
int queue_push(struct MsgQue* q, void* x)
36+
int mq_push(struct MsgQue* q, void* x)
3737
{
3838
pthread_mutex_lock(&g_mutex);
3939
if (q == NULL) {
@@ -58,7 +58,7 @@ int queue_push(struct MsgQue* q, void* x)
5858
return 0;
5959
}
6060

61-
void queue_pop(struct MsgQue* q)
61+
void mq_pop(struct MsgQue* q)
6262
{
6363
pthread_mutex_lock(&g_mutex);
6464
if (q != NULL && q->head != NULL) {
@@ -74,7 +74,7 @@ void queue_pop(struct MsgQue* q)
7474
pthread_mutex_unlock(&g_mutex);
7575
}
7676

77-
void* queue_front(struct MsgQue* q)
77+
void* mq_front(struct MsgQue* q)
7878
{
7979
pthread_mutex_lock(&g_mutex);
8080
if (q == NULL || q->head == NULL) {
@@ -86,7 +86,7 @@ void* queue_front(struct MsgQue* q)
8686
return p;
8787
}
8888

89-
int queue_size(struct MsgQue* q)
89+
int mq_size(struct MsgQue* q)
9090
{
9191
pthread_mutex_lock(&g_mutex);
9292
if (q != NULL) {

src/utils/msg_que.h

+6-6
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,11 @@ typedef struct MsgQue {
1111
INode* tail;
1212
} MsgQue;
1313

14-
void queue_init(struct MsgQue* q);
15-
int queue_push(struct MsgQue* q, void* x);
16-
void* queue_front(struct MsgQue* q);
17-
void queue_pop(struct MsgQue* q);
18-
int queue_size(struct MsgQue* q);
19-
void queue_deinit(struct MsgQue* q);
14+
void mq_init(struct MsgQue* q);
15+
int mq_push(struct MsgQue* q, void* x);
16+
void* mq_front(struct MsgQue* q);
17+
void mq_pop(struct MsgQue* q);
18+
int mq_size(struct MsgQue* q);
19+
void mq_deinit(struct MsgQue* q);
2020

2121
#endif

src/utils/threadpool.h

-36
This file was deleted.

src/utils/threadpool.cpp src/utils/threadpool.hpp

+37-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,38 @@
1-
#include "threadpool.h"
1+
2+
#ifndef THREADPOOL_HPP
3+
#define THREADPOOL_HPP
4+
5+
#include <vector>
6+
#include <queue>
7+
#include <thread>
8+
#include <functional>
9+
#include <mutex>
10+
#include <condition_variable>
11+
#include <atomic>
12+
#include <future>
13+
#include <stdexcept>
14+
15+
class threadpool {
16+
public:
17+
threadpool();
18+
~threadpool();
19+
20+
template<class F>
21+
void enqueue(F&& f);
22+
23+
template<class F, class... Args>
24+
std::future<typename std::result_of<F(Args...)>::type> enqueue(F&& f, Args&&... args);
25+
26+
void start(size_t threads);
27+
void stop();
28+
29+
private:
30+
std::vector<std::thread> m_workers = {};
31+
std::queue<std::function<void()>> m_tasks = {};
32+
std::mutex m_queueMutex{};
33+
std::condition_variable m_condition{};
34+
std::atomic<bool> m_stopPool;
35+
};
236

337
threadpool::threadpool() : m_stopPool(false) { }
438

@@ -68,3 +102,5 @@ void threadpool::stop()
68102
worker.join();
69103
}
70104
}
105+
106+
#endif // THREADPOOL_HPP

test/CMakeLists.txt

100755100644
File mode changed.

test/test.cpp

100755100644
File mode changed.

0 commit comments

Comments
 (0)