-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.go
209 lines (189 loc) · 6.07 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
package main
import (
"context"
"errors"
"flag"
"fmt"
"net/http"
"os"
"os/exec"
"os/signal"
"strings"
"sync"
"syscall"
"time"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)
// SafeUpdatingSlice is a mutex-guarded buffer of node names, paired with a
// channel on which the buffer's length is published after each append.
type SafeUpdatingSlice struct {
	// slice holds the buffered node names. Every read or write must hold the
	// embedded Mutex.
	slice []string
	// length carries the buffer length after each append made by
	// respondNodePost; watchNodes receives from it.
	length chan int

	sync.Mutex
}
// main parses flags, configures logging, and serves the /Node endpoint until
// SIGINT or SIGTERM is received. POSTed node names are buffered and handed to
// Ansible in batches by watchNodes. On shutdown the HTTP server is stopped,
// any nodes still buffered are flushed to one final Ansible run, and main
// waits for launched runs to finish before returning.
func main() {
	nodes := SafeUpdatingSlice{length: make(chan int)}

	// Configure and parse arguments
	port := flag.Int("port", 27780, "port on which to listen for POSTs")
	interval := flag.Duration("interval", 30*time.Second, "how frequently to run Ansible, regardless of buffer length")
	batchSize := flag.Int("batch-size", 100, "how full the node buffer must be to trigger a non-timed push")
	playbook := flag.String("playbook", "main.yaml", "Ansible playbook to run against nodes")
	debug := flag.Bool("debug", false, "sets log level to debug")
	flag.Parse()

	// Initialize logging
	zerolog.TimeFieldFormat = zerolog.TimeFormatUnix
	// Write our logs to stderr, leaving stdout for Ansible messages
	log.Logger = log.Output(zerolog.ConsoleWriter{
		Out:        os.Stderr,
		TimeFormat: time.RFC3339})
	if *debug {
		zerolog.SetGlobalLevel(zerolog.DebugLevel)
	} else {
		zerolog.SetGlobalLevel(zerolog.InfoLevel)
	}

	// Configure HTTP server. A dedicated mux avoids registering on the
	// process-global http.DefaultServeMux. ReadHeaderTimeout stops a client
	// that trickles request headers from holding a connection open forever.
	mux := http.NewServeMux()
	mux.HandleFunc("/Node", func(w http.ResponseWriter, r *http.Request) {
		respondNodePost(w, r, &nodes)
	})
	server := &http.Server{
		Addr:              fmt.Sprintf(":%d", *port),
		Handler:           mux,
		ReadHeaderTimeout: 10 * time.Second,
	}

	// Create a waitgroup for Ansible child processes
	var wg sync.WaitGroup

	// Launch the node-slice watcher and HTTP server
	go watchNodes(&nodes, *interval, *batchSize, playbook, &wg)
	go func() {
		log.Info().Msgf("Awaiting HTTP POST requests on %s...", server.Addr)
		err := server.ListenAndServe()
		if !errors.Is(err, http.ErrServerClosed) {
			// zerolog's Fatal level terminates the process (os.Exit(1))
			// once the message is written, so nothing runs after this.
			log.Fatal().Msgf("HTTP server error: %v", err)
		}
		log.Debug().Msg("HTTP server shutdown complete")
	}()

	// Exit cleanly on SIGINT (^C) or SIGTERM. SIGTERM is what service managers
	// and container runtimes send on a routine stop; without handling it,
	// buffered nodes would be dropped rather than flushed below.
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	log.Info().Msg("Interrupt (^C) to exit")
	log.Info().Msgf("Caught OS signal %v, exiting once all Ansible runs finish...", <-sigs)

	// Shut down HTTP server, allowing in-flight requests up to 10s to finish
	ctx, release := context.WithTimeout(context.Background(), 10*time.Second)
	defer release()
	if err := server.Shutdown(ctx); err != nil {
		log.Error().Msgf("HTTP server shutdown error: %v", err)
		log.Info().Msg("Forcibly closing HTTP server")
		if err := server.Close(); err != nil {
			log.Error().Msgf("HTTP server close error: %v", err)
		}
	}

	// Process any nodes left in the buffer
	nodes.Lock()
	nodeLen := len(nodes.slice)
	nodes.Unlock()
	if nodeLen > 0 {
		log.Info().Msgf("%d nodes remain in buffer!", nodeLen)
		runAnsiblePlaybook(playbook, &nodes, &wg)
	}

	// Stop handling OS signals, allowing for an unclean exit if interrupted again
	signal.Stop(sigs)

	// Ensure all Ansible runs have finished (we might be interrupted by an OS signal instead)
	// NOTE(review): the watchNodes goroutine is never stopped. A ticker fire or
	// SIGHUP at this point can still reach wg.Add (via runAnsiblePlaybook)
	// while this Wait is running. sync.WaitGroup requires an Add made while the
	// counter is zero to happen before Wait. Giving watchNodes a stop signal
	// and waiting for it to exit first would close this gap.
	wg.Wait()
	log.Info().Msg("Exited cleanly")
}
// respondNodePost handles requests to the /Node endpoint.
//
// A POST whose form value "data" is non-empty does three things:
//   - appends that value to nodes;
//   - sends the new buffer length on nodes.length;
//   - replies "Acknowledged" with status 200.
//
// A non-POST request gets 405. A form that fails to parse, or an empty
// "data" field, gets 400 with an explanatory body.
//
// The send on nodes.length happens before the response is written. When the
// channel is unbuffered (main creates it with make(chan int)), the request
// therefore waits until watchNodes receives the update.
func respondNodePost(w http.ResponseWriter, r *http.Request, nodes *SafeUpdatingSlice) {
	// TODO: Log errors from this?
	if r.Method != http.MethodPost {
		w.WriteHeader(http.StatusMethodNotAllowed)
		fmt.Fprint(w, "This endpoint must be POSTed to")
		return
	}

	// Validate POSTed data; should be of
	// Content-Type: application/x-www-form-urlencoded
	if err := r.ParseForm(); err != nil {
		w.WriteHeader(http.StatusBadRequest)
		// Fprint, not Fprintf: the error text is not a format string and may
		// itself contain '%' characters (e.g. from a bad percent-escape).
		fmt.Fprint(w, err.Error())
		return
	}
	nodeName := r.FormValue("data")
	if nodeName == "" {
		w.WriteHeader(http.StatusBadRequest)
		fmt.Fprint(w, "data field must not be empty")
		return
	}

	// Add our node to the slice, and send a length update to anyone watching
	nodes.Lock()
	nodes.slice = append(nodes.slice, nodeName)
	numNodes := len(nodes.slice)
	nodes.Unlock()
	log.Debug().Msgf("Buffer updated: %d nodes", numNodes)
	nodes.length <- numNodes
	fmt.Fprint(w, "Acknowledged")
}
// watchNodes loops forever, launching Ansible (via runAnsiblePlaybook) against
// the buffered nodes whenever one of these happens:
//   - a buffer update arrives on nodes.length and the buffer currently holds
//     at least batchSize nodes;
//   - interval elapses and the buffer is non-empty;
//   - SIGHUP is received and the buffer is non-empty.
//
// Batch-triggered and SIGHUP-triggered launches restart the interval ticker.
// The function never returns.
func watchNodes(nodes *SafeUpdatingSlice, interval time.Duration, batchSize int, playbook *string, wg *sync.WaitGroup) {
	// Register a SIGHUP handler
	sighup := make(chan os.Signal, 1)
	signal.Notify(sighup, syscall.SIGHUP)
	// And a periodic ticker for interval-based launches
	ticker := time.NewTicker(interval)

	// flushIfNonEmpty launches Ansible only when the buffer holds nodes.
	flushIfNonEmpty := func() {
		if bufferedNodeCount(nodes) > 0 {
			runAnsiblePlaybook(playbook, nodes, wg)
		} else {
			log.Debug().Msg("No nodes in buffer; skipping launch")
		}
	}

	for {
		select {
		case <-nodes.length:
			log.Debug().Msg("Caught a buffer update!")
			// Don't trust the length carried by the update: it can be stale.
			// Another request may have appended since it was sent, or an
			// earlier update may already have launched a run and cleared the
			// buffer. Re-read the live length so a stale count can't trigger
			// an empty or undersized run.
			nodeLen := bufferedNodeCount(nodes)
			if nodeLen >= batchSize {
				ticker.Reset(interval)
				runAnsiblePlaybook(playbook, nodes, wg)
			} else {
				log.Debug().Msgf("Buffer now contains %d nodes; not launching yet", nodeLen)
			}
		case <-ticker.C:
			log.Debug().Msg("Caught a timer tick!")
			flushIfNonEmpty()
		case <-sighup:
			ticker.Reset(interval)
			log.Debug().Msg("Caught a SIGHUP!")
			flushIfNonEmpty()
		}
	}
}

