forked from criteo/data-aggregation-api
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmanager.go
85 lines (68 loc) · 2.62 KB
/
manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package router
import (
	"context"
	"errors"
	"fmt"
	"net/http"
	"time"

	"github.com/julienschmidt/httprouter"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"github.com/rs/zerolog/log"

	"github.com/criteo/data-aggregation-api/internal/api/auth"
	"github.com/criteo/data-aggregation-api/internal/config"
	"github.com/criteo/data-aggregation-api/internal/convertor/device"
	"github.com/criteo/data-aggregation-api/internal/report"
)
// shutdownTimeout bounds how long ListenAndServe waits for the HTTP server
// to shut down gracefully once its context is done.
const shutdownTimeout = 5 * time.Second
// DevicesRepository is the device store the Manager depends on.
//
// NOTE(review): the implementation lives outside this file; the per-method
// notes below are inferred from names and signatures only — confirm them
// against the concrete repository.
type DevicesRepository interface {
// Set takes a map of devices keyed by string (presumably the hostname — TODO confirm).
Set(devices map[string]*device.Device)
// ListAFKEnabledDevicesJSON returns a byte payload or an error
// (presumably the JSON list of AFK-enabled devices — TODO confirm).
ListAFKEnabledDevicesJSON() ([]byte, error)
// IsAFKEnabledJSON returns a byte payload or an error for the given hostname
// (presumably the JSON-encoded AFK status of that device — TODO confirm).
IsAFKEnabledJSON(hostname string) ([]byte, error)
// GetAllDevicesOpenConfigJSON returns a byte payload or an error
// (presumably the OpenConfig JSON of every device — TODO confirm).
GetAllDevicesOpenConfigJSON() ([]byte, error)
// GetDeviceOpenConfigJSON returns a byte payload or an error for the given hostname
// (presumably the OpenConfig JSON of that device — TODO confirm).
GetDeviceOpenConfigJSON(hostname string) ([]byte, error)
}
// Manager holds the dependencies of the Web API and serves it through
// ListenAndServe.
type Manager struct {
// devices is the device store injected through NewManager.
devices DevicesRepository
// reports is the report repository injected through NewManager.
reports *report.Repository
// restartRequest is a send-only signalling channel injected through
// NewManager; ListenAndServe closes it when it returns.
// NOTE(review): presumably the build-trigger handler sends on it to request
// a new build — confirm against the handler.
restartRequest chan<- struct{}
}
// NewManager returns an API Manager wired to the given device repository,
// report repository and restart-request channel.
func NewManager(deviceRepo DevicesRepository, reports *report.Repository, restartRequest chan<- struct{}) *Manager {
	manager := new(Manager)
	manager.devices = deviceRepo
	manager.reports = reports
	manager.restartRequest = restartRequest

	return manager
}
// ListenAndServe registers the Web API routes and serves them on
// address:port until ctx is done or the HTTP server fails.
//
// When ctx is done, the server is shut down gracefully, waiting at most
// shutdownTimeout for in-flight requests, and nil is returned.
//
// An error is returned when the authentication middleware cannot be built,
// or when the HTTP server stops on its own (for example because the listen
// socket cannot be bound).
//
// In every case the restartRequest channel is closed before returning, so
// ListenAndServe must be called at most once per Manager.
func (m *Manager) ListenAndServe(ctx context.Context, address string, port int) error {
	// Bounds the time a client may take to send its request headers, so that
	// idle or deliberately slow connections cannot be held open forever.
	const readHeaderTimeout = 10 * time.Second

	defer func() {
		close(m.restartRequest)
		log.Warn().Msg("Shutdown.")
	}()

	withAuth, err := auth.NewBasicAuth(ctx, config.Cfg.Authentication)
	if err != nil {
		return err
	}

	router := httprouter.New()

	// Unauthenticated technical endpoints.
	router.GET("/metrics", prometheusMetrics(promhttp.Handler()))
	router.GET("/api/version", getVersion)
	router.GET("/api/health", healthCheck)

	// Authenticated business endpoints.
	router.GET("/v1/devices/:hostname/afk_enabled", withAuth.Wrap(m.getAFKEnabled))
	router.GET("/v1/devices/:hostname/openconfig", withAuth.Wrap(m.getDeviceOpenConfig))
	router.GET("/v1/report/last", withAuth.Wrap(m.getLastReport))
	router.GET("/v1/report/last/complete", withAuth.Wrap(m.getLastCompleteReport))
	router.GET("/v1/report/last/successful", withAuth.Wrap(m.getLastSuccessfulReport))
	router.POST("/v1/build/trigger", withAuth.Wrap(m.triggerBuild))

	listenSocket := fmt.Sprint(address, ":", port)
	log.Info().Msgf("Start webserver - listening on %s", listenSocket)

	httpServer := http.Server{
		Addr:              listenSocket,
		Handler:           router,
		ReadHeaderTimeout: readHeaderTimeout,
	}

	// Buffered so the serving goroutine can always deliver its result and
	// exit, whichever branch below is taken.
	serveErr := make(chan error, 1)
	go func() {
		serveErr <- httpServer.ListenAndServe()
	}()

	select {
	case err := <-serveErr:
		// The server stopped although no shutdown was requested: report the
		// failure to the caller instead of blocking until ctx is done.
		return fmt.Errorf("serving web API on %s: %w", listenSocket, err)
	case <-ctx.Done():
	}

	ctxCancel, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
	defer cancel()

	if err := httpServer.Shutdown(ctxCancel); err != nil {
		log.Error().Err(err).Send()
	}

	// Wait for the serving goroutine. http.ErrServerClosed is the expected
	// outcome of Shutdown and is not a failure.
	if err := <-serveErr; err != nil && !errors.Is(err, http.ErrServerClosed) {
		log.Error().Err(err).Msg("stopped to listen and serve")
	}

	return nil
}