Skip to content

Commit 1d92219

Browse files
committedJan 14, 2024
add wqueue.
1 parent 777943a commit 1d92219

13 files changed

+186
-67
lines changed
 

‎bin/.game_server.dll

0 Bytes
Binary file not shown.

‎bin/game_server.dll

0 Bytes
Binary file not shown.

‎bin/game_server.exe

1 KB
Binary file not shown.

‎bin/login_server.exe

1 KB
Binary file not shown.

‎build.bat

+2-2
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,10 @@ del *.pdb
88

99
set flags=/Z7 /W4 /wd4996 /wd4201 /wd4477 /nologo /diagnostics:column -FC -Gm- -GR-
1010

11-
cl %flags% ../login_server.c ../directory.c ../net_windows.c /Fe:login_server.exe -I../openssl-1.1/x64/include ../openssl-1.1/x64/lib/libcrypto.lib Ws2_32.lib
11+
cl %flags% ../login_server.c ../directory.c ../net_windows.c ../wqueue.c /Fe:login_server.exe -I../openssl-1.1/x64/include ../openssl-1.1/x64/lib/libcrypto.lib Ws2_32.lib
1212

1313
cl %flags% /LD ../game_server.c ../directory.c ../net_windows.c /Fe:game_server.dll /link -incremental:no /PDB:game_server_%random%.pdb /EXPORT:on_init /EXPORT:on_connection /EXPORT:on_request /EXPORT:on_response /EXPORT:on_disconnect /EXPORT:on_tick Ws2_32.lib
1414

15-
cl %flags% ../runner.c ../net_windows.c /Fe:game_server.exe /link /PDB:game_server.pdb Ws2_32.lib
15+
cl %flags% ../runner.c ../net_windows.c ../wqueue.c /Fe:game_server.exe /link /PDB:game_server.pdb Ws2_32.lib
1616

1717
popd

‎game_server.c

+20-25
Original file line numberDiff line numberDiff line change
@@ -601,26 +601,25 @@ void on_request(void **buf, int socket, void *request, size_t len)
601601
handle_request(state, conn);
602602
}
603603

604-
void on_response(void **buf, int socket)
604+
void on_response(void **buf)
605605
{
606606
assert(buf);
607607
struct state *state = *(struct state **) buf;
608608
assert(state);
609-
struct connection *conn = get_connection_from_socket(state, socket);
610-
if (!conn) {
611-
trace("answering a non connected client?" nl);
612-
return;
613-
}
614-
if (conn->sent < conn->to_send_count) {
615-
trace("sending %llu bytes of data" nl, conn->to_send_count - conn->sent);
616-
conn->sent += net_send(conn->socket,
617-
conn->to_send + conn->sent,
618-
conn->to_send_count - conn->sent);
619-
}
620-
// reset counters when all data has been sent.
621-
if (conn->sent >= conn->to_send_count) {
622-
conn->sent = 0;
623-
conn->to_send_count = 0;
609+
610+
for (size_t i = 0; i < countof(state->connections); i++) {
611+
struct connection *conn = state->connections + i;
612+
if (conn->sent < conn->to_send_count) {
613+
void *head = conn->to_send + conn->sent;
614+
unsigned long long to_send = conn->to_send_count - conn->sent;
615+
trace("sending %d bytes of data" nl, (s32) to_send);
616+
conn->sent += net_send(conn->socket, head, to_send);
617+
}
618+
// reset counters when all data has been sent.
619+
if (conn->sent >= conn->to_send_count) {
620+
conn->sent = 0;
621+
conn->to_send_count = 0;
622+
}
624623
}
625624
}
626625

@@ -656,23 +655,19 @@ int on_tick(void **buf)
656655

657656
state->d = 1000.0f;
658657
state->run_time += (double) state->d;
659-
// state->ticks++;
660658
// 1000/10 = 100
661659
state->ticks = (u64) (state->run_time / 100.0);
662660

663-
// trace("ticks %u" nl, state->ticks);
664-
665661
if (state->ticks == old_ticks)
666-
return 1;
667-
668-
// returns 17
669-
// trace("clock %d" nl, millis());
670-
// trace("tick" nl);
662+
return 0;
663+
664+
int flush_responses = 0;
671665

672666
for (size_t i = 0; i < countof(state->characters); i++) {
673667
struct character *character = state->characters + i;
674668
if (!character->active)
675669
continue;
670+
flush_responses = 1;
676671
switch (character->action_type) {
677672
case idle:
678673
// idle_update(state, character);
@@ -688,7 +683,7 @@ int on_tick(void **buf)
688683
}
689684
}
690685

691-
return 1;
686+
return flush_responses;
692687
}
693688