// bufferedNodeCount returns the number of nodes currently in the buffer,
// reading it under the buffer's lock.
func bufferedNodeCount(nodes *SafeUpdatingSlice) int {
	nodes.Lock()
	defer nodes.Unlock()
	return len(nodes.slice)
}
// runAnsiblePlaybook takes every node currently in the buffer and clears the
// buffer. It then starts `ansible-playbook <playbook> --inventory <n1>,<n2>,...,`
// in a new goroutine tracked by wg, and returns without waiting for Ansible.
//
// Reading and clearing happen in one lock acquisition. Callers check the
// buffer length before calling, but a concurrent caller may drain the buffer
// in between. If the buffer is empty once the lock is held, nothing is
// launched, which avoids an Ansible run with an empty inventory.
func runAnsiblePlaybook(playbook *string, nodes *SafeUpdatingSlice, wg *sync.WaitGroup) {
	// Take ownership of the current batch and clear the buffer. Later appends
	// start from a nil slice, so they never write into batch's backing array.
	nodes.Lock()
	batch := nodes.slice
	nodes.slice = nil
	nodes.Unlock()

	if len(batch) == 0 {
		log.Debug().Msg("No nodes in buffer; skipping launch")
		return
	}

	// Log from the owned copy rather than nodes.slice, which other goroutines
	// may be mutating.
	log.Info().Msgf("Launching Ansible against %v", batch)
	// Compose our Ansible launch command, in exec form
	// A trailing comma is necessary for a single node, and fine for multiple nodes
	ansibleArgs := []string{*playbook, "--inventory", strings.Join(batch, ",") + ","}

	// Parallelize our Ansible runs
	wg.Add(1)
	go ansibleHost(&ansibleArgs, wg)
}
// ansibleHost runs `ansible-playbook` with the given arguments and blocks until
// it exits, marking wg done on return. Ansible's stdout and stderr both go to
// this process's stdout, leaving stderr for our own logs. A non-nil error from
// the run is logged, not returned.
func ansibleHost(args *[]string, wg *sync.WaitGroup) {
	defer wg.Done()

	argv := *args
	log.Debug().Msgf("Launching Ansible with %v", argv)

	cmd := exec.Command("ansible-playbook", argv...)
	// Put the child in its own process group so a ^C delivered to our
	// foreground group does not also reach Ansible.
	cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
	cmd.Stdout, cmd.Stderr = os.Stdout, os.Stdout

	err := cmd.Run()
	if err == nil {
		return
	}
	log.Error().Err(err).Msg("An Ansible error occurred!")
}