Hi there, I spotted a few problems with the current implementation, which led me to write a specification for a possible v2. Here it is:
New public API
There are only two available clients: TCP or UDP.
TLS becomes an optional configuration which can be injected into either the TCP or UDP client.
No more Connect() method. Connection must be lazily handled by the client.
It is the caller's responsibility to call Close() when required.
See the examples below.
A non-TLS TCP client
c, err := NewTCPClient(
"example.com:12345",
)
if err != nil {
// do something
}
err = c.WithTimeout(
100 * time.Millisecond,
)
if err != nil {
// do something
}
A non-TLS UDP client
c, err := NewUDPClient(
"example.com:12345",
)
if err != nil {
// do something
}
err = c.WithTimeout(
100 * time.Millisecond,
)
if err != nil {
// do something
}
A TLS-enabled TCP client
c, err := NewTCPClient(
"example.com:12345",
)
if err != nil {
// do something
}
err = c.WithTimeout(
100 * time.Millisecond,
)
if err != nil {
// do something
}
err = c.WithTLS(
"path/to/cert",
"path/to/key",
SKIP_CERT_VERIFICATION, // or NO_SKIP_CERT_VERIFICATION
)
if err != nil {
// do something
}
A TLS-enabled UDP client
c, err := NewUDPClient(
"example.com:12345",
)
if err != nil {
// do something
}
err = c.WithTimeout(
100 * time.Millisecond,
)
if err != nil {
// do something
}
err = c.WithTLS(
"path/to/cert",
"path/to/key",
SKIP_CERT_VERIFICATION, // or NO_SKIP_CERT_VERIFICATION
)
if err != nil {
// do something
}
If requiring users to perform all these steps for a TLS client proves too complex, it is trivial to provide a helper function such as:
func NewTCPClientBuilder(addr, cert, key string, skip bool, timeout time.Duration) (*Client, error) {
c, err := NewTCPClient(addr)
if err != nil {
return nil, err
}
err = c.WithTimeout(timeout)
if err != nil {
return nil, err
}
err = c.WithTLS(cert, key, skip)
if err != nil {
return nil, err
}
return c, nil
}
So it can be used like this:
c, err := NewTCPClientBuilder(....)
// handle error
defer c.Close()
c.Send(&Event{....})
(The same thing can be done for a UDP builder)
New Event type
The current Event
type is:
type Event struct {
Ttl float32
Time time.Time
Tags []string
Host string // Defaults to os.Hostname()
State string
Service string
Metric interface{} // Could be Int, Float32, Float64
Description string
Attributes map[string]string
}
There are two problems with it:
- Event.Ttl should be a time.Duration so it is easier to express how long an Event will survive in Riemann's index
- Event.Metric generates all sorts of complexities when marshalling and requires using Go's reflection, which is complex and slow
The new proposed Event is:
type Event struct {
TTL time.Duration
Time time.Time
Tags []string
Host string
State string
Service string
MetricInt64 *int64
MetricFloat32 *float32
MetricFloat64 *float64
Description string
Attributes map[string]string
}
According to Riemann's proto definition, an event only supports one of the following types:
optional sint64 metric_sint64 = 13;
optional double metric_d = 14;
optional float metric_f = 15;
So I propose to directly map:
- Event.MetricInt64 to metric_sint64
- Event.MetricFloat32 to metric_f
- Event.MetricFloat64 to metric_d
And let the user decide which one must be used, accepting the possible precision loss that goes along with it, for example when sending a uint64.
By doing this we avoid using Go's reflection and simplify the marshalling process by using the first non-nil value among Metric(Int64|Float32|Float64).
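That selection can be sketched without any reflection (the Event here is trimmed to its metric fields for brevity, and metricValue is my name for the hypothetical helper, not part of the proposal):

```go
package main

import "fmt"

// Event mirrors the metric fields of the proposed v2 type.
type Event struct {
	Service       string
	MetricInt64   *int64
	MetricFloat32 *float32
	MetricFloat64 *float64
}

// metricValue returns the first non-nil metric and whether one was set,
// using plain nil checks instead of reflection.
func metricValue(e *Event) (interface{}, bool) {
	switch {
	case e.MetricInt64 != nil:
		return *e.MetricInt64, true
	case e.MetricFloat32 != nil:
		return *e.MetricFloat32, true
	case e.MetricFloat64 != nil:
		return *e.MetricFloat64, true
	}
	return nil, false
}

func main() {
	v := int64(42)
	e := &Event{Service: "cpu", MetricInt64: &v}
	m, ok := metricValue(e)
	fmt.Println(m, ok) // 42 true
}
```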
Unless there is high added value in sorting Event.Attributes, I propose not to sort them and to send them as-is.
Event.Host can be defaulted to os.Hostname(), but we should state in the documentation that this generates an openat syscall (at least on Linux), more specifically openat(AT_FDCWD, "/proc/sys/kernel/hostname", O_RDONLY|O_CLOEXEC ...), for every new Event.
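If that per-Event syscall matters, the default could be resolved once and cached; a sketch using sync.Once (defaultHost is my name, not part of the proposal):

```go
package main

import (
	"fmt"
	"os"
	"sync"
)

var (
	hostOnce sync.Once
	hostName string
)

// defaultHost calls os.Hostname() exactly once and caches the result,
// so filling Event.Host does not cost an openat syscall per Event.
func defaultHost() string {
	hostOnce.Do(func() {
		hostName, _ = os.Hostname()
	})
	return hostName
}

func main() {
	// The cached value is stable across calls.
	fmt.Println(defaultHost() == defaultHost())
}
```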
A buffered sender
I propose to always send events in bulk. The idea behind this buffered (bulked) sender is to reduce roundtrips between Riemann and the sender as much as possible in order to maximize throughput while still keeping a pretty high send rate.
Here is a prototype implementation below:
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func main() {
bf := newBufferedSender()
for i := 0; i < 1e4; i++ {
bf.Send(
&event{ID: i},
)
time.Sleep(
time.Duration(rand.Intn(200)) * time.Microsecond,
)
}
bf.Stop()
}
type event struct {
ID int
}
type bf struct {
in chan *event
wg *sync.WaitGroup
}
func newBufferedSender() *bf {
bf := &bf{
in: make(chan *event, 1e6),
wg: new(sync.WaitGroup),
}
bf.wg.Add(1)
go bf.start()
return bf
}
func (bf *bf) start() {
var buff []*event
resetBuff := func() {
buff = make([]*event, 0, 1e2)
}
send := func(ticked bool) {
fmt.Printf(
"Sending buff with %d events, ticked: %t\n",
len(buff), ticked,
)
time.Sleep(
1 * time.Millisecond,
)
}
tkr := time.NewTicker(
20 * time.Millisecond,
)
defer func() {
fmt.Println("Stop ticking")
tkr.Stop()
// Drain: flush whatever is left in the buffer before exiting.
if len(buff) > 0 {
fmt.Printf(
"Draining buffer which contains %d events\n",
len(buff),
)
send(false)
}
fmt.Println("Exiting")
bf.wg.Done()
}()
resetBuff()
for {
select {
case e, open := <-bf.in:
if !open {
return
}
buff = append(buff, e)
if len(buff) == cap(buff) {
send(false)
resetBuff()
}
case <-tkr.C:
if len(buff) == 0 {
continue
}
send(true)
resetBuff()
}
}
}
func (bf *bf) Stop() {
close(bf.in)
bf.wg.Wait()
}
func (bf *bf) Send(e *event) {
bf.in <- e
}
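One design point worth deciding in the prototype: Send blocks once the channel buffer is full, applying back-pressure to callers. If dropping is preferable to blocking, a non-blocking variant is easy to sketch (TrySend is my name, not part of the proposal):

```go
package main

import "fmt"

type event struct{ ID int }

type bf struct{ in chan *event }

// TrySend enqueues without blocking; it reports whether the event was
// accepted or dropped because the buffer was full.
func (b *bf) TrySend(e *event) bool {
	select {
	case b.in <- e:
		return true
	default:
		return false
	}
}

func main() {
	b := &bf{in: make(chan *event, 1)}
	fmt.Println(b.TrySend(&event{ID: 1})) // true: buffer has room
	fmt.Println(b.TrySend(&event{ID: 2})) // false: buffer full, dropped
}
```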
What do you think? :)