694689
static struct connection *get_connection_from_socket(struct state *state, int socket)

‎login_server.c

+33-16
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
#include "directory.h"
1818
#include "net.h"
19+
#include "wqueue.h"
1920

2021
typedef uint8_t byte;
2122

@@ -67,6 +68,7 @@ struct connection {
6768
RSA *rsa_key;
6869
};
6970

71+
static struct wqueue send_responses_worker = {0};
7072
static struct connection connections[32] = {0};
7173

7274
static u32 ip_to_u32(char *ip)
@@ -704,31 +706,46 @@ static void handle_event(int socket, enum net_event event, void *read, size_t le
704706
on_request(conn);
705707
} break;
706708

707-
case net_write: {
708-
if (conn->sent < conn->to_send_count) {
709-
trace("sending %d bytes of data" nl,
710-
(s32) (conn->to_send_count - conn->sent));
711-
conn->sent += net_send(conn->socket,
712-
conn->to_send + conn->sent,
713-
conn->to_send_count - conn->sent);
714-
}
715-
// reset counters when all data has been sent.
716-
if (conn->sent >= conn->to_send_count) {
717-
conn->sent = 0;
718-
conn->to_send_count = 0;
719-
}
720-
} break;
721-
722709
default:
723710
break;
724711
}
712+
// we assume we always need to reply with something
713+
// when we get a request
714+
wpush(&send_responses_worker, conn);
715+
}
716+
717+
static void send_responses(struct wqueue *q, void *w)
718+
{
719+
struct connection *conn = (struct connection *) w;
720+
721+
void *head = conn->to_send + conn->sent;
722+
unsigned long long to_send = conn->to_send_count - conn->sent;
723+
trace("sending %d bytes of data" nl, to_send);
724+
conn->sent += net_send(conn->socket, head, to_send);
725+
726+
// if we couldn't sent the entire response, re-add
727+
// this connection to the worker so we try
728+
// to flush later again.
729+
// NOTE(fmontenegro) do we want to try up to n times
730+
// and then dropping the connection?
731+
if (conn->sent < conn->to_send_count) {
732+
wpush(q, conn);
733+
return;
734+
}
735+
736+
// all data has been sent, reset the counters.
737+
conn->sent = 0;
738+
conn->to_send_count = 0;
725739
}
726740

727741
int main()
728742
{
729-
#define port 2106
743+
wstart(&send_responses_worker, send_responses);
744+
745+
unsigned short port = 2106;
730746
int socket = net_port(port);
731747
trace("login server, listening for connection on port %d" nl, port);
732748
net_listen(socket, handle_event);
749+
733750
return 0;
734751
}

‎net.h

+2-4
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@ enum net_event {
55
net_closed,
66
// can read
77
net_read,
8-
// can write
9-
net_write,
108
};
119

1210
typedef void (net_handler)
@@ -20,8 +18,8 @@ int net_sock(char *path);
2018
void net_listen(int server, net_handler *handler);
2119
// try to send the full buffer. returns the amount
2220
// of bytes that was able to send. if it writes
23-
// less than the intended amount, wait for net_write
24-
// to keep sending past the bytes already sent.
21+
// less than the intended amount, you will have to
22+
// keep trying sending the rest of the data.
2523
unsigned long long net_send(int socket, void *buf, unsigned long long n);
2624
// close the socket.
2725
void net_close(int socket);

‎net_linux.c

