1+ import core.time : msecs;
2+ import std.parallelism : totalCPUs;
3+ import std.process : environment;
4+
5+ import juptune.core.util ,
6+ juptune.core.ds ,
7+ juptune.event,
8+ juptune.event.fiber,
9+ juptune.http;
10+
11+ import tests.common : log;
12+ import tests.plaintext;
13+
14+ /+ +++ Constant config ++++/
15+
16+ enum SOCKET_BACKLOG_PER_THREAD = 1000 ;
17+ enum FIBER_CALL_STACK_BYTES = 1024 * 100 ;
18+ enum HTTP_READ_BUFFER_BYTES = 1024 ;
19+ enum HTTP_WRITE_BUFFER_BYTES = 1024 ;
20+
21+ enum HTTP_CONFIG = Http1Config()
22+ .withReadTimeout(1000. msecs)
23+ .withWriteTimeout(1000. msecs);
24+
25+ static assert (
26+ HTTP_READ_BUFFER_BYTES + HTTP_WRITE_BUFFER_BYTES < FIBER_CALL_STACK_BYTES / 4 ,
27+ " To be safe, please ensure the buffer bytes are only a quarter the size of a fiber call stack."
28+ );
29+
30+ /+ +++ Globals ++++/
31+
32+ __gshared TcpSocket server; // Currently there's no mechanism to directly pass data to new threads, so global state has to be used.
33+
34+ /+ +++ Functions ++++/
35+
36+ void main ()
37+ {
38+ auto loop = EventLoop(
39+ EventLoopConfig()
40+ .withFiberAllocatorConfig(
41+ FiberAllocatorConfig()
42+ .withBlockStackSize(FIBER_CALL_STACK_BYTES )
43+ )
44+ );
45+
46+ // open() and listen() can't be ran outside of an event loop thread, so currently this is the janky way to setup the server.
47+ loop.addNoGCThread(() @nogc nothrow {
48+ server.open().resultAssert;
49+ server.listen(" 0.0.0.0:8080" , SOCKET_BACKLOG_PER_THREAD * totalCPUs).resultAssert;
50+ juptuneEventLoopCancelThread();
51+ });
52+ loop.join();
53+
54+ // Then we can setup the proper loop threads.
55+ foreach (i; 0 .. totalCPUs)
56+ loop.addGCThread(&router);
57+ loop.join();
58+ }
59+
60+ // Juptune currently does not provide higher-level server features out of the box, so we have
61+ // to hand-make a custom router.
62+ //
63+ // This is realistic in the sense that building a custom router is a completely valid, supported pattern
64+ // for people who want/need something very specialised.
65+ //
66+ // This is unrealistic in the sense that once Juptune has a native router, the native router would
67+ // almost certainly be used in a case like this (but since that's a TODO, this will have to do for now).
68+ void router () nothrow
69+ {
70+ try
71+ {
72+ enum Route
73+ {
74+ FAILSAFE ,
75+ plaintext,
76+ }
77+
78+ enum Method
79+ {
80+ FAILSAFE ,
81+ get
82+ }
83+
84+ union RouteInput
85+ {
86+ PlainTextHeaderInput plaintext;
87+ }
88+
89+ while (! juptuneEventLoopIsThreadCanceled())
90+ {
91+ TcpSocket client;
92+
93+ auto result = server.accept(client);
94+ if (result.isError)
95+ {
96+ log(" error accepting socket: " , result);
97+ continue ;
98+ }
99+
100+ result = async(function () nothrow {
101+ auto client = juptuneEventLoopGetContext! TcpSocket ;
102+ scope (exit) if (client.isOpen)
103+ auto _ = client.close();
104+
105+ Http1MessageSummary readSummary, writeSummary;
106+ do
107+ {
108+ if (! client.isOpen)
109+ return ;
110+
111+ // Read & Write primitives
112+ ubyte [HTTP_READ_BUFFER_BYTES ] readBuffer;
113+ ubyte [HTTP_WRITE_BUFFER_BYTES ] writeBuffer;
114+ auto reader = Http1Reader(client, readBuffer, HTTP_CONFIG );
115+ auto writer = Http1Writer(client, writeBuffer, HTTP_CONFIG );
116+
117+ // Routing state
118+ Route route;
119+ Method method;
120+ RouteInput input;
121+
122+ // Error handling
123+ uint errorCode;
124+ string errorMsg;
125+ void setError (uint code, string msg)
126+ {
127+ if (errorMsg ! is null )
128+ return ;
129+ errorCode = code;
130+ errorMsg = msg;
131+ }
132+
133+ // Parse request line
134+ {
135+ Http1RequestLine requestLine;
136+ auto result = reader.readRequestLine(requestLine);
137+ if (result.isError)
138+ {
139+ log(" readRequestLine() failed: " , result.error, " : " , result.context.slice);
140+ return ;
141+ }
142+
143+ requestLine.access((scope methodString, scope uri){
144+ switch (methodString)
145+ {
146+ case " GET" :
147+ method = Method.get ;
148+ break ;
149+
150+ default :
151+ setError(405 , " Unexpected method" );
152+ break ;
153+ }
154+
155+ switch (uri.path)
156+ {
157+ case " /plaintext" :
158+ route = Route.plaintext;
159+ break ;
160+
161+ default :
162+ setError(404 , " Not found" );
163+ break ;
164+ }
165+ });
166+ }
167+
168+ // Read headers
169+ bool foundEndOfHeaders;
170+ while (! foundEndOfHeaders)
171+ {
172+ auto result = reader.checkEndOfHeaders(foundEndOfHeaders);
173+ if (result.isError)
174+ {
175+ log(" checkEndOfHeaders() failed: " , result);
176+ return ;
177+ }
178+ else if (foundEndOfHeaders)
179+ break ;
180+
181+ Http1Header header;
182+ result = reader.readHeader(header);
183+ if (result.isError)
184+ {
185+ log(" readHeader() failed: " , result);
186+ return ;
187+ }
188+
189+ // Since we're using a custom router, we have the luxury of handling/ignoring headers during routing rather
190+ // than stuffing them all into a hashmap, and doing the processing post-routing.
191+ header.access((scope name, scope value){
192+ final switch (route) with (Route)
193+ {
194+ case FAILSAFE : break ;
195+
196+ case plaintext:
197+ break ;
198+ }
199+ });
200+ }
201+
202+ // Read body
203+ Http1BodyChunk chunk;
204+ do {
205+ chunk = Http1BodyChunk();
206+ auto result = reader.readBody(chunk);
207+ if (result.isError)
208+ {
209+ log(" readBody() failed: " , result);
210+ return ;
211+ }
212+
213+ // Likewise, we only need to deal with body data in certain routes, so we can ignore them in others.
214+ chunk.access((scope data){
215+ final switch (route) with (Route)
216+ {
217+ case FAILSAFE : break ;
218+
219+ case plaintext:
220+ break ;
221+ }
222+ });
223+ } while (chunk.hasDataLeft);
224+
225+ // Finish reading the message, and either dispatch it to a handler, or report an error back.
226+ auto result = reader.finishMessage(readSummary);
227+ if (result.isError)
228+ {
229+ log(" finishMessage() failed: " , result);
230+ return ;
231+ }
232+
233+ if (errorMsg ! is null )
234+ {
235+ import tests.common : putServerAndDate;
236+ result = writer.putResponseLine(Http1Version.http11, errorCode, errorMsg).then! (
237+ () => writer.putServerAndDate(),
238+ () => writer.finishHeaders(),
239+ () => writer.finishBody(),
240+ () => writer.finishTrailers(),
241+ () => writer.finishMessage(writeSummary),
242+ );
243+ if (result.isError)
244+ {
245+ log(" finishing a message [error variant] failed: " , result);
246+ return ;
247+ }
248+ continue ;
249+ }
250+
251+ final switch (route) with (Route)
252+ {
253+ case FAILSAFE : break ;
254+
255+ case plaintext:
256+ handlePlainText(input.plaintext, writer, writeSummary);
257+ break ;
258+ }
259+ } while (! readSummary.connectionClosed && ! writeSummary.connectionClosed);
260+ }, client, &asyncMoveSetter! TcpSocket );
261+ if (result.isError)
262+ {
263+ log(" error calling async(): " , result);
264+ continue ;
265+ }
266+ }
267+ }
268+ catch (Throwable ex) // @suppress(dscanner.suspicious.catch_em_all)
269+ {
270+ import std.exception : assumeWontThrow;
271+ log(" uncaught exception: " , ex.msg).assumeWontThrow;
272+ debug log(ex.info).assumeWontThrow;
273+ }
274+ }
0 commit comments