Skip to content

Commit f1fe790

Browse files
committed
Merge pull request #34 from PreetamJinka/v0.0.1-rev6
v0.0.1-rev6
2 parents 8ea5920 + a3110e4 commit f1fe790

28 files changed

+925
-676
lines changed

.gitmodules

Lines changed: 0 additions & 9 deletions
This file was deleted.

api/api.go

Lines changed: 123 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,138 @@
11
package api
22

33
import (
4+
"encoding/json"
5+
"net"
46
"net/http"
57

6-
"github.com/PreetamJinka/cistern/state/metrics"
8+
"github.com/VividCortex/siesta"
9+
10+
"github.com/PreetamJinka/cistern/device"
11+
"github.com/PreetamJinka/cistern/state/flows"
712
"github.com/PreetamJinka/cistern/state/series"
813
)
914

10-
type ApiServer struct {
11-
addr string
12-
hostRegistry *metrics.HostRegistry
13-
engine *series.Engine
15+
type APIServer struct {
16+
addr string
17+
deviceRegistry *device.Registry
18+
seriesEngine *series.Engine
1419
}
1520

16-
func NewApiServer(address string, reg *metrics.HostRegistry, engine *series.Engine) *ApiServer {
17-
return &ApiServer{
18-
addr: address,
19-
hostRegistry: reg,
20-
engine: engine,
21+
func NewAPIServer(address string, deviceRegistry *device.Registry, seriesEngine *series.Engine) *APIServer {
22+
return &APIServer{
23+
addr: address,
24+
deviceRegistry: deviceRegistry,
25+
seriesEngine: seriesEngine,
2126
}
2227
}
2328

24-
func (s *ApiServer) Run() {
25-
http.Handle("/hosts", hostStatus(s.hostRegistry))
26-
http.Handle("/metrics", hostMetrics(s.hostRegistry))
27-
http.Handle("/metricstates", metricStates(s.hostRegistry))
28-
http.Handle("/metricseries", metricSeries(s.engine))
29+
func (s *APIServer) Run() {
30+
service := siesta.NewService("/")
31+
32+
service.AddPre(func(w http.ResponseWriter, r *http.Request) {
33+
w.Header().Set("Access-Control-Allow-Origin", "*")
34+
})
35+
36+
service.AddPost(func(c siesta.Context, w http.ResponseWriter, r *http.Request, q func()) {
37+
resp := c.Get(responseKey)
38+
err, _ := c.Get(errorKey).(string)
39+
40+
if resp == nil && err == "" {
41+
return
42+
}
43+
44+
enc := json.NewEncoder(w)
45+
enc.Encode(APIResponse{
46+
Data: resp,
47+
Error: err,
48+
})
49+
})
50+
51+
service.Route("GET", "/", "Default page", func(c siesta.Context, w http.ResponseWriter, r *http.Request) {
52+
c.Set(responseKey, "Welcome to the Cistern API!")
53+
})
54+
55+
service.Route("GET", "/devices", "Lists sources", func(c siesta.Context, w http.ResponseWriter, r *http.Request) {
56+
type ipHostname struct {
57+
IP string `json:"ip"`
58+
Hostname string `json:"hostname,omitempty"`
59+
}
60+
61+
devices := []ipHostname{}
62+
63+
for _, dev := range s.deviceRegistry.Devices() {
64+
devices = append(devices, ipHostname{
65+
IP: dev.IP().String(),
66+
Hostname: dev.Hostname(),
67+
})
68+
}
69+
70+
c.Set(responseKey, devices)
71+
})
72+
73+
service.Route("GET", "/devices/:device/metrics",
74+
"Lists metrics for a device",
75+
func(c siesta.Context, w http.ResponseWriter, r *http.Request) {
76+
var params siesta.Params
77+
device := params.String("device", "", "Device name")
78+
err := params.Parse(r.Form)
79+
if err != nil {
80+
c.Set(errorKey, err.Error())
81+
return
82+
}
83+
84+
address := net.ParseIP(*device)
85+
dev, present := s.deviceRegistry.Lookup(address)
86+
if !present {
87+
c.Set(errorKey, "device not found")
88+
return
89+
}
90+
91+
c.Set(responseKey, dev.Metrics())
92+
})
93+
94+
service.Route("GET", "/devices/:device/flows",
95+
"Lists top flows for a device",
96+
func(c siesta.Context, w http.ResponseWriter, r *http.Request) {
97+
var params siesta.Params
98+
device := params.String("device", "", "Device name")
99+
err := params.Parse(r.Form)
100+
if err != nil {
101+
c.Set(errorKey, err.Error())
102+
return
103+
}
104+
105+
address := net.ParseIP(*device)
106+
dev, present := s.deviceRegistry.Lookup(address)
107+
if !present {
108+
c.Set(errorKey, "device not found")
109+
return
110+
}
111+
112+
type flowsResponse struct {
113+
ByBytes []flows.Flow `json:"byBytes"`
114+
ByPackets []flows.Flow `json:"byPackets"`
115+
}
116+
117+
topTalkers := dev.TopTalkers()
118+
if topTalkers == nil {
119+
c.Set(errorKey, "No active flows")
120+
return
121+
}
122+
123+
resp := flowsResponse{
124+
ByBytes: topTalkers.ByBytes(),
125+
ByPackets: topTalkers.ByPackets(),
126+
}
127+
128+
c.Set(responseKey, resp)
129+
})
130+
131+
service.Route("GET", "/series/query",
132+
"Lists metrics for a device",
133+
s.querySeriesRoute())
134+
135+
http.Handle("/", service)
136+
29137
go http.ListenAndServe(s.addr, nil)
30138
}

api/context_keys.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package api
2+
3+
const (
4+
responseKey = "response"
5+
errorKey = "error"
6+
)

api/hosts.go

Lines changed: 0 additions & 118 deletions
This file was deleted.

api/response.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package api
2+
3+
type APIResponse struct {
4+
Data interface{} `json:"data,omitempty"`
5+
Error string `json:"error,omitempty"`
6+
}

api/series_query.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package api
2+
3+
import (
4+
"encoding/json"
5+
"net/http"
6+
"time"
7+
8+
"github.com/PreetamJinka/catena"
9+
"github.com/VividCortex/siesta"
10+
)
11+
12+
func (s *APIServer) querySeriesRoute() func(siesta.Context, http.ResponseWriter, *http.Request) {
13+
return func(c siesta.Context, w http.ResponseWriter, r *http.Request) {
14+
var params siesta.Params
15+
downsample := params.Int64("downsample", 0, "A downsample value of averages N points at a time")
16+
err := params.Parse(r.Form)
17+
if err != nil {
18+
c.Set(errorKey, err.Error())
19+
return
20+
}
21+
22+
var descs []catena.QueryDesc
23+
24+
dec := json.NewDecoder(r.Body)
25+
err = dec.Decode(&descs)
26+
if err != nil {
27+
c.Set(errorKey, err.Error())
28+
return
29+
}
30+
31+
now := time.Now().Unix()
32+
for i, desc := range descs {
33+
if desc.Start <= 0 {
34+
desc.Start += now
35+
}
36+
37+
if desc.End <= 0 {
38+
desc.End += now
39+
}
40+
41+
descs[i] = desc
42+
}
43+
44+
resp := s.seriesEngine.Query(descs)
45+
46+
if *downsample <= 1 {
47+
c.Set(responseKey, resp)
48+
return
49+
}
50+
51+
for i, series := range resp.Series {
52+
pointIndex := 0
53+
seenPoints := 1
54+
currentPartition := series.Points[0].Timestamp / *downsample
55+
for j, p := range series.Points {
56+
if j == 0 {
57+
continue
58+
}
59+
60+
if p.Timestamp / *downsample == currentPartition {
61+
series.Points[pointIndex].Value += p.Value
62+
seenPoints++
63+
} else {
64+
currentPartition = p.Timestamp / *downsample
65+
series.Points[pointIndex].Value /= float64(seenPoints)
66+
pointIndex++
67+
seenPoints = 1
68+
series.Points[pointIndex] = p
69+
}
70+
71+
if j == len(series.Points) {
72+
series.Points[pointIndex].Value /= float64(seenPoints)
73+
}
74+
}
75+
76+
series.Points = series.Points[:pointIndex]
77+
resp.Series[i] = series
78+
}
79+
80+
c.Set(responseKey, resp)
81+
}
82+
}

0 commit comments

Comments
 (0)