+1-3
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ void net_listen(int server, net_handler *handler)
126126
break;
127127
}
128128
event.data.fd = client;
129-
event.events = EPOLLIN | EPOLLOUT | EPOLLET;
129+
event.events = EPOLLIN | EPOLLET;
130130
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client, &event) == -1) {
131131
print_err("failed to add new client to epoll");
132132
close(client);
@@ -155,8 +155,6 @@ void net_listen(int server, net_handler *handler)
155155
handler(events[i].data.fd, net_read, read_buf, (size_t) received);
156156
}
157157
}
158-
if (events[i].events & EPOLLOUT)
159-
handler(events[i].data.fd, net_write, 0, 0);
160158
}
161159
}
162160

‎net_windows.c

+2-9
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ void net_listen(int server, net_handler *handler)
5555
static SOCKET clients[max_clients];
5656
static unsigned char read_buf[8192] = {0};
5757

58-
int highest = 0;
58+
SOCKET highest = 0;
5959

6060
if (!handler) {
6161
printf("net error: no socket request handler provided.\n");
@@ -67,9 +67,7 @@ void net_listen(int server, net_handler *handler)
6767

6868
while (1) {
6969
fd_set readfds;
70-
fd_set writefds;
7170
FD_ZERO(&readfds);
72-
FD_ZERO(&writefds);
7371
FD_SET(server, &readfds);
7472

7573
highest = server;
@@ -83,15 +81,14 @@ void net_listen(int server, net_handler *handler)
8381
int optlen = sizeof(optval);
8482
if (getsockopt(clients[i], SOL_SOCKET, SO_ERROR, (char*) &optval, &optlen) != SOCKET_ERROR && optval == 0) {
8583
FD_SET(clients[i], &readfds);
86-
FD_SET(clients[i], &writefds);
8784
if (clients[i] > highest)
8885
highest = clients[i];
8986
} else {
9087
clients[i] = INVALID_SOCKET;
9188
}
9289
}
9390

94-
int activity = select(highest + 1, &readfds, &writefds, 0, 0);
91+
int activity = select((int) (highest + 1), &readfds, 0, 0, 0);
9592
if (activity == SOCKET_ERROR) {
9693
printf("select error %d\n", WSAGetLastError());
9794
break;
@@ -153,10 +150,6 @@ void net_listen(int server, net_handler *handler)
153150
}
154151
}
155152
}
156-
157-
// write.
158-
if (FD_ISSET(clients[i], &writefds))
159-
handler((int) clients[i], net_write, 0, 0);
160153
}
161154
}
162155
}

‎runner.c

+20-8
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,12 @@
55
#endif
66

77
#include "net.h"
8+
#include "wqueue.h"
89

910
typedef int on_init(void **buf);
1011
typedef void on_connection(void **buf, int socket);
1112
typedef void on_request(void **buf, int socket, void *request, size_t len);
12-
typedef void on_response(void **buf, int socket);
13+
typedef void on_response(void **buf);
1314
typedef void on_disconnect(void **buf, int socket);
1415
typedef int on_tick(void **buf);
1516

@@ -26,6 +27,8 @@ struct state {
2627
on_response *on_response;
2728
on_disconnect *on_disconnect;
2829
on_tick *on_tick;
30+
31+
struct wqueue send_responses_worker;
2932

3033
#ifdef _WIN32
3134
FILETIME lib_write_time;
@@ -92,7 +95,7 @@ static void load_lib_if_required(void)
9295
#endif
9396
}
9497

95-
void handle_event(int socket, enum net_event event, void *read, size_t len)
98+
static void handle_event(int socket, enum net_event event, void *read, size_t len)
9699
{
97100
load_lib_if_required();
98101

@@ -109,26 +112,33 @@ void handle_event(int socket, enum net_event event, void *read, size_t len)
109112
state.on_request(&state.buf, socket, read, len);
110113
break;
111114

112-
case net_write:
113-
state.on_response(&state.buf, socket);
114-
break;
115-
116115
default:
117116
break;
118117
}
118+
119+
wpush(&state.send_responses_worker, 0);
119120
}
120121

121-
DWORD timer_thread(LPVOID param)
122+
static DWORD timer_thread(LPVOID param)
122123
{
123124
param = param;
124125
while (1) {
125126
Sleep(1000);
126127
load_lib_if_required();
127-
state.on_tick(&state.buf);
128+
if (state.on_tick(&state.buf))
129+
wpush(&state.send_responses_worker, 0);
128130
}
129131
return 0;
130132
}
131133

