Skip to content

Commit

Permalink
refactored into a PacketConn-based server with a custom wire format
Browse files Browse the repository at this point in the history
  • Loading branch information
johnabass committed Sep 19, 2023
1 parent fafb39a commit e6c243d
Show file tree
Hide file tree
Showing 9 changed files with 399 additions and 0 deletions.
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,19 @@ require (
)

require (
github.com/billhathaway/consistentHash v0.0.0-20140718022140-addea16d2229 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/spf13/afero v1.9.5 // indirect
github.com/spf13/cast v1.5.1 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/subosito/gotenv v1.4.2 // indirect
github.com/ugorji/go/codec v1.2.11 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/dig v1.17.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
github.com/billhathaway/consistentHash v0.0.0-20140718022140-addea16d2229 h1:w1t+UCLwxXgpUcXAlm3IkvWHGJDfhIyNrzJmCUkJq7s=
github.com/billhathaway/consistentHash v0.0.0-20140718022140-addea16d2229/go.mod h1:YTos5xiYv+RiIsYn3pqdwe5OULySucMqiPes1OgC5pM=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
Expand Down Expand Up @@ -147,6 +149,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spf13/afero v1.9.5 h1:stMpOSZFs//0Lv29HduCmli3GUfpFoF3Y1Q/aXj/wVM=
github.com/spf13/afero v1.9.5/go.mod h1:UBogFpq8E9Hx+xc5CNTTEpTnuHVmXDwZcZcE1eb/UhQ=
github.com/spf13/cast v1.5.1 h1:R+kOtfhWQE6TVQzY+4D7wJLBgkdVasCEFxSUBYBYIlA=
Expand All @@ -170,6 +174,8 @@ github.com/stretchr/testify v1.8.3 h1:RP3t2pwF7cMEbC1dqtB6poj3niw/9gnV4Cjg5oW5gt
github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/subosito/gotenv v1.4.2 h1:X1TuBLAMDFbaTAChgCBLu3DU3UPyELpnF2jjJ2cz/S8=
github.com/subosito/gotenv v1.4.2/go.mod h1:ayKnFf/c6rvx/2iiLrJUk1e6plDbT3edrFNGqEflhK0=
github.com/ugorji/go/codec v1.2.11 h1:BMaWp1Bb6fHwEtbplGBGJ498wD+LKlNSl25MjdZY4dU=
github.com/ugorji/go/codec v1.2.11/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down
11 changes: 11 additions & 0 deletions pkg/hashy/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package hashy

type Config struct {
Address string
Network string
MaxPacketSize int
MaxConcurrentRequests int

Datacenters map[Datacenter][]string
Vnodes int
}
23 changes: 23 additions & 0 deletions pkg/hashy/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package hashy

type Handler interface {
ServeHash(ResponseWriter, *Request)
}

type HandlerFunc func(ResponseWriter, *Request)

func (hf HandlerFunc) ServeHash(rw ResponseWriter, r *Request) { hf(rw, r) }

type DefaultHandler struct {
Datacenters map[Datacenter]Hasher
}

func (dh *DefaultHandler) ServeHash(rw ResponseWriter, r *Request) {
for _, name := range r.DeviceNames {
for dc, hasher := range dh.Datacenters {
// TODO: error handling
value, _ := hasher.Get(name.GetHashBytes())
rw.Add(name, dc, value)
}
}
}
36 changes: 36 additions & 0 deletions pkg/hashy/hasher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package hashy

import (
"github.com/billhathaway/consistentHash"
"go.uber.org/multierr"
)

type Hasher interface {
Get([]byte) (string, error)
}

func NewHasher(vnodes int, values []string) (Hasher, error) {
ch := consistentHash.New()
if err := ch.SetVnodeCount(vnodes); err != nil {
return nil, err
}

for _, v := range values {
ch.Add(v)
}

return ch, nil
}

func NewDatacenterHashers(cfg Config) (m map[Datacenter]Hasher, err error) {
m = make(map[Datacenter]Hasher, len(cfg.Datacenters))
for dc, values := range cfg.Datacenters {
h, hErr := NewHasher(cfg.Vnodes, values)
err = multierr.Append(err, hErr)
if hErr == nil {
m[dc] = h
}
}

return
}
58 changes: 58 additions & 0 deletions pkg/hashy/message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package hashy

import (
"strings"

"github.com/ugorji/go/codec"
)

var msgpackHandle = codec.MsgpackHandle{}

type Datacenter string

type DeviceName string

func (dn DeviceName) GetHashBytes() []byte {
i := strings.IndexRune(string(dn), ':')
if i >= 0 {
return []byte(dn[i+1:])
}

return []byte(dn)
}

type DeviceNames []DeviceName

type DeviceHashes map[DeviceName]map[Datacenter][]string

func (dh *DeviceHashes) Add(name DeviceName, dc Datacenter, value string) {
if *dh == nil {
*dh = DeviceHashes{
name: map[Datacenter][]string{
dc: []string{value},
},
}

return
}

if datacenters, exists := (*dh)[name]; exists {
datacenters[dc] = append(datacenters[dc], value)
} else {
(*dh)[name] = map[Datacenter][]string{
dc: []string{value},
}
}
}

func UnmarshalBytes[T any](b []byte) (t T, err error) {
decoder := codec.NewDecoderBytes(b, &msgpackHandle)
err = decoder.Decode(&t)
return
}

func MarshalBytes[T any](t T) (b []byte, err error) {
encoder := codec.NewEncoderBytes(&b, &msgpackHandle)
err = encoder.Encode(t)
return
}
28 changes: 28 additions & 0 deletions pkg/hashy/request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package hashy

import (
"context"
"net"
)

type Request struct {
RemoteAddr net.Addr
DeviceNames DeviceNames

ctx context.Context
}

func (r *Request) Context() context.Context {
if r.ctx == nil {
return context.Background()
}

return r.ctx
}

func NewRequest(names DeviceNames) *Request {
return &Request{
DeviceNames: names,
ctx: context.Background(),
}
}
52 changes: 52 additions & 0 deletions pkg/hashy/responseWriter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package hashy

import (
"net"
"sync"
)

type Flusher interface {
Flush() error
}

// Writer defines the behavior of a packet-oriented writer.
type Writer interface {
// WriteTo has the same contract as net.PacketConn.WriteTo.
WriteTo([]byte, net.Addr) (int, error)
}

type syncWriter struct {
lock sync.Mutex
w Writer
}

func (sw *syncWriter) WriteTo(b []byte, a net.Addr) (int, error) {
defer sw.lock.Unlock()
sw.lock.Lock()
return sw.w.WriteTo(b, a)
}

type ResponseWriter interface {
Flusher
Add(DeviceName, Datacenter, string)
}

type responseWriter struct {
remoteAddr net.Addr
writer Writer
hashes DeviceHashes
}

func (rw *responseWriter) Add(name DeviceName, dc Datacenter, value string) {
rw.hashes.Add(name, dc, value)
}

func (rw *responseWriter) Flush() (err error) {
var message []byte
message, err = MarshalBytes(rw.hashes)
if err == nil {
_, err = rw.writer.WriteTo(message, rw.remoteAddr)
}

return
}
Loading

0 comments on commit e6c243d

Please sign in to comment.