-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathserver.go
More file actions
150 lines (139 loc) · 4.42 KB
/
server.go
File metadata and controls
150 lines (139 loc) · 4.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
// Package persistentconn implements the persistent script protocol that splunk core uses
// to communicate with app's persistent REST endpoint. This package handles basic routing and request/response
// handling.
package persistentconn
import (
"bufio"
"container/list"
"io"
"log"
"net/http"
"os"
"sync"
)
// Server represents the persistentconn server that handles request
// and writes response back to the client
type Server struct {
requestChan chan Request
responseChan chan Response
responseQueue *list.List
registry *handlerRegistry
resposneQueueLock *sync.Mutex
}
// NewServer creates a persistentconn server
func NewServer() *Server {
return &Server{
requestChan: make(chan Request),
responseChan: make(chan Response),
responseQueue: list.New(),
registry: &handlerRegistry{},
resposneQueueLock: new(sync.Mutex),
}
}
// Handle registers a handler function for a given path (or path pattern).
// A path pattern is in the format of "<component>/:<param_1>/<component>/..." and a path component
// starting with ":" indicates it's a parameter which will be inferred from the actual path in the request
// E.g. if the registered path pattern is "entity/:name/data" and the
// path in the request is "entity/hello/data", then the key-value pair {"name": "hello"} will be stored
// in the request's params which can be later referenced inside of the handler.
func (s *Server) Handle(path string, handler Handler, allowedMethods ...string) {
s.registry.register(path, handler, allowedMethods)
}
// Run starts a persistentconn server and starts handling request sent from
// client (with splunkd as the middle layer)
func (s *Server) Run() {
go s.handleRequest()
go s.processResponse()
s.startProcessingInputPackets(os.Stdin)
}
// startProcessingInputPackets starts a separate goroutine that reads request sent from client
// and is the entrypoint of a server process
func (s *Server) startProcessingInputPackets(input io.Reader) {
for {
inPacket, err := ReadPacket(input)
if err != nil {
if err == io.EOF {
continue
}
log.Fatal(err)
}
s.parseRequest(inPacket)
}
}
// handleRequest takes request that comes in and find the corresponding handler
func (s *Server) handleRequest() {
for req := range s.requestChan {
s.resposneQueueLock.Lock()
elem := s.responseQueue.PushBack(struct{}{})
s.resposneQueueLock.Unlock()
// handle request in a goroutine
go func(req Request, slot *list.Element) {
var resp Response
if req.isInit {
resp = Response{isInit: true}
} else {
handler := s.registry.getHandler(req)
handlerResponse, err := handler(req)
if err != nil {
handlerResponse.StatusCode = http.StatusInternalServerError
handlerResponse.Body = err.Error()
}
resp = handlerResponse
}
// TODO: replace all print statements with proper logging
// fmt.Printf("Finished handling - response - status: %d - body: %s\n", resp.StatusCode, resp.Body)
slot.Value = resp
s.responseChan <- resp
}(req, elem)
}
}
// processResponse proccesses response from handler and sent the response back to the client
func (s *Server) processResponse() {
for range s.responseChan {
flushedCount, err := s.flushResponses(os.Stdout)
if err != nil {
// fmt.Println("Failed to flush response - Error:", err)
continue
}
if flushedCount != 0 {
// fmt.Printf("Flushed %d responses\n", flushedCount)
}
}
}
// flushResponses go through responses in the response queue of the server, and it flushes consecutive
// responses starting from the front of the queue in batch to ensure that responses are synchronized in the same
// order as the corresonding requests.
func (s *Server) flushResponses(output io.Writer) (int, error) {
s.resposneQueueLock.Lock()
defer s.resposneQueueLock.Unlock()
// prepare response data to flush to stdout
elem := s.responseQueue.Front()
flushedElList := make([]*list.Element, 0)
writer := bufio.NewWriter(output)
for {
if elem == nil {
break
}
resp, ok := elem.Value.(Response)
if !ok {
break
}
data := resp.getRawData()
_, err := writer.WriteString(data)
if err != nil {
return 0, err
}
flushedEl := elem
flushedElList = append(flushedElList, flushedEl)
elem = elem.Next()
}
err := writer.Flush()
if err != nil {
return 0, err
}
// clean up flushed element from the queue
for _, flushedEl := range flushedElList {
s.responseQueue.Remove(flushedEl)
}
return len(flushedElList), nil
}