134+
static void send_responses(struct wqueue *q, void *w)
135+
{
136+
q = q;
137+
w = w;
138+
load_lib_if_required();
139+
state.on_response(&state.buf);
140+
}
141+
132142
int main()
133143
{
134144
if (!lib_load())
@@ -138,6 +148,8 @@ int main()
138148
return 0;
139149

140150
CreateThread(0, 0, timer_thread, 0, 0, 0);
151+
152+
wstart(&state.send_responses_worker, send_responses);
141153

142154
int socket = net_port(7777);
143155
net_listen(socket, handle_event);

‎wqueue.c

+88
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
#include <windows.h>
2+
#include "wqueue.h"
3+
4+
static unsigned long wqueue_thread(void *p)
5+
{
6+
struct wqueue *q = (struct wqueue *) p;
7+
while (1) {
8+
int close = 0;
9+
EnterCriticalSection(&q->lock);
10+
close = q->closed && !q->wcount;
11+
LeaveCriticalSection(&q->lock);
12+
if (close)
13+
break;
14+
15+
WaitForSingleObject(q->wpending, INFINITE);
16+
EnterCriticalSection(&q->lock);
17+
void *work = q->work[0];
18+
for (size_t i = 1; i < q->wcount; i++)
19+
q->work[i - 1] = q->work[i];
20+
q->wcount--;
21+
LeaveCriticalSection(&q->lock);
22+
q->worker((void *) q, work);
23+
}
24+
25+
CloseHandle(q->wpending);
26+
CloseHandle(q->thread);
27+
DeleteCriticalSection(&q->lock);
28+
29+
return 0;
30+
}
31+
32+
void wstart(struct wqueue *q, worker *worker)
33+
{
34+
InitializeCriticalSection(&q->lock);
35+
q->closed = 0;
36+
q->worker = worker;
37+
q->wpending = CreateEvent(0, FALSE, FALSE, 0);
38+
q->thread = CreateThread(0, 0, wqueue_thread, q, 0, 0);
39+
}
40+
41+
void wpush(struct wqueue *q, void *work)
42+
{
43+
if (q->closed)
44+
return;
45+
EnterCriticalSection(&q->lock);
46+
q->work[q->wcount++] = work;
47+
LeaveCriticalSection(&q->lock);
48+
SetEvent(q->wpending);
49+
}
50+
51+
void wclose(struct wqueue *q)
52+
{
53+
if (q->closed)
54+
return;
55+
EnterCriticalSection(&q->lock);
56+
q->closed = 1;
57+
LeaveCriticalSection(&q->lock);
58+
}
59+
60+
#ifdef run_wqueue
61+
// example on how to use these functions
62+
#include <stdio.h>
63+
64+
void do_work(struct wqueue *q, void *w)
65+
{
66+
static int tmp = 42;
67+
int value = *(int *) w;
68+
if (value == 2) {
69+
fprintf(stderr, "adding new value because we found 2\n");
70+
wpush(q, &tmp);
71+
}
72+
fprintf(stderr, "new work! value is %d\n", value);
73+
}
74+
75+
int main()
76+
{
77+
struct wqueue q = {0};
78+
wstart(&q, do_work);
79+
80+
int values[] = {1, 2, 3, 4, 5};
81+
for (int i = 0; i < 5; i++) {
82+
wpush(&q, values + i);
83+
// wclose(&q);
84+
Sleep(1000);
85+
}
86+
fprintf(stderr, "all done!\n");
87+
}
88+
#endif

‎wqueue.h

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
#include <windows.h>
2+
3+
struct wqueue;
4+
typedef void (worker)(struct wqueue *q, void *work);
5+
6+
struct wqueue {
7+
CRITICAL_SECTION lock;
8+
HANDLE wpending;
9+
HANDLE thread;
10+
worker *worker;
11+
void *work[256];
12+
size_t wcount;
13+
int closed;
14+
};
15+
16+
void wstart(struct wqueue *q, worker *worker);
17+
void wpush(struct wqueue *q, void *work);
18+
void wclose(struct wqueue *q);

0 commit comments

Comments
 (0)
Please sign in to comment.