Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api): request to start a new build #33

Merged
merged 4 commits into from
Mar 29, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions cmd/data-aggregation-api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,20 @@ var (
builtBy = "unknown"
)

func dispatchSingleRequest(incoming chan struct{}) chan bool {
outgoing := make(chan bool)

go func() {
defer close(outgoing)
for range incoming {
log.Info().Msg("Received new build request.")
outgoing <- true
}
}()

return outgoing
}

func run() error {
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()
Expand Down Expand Up @@ -67,10 +81,11 @@ func run() error {
deviceRepo := device.NewSafeRepository()
reports := report.NewRepository()

// TODO: be able to close goroutine when the context is closed (graceful shutdown)
go job.StartBuildLoop(&deviceRepo, &reports)
newBuildRequest := make(chan struct{})
triggerNewBuild := dispatchSingleRequest(newBuildRequest)

if err := router.NewManager(&deviceRepo, &reports).ListenAndServe(ctx, config.Cfg.API.ListenAddress, config.Cfg.API.ListenPort); err != nil {
go job.StartBuildLoop(&deviceRepo, &reports, triggerNewBuild)
if err := router.NewManager(&deviceRepo, &reports, newBuildRequest).ListenAndServe(ctx, config.Cfg.API.ListenAddress, config.Cfg.API.ListenPort); err != nil {
return fmt.Errorf("webserver error: %w", err)
}

Expand Down
13 changes: 13 additions & 0 deletions internal/api/router/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,16 @@ func (m *Manager) getLastSuccessfulReport(w http.ResponseWriter, _ *http.Request
w.Header().Set(contentType, applicationJSON)
_, _ = w.Write(out)
}

// triggerBuild enables the user to trigger a new build.
//
// It only accepts one build request at a time.
func (m *Manager) triggerBuild(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
w.Header().Set(contentType, applicationJSON)
select {
case m.restartRequest <- struct{}{}:
_, _ = w.Write([]byte("{\"message\": \"new build request received\""))
default:
_, _ = w.Write([]byte("{\"message\": \"a build request is already pending\""))
}
}
11 changes: 7 additions & 4 deletions internal/api/router/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,20 @@ type DevicesRepository interface {
}

type Manager struct {
devices DevicesRepository
reports *report.Repository
devices DevicesRepository
reports *report.Repository
restartRequest chan<- struct{}
}

// NewManager creates and initializes a new API manager.
func NewManager(deviceRepo DevicesRepository, reports *report.Repository) *Manager {
return &Manager{devices: deviceRepo, reports: reports}
func NewManager(deviceRepo DevicesRepository, reports *report.Repository, restartRequest chan<- struct{}) *Manager {
return &Manager{devices: deviceRepo, reports: reports, restartRequest: restartRequest}
}

// ListenAndServe starts to serve Web API requests.
func (m *Manager) ListenAndServe(ctx context.Context, address string, port int) error {
defer func() {
close(m.restartRequest)
log.Warn().Msg("Shutdown.")
}()

Expand All @@ -57,6 +59,7 @@ func (m *Manager) ListenAndServe(ctx context.Context, address string, port int)
router.GET("/v1/report/last", withAuth.Wrap(m.getLastReport))
router.GET("/v1/report/last/complete", withAuth.Wrap(m.getLastCompleteReport))
router.GET("/v1/report/last/successful", withAuth.Wrap(m.getLastSuccessfulReport))
router.POST("/v1/build/trigger", withAuth.Wrap(m.triggerBuild))

listenSocket := fmt.Sprint(address, ":", port)
log.Info().Msgf("Start webserver - listening on %s", listenSocket)
Expand Down
13 changes: 11 additions & 2 deletions internal/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,9 @@ func RunBuild(reportCh chan report.Message) (map[string]*device.Device, report.S
}

// StartBuildLoop starts the build in an infinite loop.
func StartBuildLoop(deviceRepo router.DevicesRepository, reports *report.Repository) {
//
// Closing the triggerNewBuild channel will stop the loop.
func StartBuildLoop(deviceRepo router.DevicesRepository, reports *report.Repository, triggerNewBuild <-chan bool) {
metricsRegistry := metrics.NewRegistry()
for {
var wg sync.WaitGroup
Expand Down Expand Up @@ -189,6 +191,13 @@ func StartBuildLoop(deviceRepo router.DevicesRepository, reports *report.Reposit
close(reportCh)
wg.Wait()

time.Sleep(config.Cfg.Build.Interval)
select {
case <-time.After(config.Cfg.Build.Interval):
case c := <-triggerNewBuild:
if !c {
log.Info().Msg("triggerNewBuild channel closed, stopping build loop")
return
}
}
}
}
4 changes: 2 additions & 2 deletions internal/report/report.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func logMessage(msg Message) {
// It ends when the channel is closed.
// This function is concurrent-safe.
func (r *Report) Watch(messageChan <-chan Message) {
log.Info().Msg("Starting report dispatcher")
log.Info().Msg("starting report dispatcher")
r.StartTime = time.Now()
for msg := range messageChan {
logMessage(msg)
Expand All @@ -52,7 +52,7 @@ func (r *Report) Watch(messageChan <-chan Message) {
r.mutex.Unlock()
}
r.EndTime = time.Now()
log.Info().Msg("Stopping report dispatcher")
log.Info().Msg("stopping report dispatcher")
}

func (r *Report) ToJSON() ([]byte, error) {
Expand Down
Loading