Skip to content

Commit 8a51e07

Browse files
authored
Merge pull request #33 from kpetremann/build_trigger_endpoint
feat(api): request to start a new build
2 parents cc1f6cd + 653b25a commit 8a51e07

File tree

5 files changed

+51
-11
lines changed

5 files changed

+51
-11
lines changed

cmd/data-aggregation-api/main.go

+18-3
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,20 @@ var (
4040
builtBy = "unknown"
4141
)
4242

43+
func dispatchSingleRequest(incoming <-chan struct{}) chan struct{} {
44+
outgoing := make(chan struct{})
45+
46+
go func() {
47+
defer close(outgoing)
48+
for range incoming {
49+
log.Info().Msg("Received new build request.")
50+
outgoing <- struct{}{}
51+
}
52+
}()
53+
54+
return outgoing
55+
}
56+
4357
func run() error {
4458
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
4559
defer stop()
@@ -67,10 +81,11 @@ func run() error {
6781
deviceRepo := device.NewSafeRepository()
6882
reports := report.NewRepository()
6983

70-
// TODO: be able to close goroutine when the context is closed (graceful shutdown)
71-
go job.StartBuildLoop(&deviceRepo, &reports)
84+
newBuildRequest := make(chan struct{})
85+
triggerNewBuild := dispatchSingleRequest(newBuildRequest)
7286

73-
if err := router.NewManager(&deviceRepo, &reports).ListenAndServe(ctx, config.Cfg.API.ListenAddress, config.Cfg.API.ListenPort); err != nil {
87+
go job.StartBuildLoop(&deviceRepo, &reports, triggerNewBuild)
88+
if err := router.NewManager(&deviceRepo, &reports, newBuildRequest).ListenAndServe(ctx, config.Cfg.API.ListenAddress, config.Cfg.API.ListenPort); err != nil {
7489
return fmt.Errorf("webserver error: %w", err)
7590
}
7691

internal/api/router/endpoints.go

+13
Original file line numberDiff line numberDiff line change
@@ -130,3 +130,16 @@ func (m *Manager) getLastSuccessfulReport(w http.ResponseWriter, _ *http.Request
130130
w.Header().Set(contentType, applicationJSON)
131131
_, _ = w.Write(out)
132132
}
133+
134+
// triggerBuild enables the user to trigger a new build.
135+
//
136+
// It only accepts one build request at a time.
137+
func (m *Manager) triggerBuild(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
138+
w.Header().Set(contentType, applicationJSON)
139+
select {
140+
case m.newBuildRequest <- struct{}{}:
141+
_, _ = w.Write([]byte("{\"message\": \"new build request received\""))
142+
default:
143+
_, _ = w.Write([]byte("{\"message\": \"a build request is already pending\""))
144+
}
145+
}

internal/api/router/manager.go

+7-4
Original file line numberDiff line numberDiff line change
@@ -27,18 +27,20 @@ type DevicesRepository interface {
2727
}
2828

2929
type Manager struct {
30-
devices DevicesRepository
31-
reports *report.Repository
30+
devices DevicesRepository
31+
reports *report.Repository
32+
newBuildRequest chan<- struct{}
3233
}
3334

3435
// NewManager creates and initializes a new API manager.
35-
func NewManager(deviceRepo DevicesRepository, reports *report.Repository) *Manager {
36-
return &Manager{devices: deviceRepo, reports: reports}
36+
func NewManager(deviceRepo DevicesRepository, reports *report.Repository, restartRequest chan<- struct{}) *Manager {
37+
return &Manager{devices: deviceRepo, reports: reports, newBuildRequest: restartRequest}
3738
}
3839

3940
// ListenAndServe starts to serve Web API requests.
4041
func (m *Manager) ListenAndServe(ctx context.Context, address string, port int) error {
4142
defer func() {
43+
close(m.newBuildRequest)
4244
log.Warn().Msg("Shutdown.")
4345
}()
4446

@@ -57,6 +59,7 @@ func (m *Manager) ListenAndServe(ctx context.Context, address string, port int)
5759
router.GET("/v1/report/last", withAuth.Wrap(m.getLastReport))
5860
router.GET("/v1/report/last/complete", withAuth.Wrap(m.getLastCompleteReport))
5961
router.GET("/v1/report/last/successful", withAuth.Wrap(m.getLastSuccessfulReport))
62+
router.POST("/v1/build/trigger", withAuth.Wrap(m.triggerBuild))
6063

6164
listenSocket := fmt.Sprint(address, ":", port)
6265
log.Info().Msgf("Start webserver - listening on %s", listenSocket)

internal/job/job.go

+11-2
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,9 @@ func RunBuild(reportCh chan report.Message) (map[string]*device.Device, report.S
144144
}
145145

146146
// StartBuildLoop starts the build in an infinite loop.
147-
func StartBuildLoop(deviceRepo router.DevicesRepository, reports *report.Repository) {
147+
//
148+
// Closing the triggerNewBuild channel will stop the loop.
149+
func StartBuildLoop(deviceRepo router.DevicesRepository, reports *report.Repository, triggerNewBuild <-chan struct{}) {
148150
metricsRegistry := metrics.NewRegistry()
149151
for {
150152
var wg sync.WaitGroup
@@ -189,6 +191,13 @@ func StartBuildLoop(deviceRepo router.DevicesRepository, reports *report.Reposit
189191
close(reportCh)
190192
wg.Wait()
191193

192-
time.Sleep(config.Cfg.Build.Interval)
194+
select {
195+
case <-time.After(config.Cfg.Build.Interval):
196+
case _, ok := <-triggerNewBuild:
197+
if !ok {
198+
log.Info().Msg("triggerNewBuild channel closed, stopping build loop")
199+
return
200+
}
201+
}
193202
}
194203
}

internal/report/report.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func logMessage(msg Message) {
4343
// It ends when the channel is closed.
4444
// This function is concurrent-safe.
4545
func (r *Report) Watch(messageChan <-chan Message) {
46-
log.Info().Msg("Starting report dispatcher")
46+
log.Info().Msg("starting report dispatcher")
4747
r.StartTime = time.Now()
4848
for msg := range messageChan {
4949
logMessage(msg)
@@ -52,7 +52,7 @@ func (r *Report) Watch(messageChan <-chan Message) {
5252
r.mutex.Unlock()
5353
}
5454
r.EndTime = time.Now()
55-
log.Info().Msg("Stopping report dispatcher")
55+
log.Info().Msg("stopping report dispatcher")
5656
}
5757

5858
func (r *Report) ToJSON() ([]byte, error) {

0 commit comments

Comments
 (0)