Skip to content

Commit

Permalink
Add graph visualization with throughtput metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
ShimmerGlass committed Sep 24, 2020
1 parent edacf6f commit d67c8e5
Show file tree
Hide file tree
Showing 28 changed files with 394 additions and 68 deletions.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
static:
cd mirror/server && statik -f -src ./public

proto:
cd mirror && protoc -I=. --go_out=. *.proto
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ go 1.15

require (
github.com/criteo/haproxy-spoe-go v0.0.0-20200316091946-77af26564f0f
github.com/emicklei/dot v0.14.0
github.com/gogo/protobuf v1.2.1
github.com/golang/protobuf v1.4.2
github.com/google/cel-go v0.5.1
github.com/google/gopacket v1.1.18
github.com/prometheus/client_golang v1.7.1
github.com/prometheus/common v0.13.0
github.com/rakyll/statik v0.1.7
github.com/sirupsen/logrus v1.6.0
github.com/stretchr/testify v1.4.0
google.golang.org/genproto v0.0.0-20200305110556-506484158171
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5m
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
github.com/emicklei/dot v0.14.0 h1:DJbbkKThQ0nW361NB79CqrWcKpYR1JoqJB3FcTUgBEU=
github.com/emicklei/dot v0.14.0/go.mod h1:DeV7GvQtIw4h2u73RKBkkFdvVAz0D9fzeJrgPW6gy/s=
github.com/envoyproxy/go-control-plane v0.6.9/go.mod h1:SBwIajubJHhxtWwsL9s8ss4safvEdbitLhGGK48rN6g=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
Expand Down Expand Up @@ -248,6 +250,8 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT
github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A=
github.com/prometheus/procfs v0.1.3 h1:F0+tqvhOksq22sc6iCHF5WGlWjdwj92p0udFh1VFBS8=
github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
github.com/rakyll/statik v0.1.7 h1:OF3QCZUuyPxuGEP7B4ypUa7sB/iHtqOTDYZXGM8KOdQ=
github.com/rakyll/statik v0.1.7/go.mod h1:AlZONWzMtEnMs7W4e/1LURLiI49pIMmp6V9Unghqrcc=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
Expand Down
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func main() {
}

if cfg.ListenAddr != "" {
srv := server.New(cfg.ListenAddr)
srv := server.New(cfg.ListenAddr, cfg.Pipeline)
go func() {
err := srv.Run()
if err != nil {
Expand Down
13 changes: 11 additions & 2 deletions mirror/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,24 @@ type pipeline struct {
inner mirror.Module
}

func (p *pipeline) Context() *mirror.ModuleContext {
return p.inner.Context()
}

func (p *pipeline) Children() [][]mirror.Module {
return p.inner.Children()
}

func (p *pipeline) SetInput(c <-chan mirror.Request) {
p.inner.SetInput(c)
}
func (p *pipeline) Output() <-chan mirror.Request {
return p.inner.Output()
}
func (p *pipeline) UnmarshalJSON(b []byte) error {
ctx := mirror.ModuleContext{
Name: "pipeline",
ctx := &mirror.ModuleContext{
Name: "Pipeline",
Type: "virtual.pipeline",
}
m, err := registry.Create("control.seq", ctx, b)
if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion mirror/config/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ func CreateModule(b []byte) (mirror.Module, error) {
}
moduleIndex[mc.Type]++

ctx := mirror.ModuleContext{
ctx := &mirror.ModuleContext{
Type: mc.Type,
Name: name,
}
go ctx.Run()

return registry.Create(mc.Type, ctx, mc.Config)
}
Expand Down
92 changes: 92 additions & 0 deletions mirror/graph/graph.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package graph

import (
"fmt"
"io"
"os/exec"
"strings"

"github.com/emicklei/dot"
"github.com/shimmerglass/http-mirror-pipeline/mirror"
)

func FromModule(m mirror.Module) *dot.Graph {
g := dot.NewGraph(dot.Directed)
g.Attr("nodesep", "2")
processNode(g, nil, m)
return g
}

func GraphSVG(g *dot.Graph, w io.Writer) error {
fmt.Println(g.String())
cmd := exec.Command("dot", "-Tsvg")
cmd.Stdin = strings.NewReader(g.String())
cmd.Stdout = w

return cmd.Run()
}

func processNode(g *dot.Graph, parents []dot.Node, m mirror.Module) []dot.Node {
ctx := m.Context()
current := g.Node(ctx.Name)
role := ctx.Role()

if role != "virtual" {
current.Attr("label", dot.HTML(
fmt.Sprintf(`
%s<BR />
<FONT point-size="11"><B>Throughput:</B>&nbsp;&nbsp;&nbsp;%d/s</FONT><BR />
<FONT point-size="10">%s</FONT>
`, ctx.Name, ctx.RPS, ctx.Type),
))
}

current.Attr("width", "3")

switch ctx.Role() {
case "virtual":
current.Attr("shape", "underline")
case "source":
current.Attr("shape", "invhouse")
current.Attr("color", "#2D232F")
current.Attr("fillcolor", "#634B66")
current.Attr("fontcolor", "#FFFFFF")
current.Attr("style", "filled")
case "sink":
current.Attr("shape", "invhouse")
current.Attr("color", "#144667")
current.Attr("fillcolor", "#2176AE")
current.Attr("fontcolor", "#FFFFFF")
current.Attr("style", "filled")
default:
current.Attr("shape", "hexagon")
current.Attr("color", "#94A0B3")
current.Attr("fillcolor", "#D2D7DF")
current.Attr("fontcolor", "#000000")
current.Attr("style", "filled")
}

for _, p := range parents {
g.Edge(p, current)
}

children := m.Children()
leafs := []dot.Node{}

for _, group := range children {
parents := []dot.Node{current}
for i, child := range group {
parents = processNode(g, parents, child)

if i == len(group)-1 {
leafs = append(leafs, parents...)
}
}
}

if len(leafs) == 0 {
leafs = append(leafs, current)
}

return leafs
}
38 changes: 37 additions & 1 deletion mirror/module.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,46 @@
package mirror

import (
"strings"
"sync/atomic"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

var (
RequestsTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "requets_total",
Help: "The total number of requets handled by the module",
}, []string{"module"})
)

type ModuleContext struct {
Name string
Type string
Name string
RPS int
requestCounter uint64
}

func (c *ModuleContext) Role() string {
return strings.Split(c.Type, ".")[0]
}

func (c *ModuleContext) HandledRequest() {
atomic.AddUint64(&c.requestCounter, 1)
RequestsTotal.WithLabelValues(c.Name).Inc()
}

func (c *ModuleContext) Run() {
for range time.Tick(time.Second) {
c.RPS = int(atomic.SwapUint64(&c.requestCounter, 0))
}
}

type Module interface {
Context() *ModuleContext
SetInput(<-chan Request)
Output() <-chan Request
Children() [][]Module
}
15 changes: 11 additions & 4 deletions mirror/modules/control/decouple.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/shimmerglass/http-mirror-pipeline/mirror"
"github.com/shimmerglass/http-mirror-pipeline/mirror/modules"
"github.com/shimmerglass/http-mirror-pipeline/mirror/registry"
log "github.com/sirupsen/logrus"
)
Expand All @@ -34,12 +33,12 @@ type DecoupleConfig struct {
}

type Decouple struct {
ctx mirror.ModuleContext
ctx *mirror.ModuleContext
out chan mirror.Request
quiet bool
}

func NewDecouple(ctx mirror.ModuleContext, cfg []byte) (mirror.Module, error) {
func NewDecouple(ctx *mirror.ModuleContext, cfg []byte) (mirror.Module, error) {
c := DecoupleConfig{}
err := json.Unmarshal(cfg, &c)
if err != nil {
Expand All @@ -55,6 +54,14 @@ func NewDecouple(ctx mirror.ModuleContext, cfg []byte) (mirror.Module, error) {
return mod, nil
}

func (m *Decouple) Context() *mirror.ModuleContext {
return m.ctx
}

func (m *Decouple) Children() [][]mirror.Module {
return nil
}

func (m *Decouple) Output() <-chan mirror.Request {
return m.out
}
Expand All @@ -64,7 +71,7 @@ func (m *Decouple) SetInput(c <-chan mirror.Request) {

go func() {
for r := range c {
modules.RequestsTotal.WithLabelValues(m.ctx.Name).Inc()
m.ctx.HandledRequest()
select {
case m.out <- r:
default:
Expand Down
18 changes: 17 additions & 1 deletion mirror/modules/control/fanout.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,17 @@ func init() {
}

type Fanout struct {
ctx *mirror.ModuleContext
out chan mirror.Request
modules []mirror.Module
in []chan mirror.Request

outClosed uint32
}

func NewFanout(ctx mirror.ModuleContext, cfg []byte) (mirror.Module, error) {
func NewFanout(ctx *mirror.ModuleContext, cfg []byte) (mirror.Module, error) {
mod := &Fanout{
ctx: ctx,
out: make(chan mirror.Request),
}

Expand All @@ -41,9 +43,23 @@ func NewFanout(ctx mirror.ModuleContext, cfg []byte) (mirror.Module, error) {
go mod.consume(sub)
}

mod.modules = mods

return mod, nil
}

func (m *Fanout) Context() *mirror.ModuleContext {
return m.ctx
}

func (m *Fanout) Children() [][]mirror.Module {
res := [][]mirror.Module{}
for _, m := range m.modules {
res = append(res, []mirror.Module{m})
}
return res
}

func (m *Fanout) Output() <-chan mirror.Request {
return m.out
}
Expand Down
2 changes: 1 addition & 1 deletion mirror/modules/control/fanout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

func TestFanout(t *testing.T) {
mod, err := NewFanout(mirror.ModuleContext{}, []byte(`[
mod, err := NewFanout(&mirror.ModuleContext{}, []byte(`[
{"type": "control.identity", "config": {}},
{"type": "control.identity", "config": {}}
]`))
Expand Down
19 changes: 13 additions & 6 deletions mirror/modules/control/identity.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package control

import (
"github.com/shimmerglass/http-mirror-pipeline/mirror"
"github.com/shimmerglass/http-mirror-pipeline/mirror/modules"
"github.com/shimmerglass/http-mirror-pipeline/mirror/registry"
)

Expand All @@ -15,25 +14,33 @@ func init() {
}

type Identity struct {
ctx mirror.ModuleContext
out chan mirror.Request
modules []mirror.Module
ctx *mirror.ModuleContext
out chan mirror.Request
}

func NewIdentity(ctx mirror.ModuleContext, cfg []byte) (mirror.Module, error) {
func NewIdentity(ctx *mirror.ModuleContext, cfg []byte) (mirror.Module, error) {
return &Identity{
out: make(chan mirror.Request),
ctx: ctx,
}, nil
}

func (m *Identity) Context() *mirror.ModuleContext {
return m.ctx
}

func (m *Identity) Children() [][]mirror.Module {
return nil
}

func (m *Identity) Output() <-chan mirror.Request {
return m.out
}

func (m *Identity) SetInput(c <-chan mirror.Request) {
go func() {
for r := range c {
modules.RequestsTotal.WithLabelValues(m.ctx.Name).Inc()
m.ctx.HandledRequest()
m.out <- r
}
close(m.out)
Expand Down
Loading

0 comments on commit d67c8e5

Please sign in to comment.