Skip to content

Commit

Permalink
First commit
Browse files Browse the repository at this point in the history
  • Loading branch information
ShimmerGlass committed Aug 30, 2020
0 parents commit dd1efe0
Show file tree
Hide file tree
Showing 23 changed files with 2,472 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
/http-mirror-pipeline
/config.json
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
proto:
cd mirror && protoc -I=. -I=${GOPATH}/src -I=${GOPATH}/src/github.com/gogo/protobuf/protobuf --gogoslick_out=Mgoogle/protobuf/any.proto=github.com/gogo/protobuf/types,Mgoogle/protobuf/duration.proto=github.com/gogo/protobuf/types,Mgoogle/protobuf/struct.proto=github.com/gogo/protobuf/types,Mgoogle/protobuf/timestamp.proto=github.com/gogo/protobuf/types,Mgoogle/protobuf/wrappers.proto=github.com/gogo/protobuf/types:. \
*.proto
97 changes: 97 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# http-mirror-pipeline

http-mirror-pipeline is a tool to mirror HTTP request for continuous testing and benchmarking, replayable logging...

Its modular architecture makes it easy to build powerful pipelines that fit specific needs.

## Example pipelines

## Modules

### Source

#### source.haproxy_spoe

Uses HAProxy SPOE to receive request data. See test/haproxy for an example HAProxy configuration.
Example:

```json
{
"type": "source.haproxy_spoe",
"config": {
"listen_addr": "127.0.0.1:9999"
}
}
```

| Param | Value |
| ------------- | ---------------------------------------- |
| `listen_addr` | Can be `<ip>:<port>` or `@<socket_file>` |

### Sinks

#### sink.http

Send the incomming requests to the specified host.

Example:

```json
{
"type": "sink.http",
"config": {
"timeout": "1s",
"target_url": "http://127.0.0.1:8002"
}
}
```

| Param | Value |
| ------------ | ----------------------------------------------------------- |
| `target_url` | URL to send the requests to. The path in the URL is ignored |
| `timeout` | Requests timeout. Ex: `1s`, `200ms`, `1m30s` |

#### sink.file

Writes the requests in a file

Example:

```json
{
"type": "sink.file",
"config": {
"path": "/tmp/my_file",
"format": "json"
}
}
```

| Param | Value |
| -------- | ----------------------------------------- |
| `path` | Path of the file |
| `format` | How to encode requests. `json` or `proto` |

### Control

#### control.fanout

Duplicates messages to all its children. This allows to both write requests to a file and send them in http for example.

Example:

```json
{
"type": "control.fanout",
"config": [
{
"type": "sink.file",
"config": { ... }
},
{
"type": "sink.http",
"config": { ... }
}
]
}
```
Binary file added docs/kafka.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/simple.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
11 changes: 11 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
module github.com/shimmerglass/http-mirror-pipeline

go 1.14

require (
github.com/criteo/haproxy-spoe-go v0.0.0-20200316091946-77af26564f0f
github.com/gogo/protobuf v1.2.1
github.com/prometheus/common v0.13.0
github.com/sirupsen/logrus v1.6.0
github.com/stretchr/testify v1.4.0
)
409 changes: 409 additions & 0 deletions go.sum

Large diffs are not rendered by default.

32 changes: 32 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package main

import (
"flag"
"os"

"github.com/shimmerglass/http-mirror-pipeline/mirror/config"
_ "github.com/shimmerglass/http-mirror-pipeline/mirror/modules/control"
_ "github.com/shimmerglass/http-mirror-pipeline/mirror/modules/sink"
_ "github.com/shimmerglass/http-mirror-pipeline/mirror/modules/source"
log "github.com/sirupsen/logrus"
)

func main() {
cfgPath := flag.String("c", "config.json", "Config file path")
flag.Parse()

log.SetLevel(log.DebugLevel)

f, err := os.Open(*cfgPath)
if err != nil {
log.Fatalf("cannot open config file: %s", err)
}

module, err := config.Create(f)
if err != nil {
log.Fatal(err)
}

for range module.Output() {
}
}
19 changes: 19 additions & 0 deletions mirror/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package config

import (
"io"
"io/ioutil"

"github.com/shimmerglass/http-mirror-pipeline/mirror"
"github.com/shimmerglass/http-mirror-pipeline/mirror/modules/control"
"github.com/shimmerglass/http-mirror-pipeline/mirror/registry"
)

func Create(r io.Reader) (mirror.Module, error) {
cfg, err := ioutil.ReadAll(r)
if err != nil {
return nil, err
}

return registry.Create(control.SeqName, cfg)
}
6 changes: 6 additions & 0 deletions mirror/module.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package mirror

type Module interface {
SetInput(<-chan Request)
Output() <-chan Request
}
66 changes: 66 additions & 0 deletions mirror/modules/control/fanout.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package control

import (
"encoding/json"

"github.com/shimmerglass/http-mirror-pipeline/mirror"
"github.com/shimmerglass/http-mirror-pipeline/mirror/registry"
)

const (
FanoutName = "control.fanout"
)

func init() {
registry.Register(FanoutName, NewFanout)
}

type FanoutConfigEl struct {
Type string `json:"type"`
Config json.RawMessage
}

type Fanout struct {
out chan mirror.Request
modules []mirror.Module
in []chan mirror.Request
}

func NewFanout(cfg []byte) (mirror.Module, error) {
mod := &Fanout{
out: make(chan mirror.Request),
}

c := []FanoutConfigEl{}
err := json.Unmarshal(cfg, &c)
if err != nil {
return nil, err
}

for _, m := range c {
sub, err := registry.Create(m.Type, []byte(m.Config))
if err != nil {
return nil, err
}

in := make(chan mirror.Request)
mod.in = append(mod.in, in)
sub.SetInput(in)
}

return mod, nil
}

func (m *Fanout) Output() <-chan mirror.Request {
return m.out
}

func (m *Fanout) SetInput(c <-chan mirror.Request) {
go func() {
for r := range c {
for _, i := range m.in {
i <- r
}
}
}()
}
66 changes: 66 additions & 0 deletions mirror/modules/control/seq.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package control

import (
"encoding/json"

"github.com/shimmerglass/http-mirror-pipeline/mirror"
"github.com/shimmerglass/http-mirror-pipeline/mirror/registry"
)

const (
SeqName = "control.seq"
)

func init() {
registry.Register(SeqName, NewSeq)
}

type SeqConfigEl struct {
Type string `json:"type"`
Config json.RawMessage
}

type Seq struct {
out chan mirror.Request
modules []mirror.Module
}

func NewSeq(cfg []byte) (mirror.Module, error) {
mod := &Seq{
out: make(chan mirror.Request),
}

c := []SeqConfigEl{}
err := json.Unmarshal(cfg, &c)
if err != nil {
return nil, err
}

var lastOut <-chan mirror.Request
for _, m := range c {
sub, err := registry.Create(m.Type, []byte(m.Config))
if err != nil {
return nil, err
}

if lastOut != nil {
sub.SetInput(lastOut)
}

lastOut = sub.Output()
}

return mod, nil
}

func (m *Seq) Output() <-chan mirror.Request {
return m.out
}

func (m *Seq) SetInput(c <-chan mirror.Request) {
if len(m.modules) == 0 {
return
}

m.modules[0].SetInput(c)
}
Loading

0 comments on commit dd1efe0

Please sign in to comment.