From e6c243db096f9aa1d52896d7b43155612f85dd8a Mon Sep 17 00:00:00 2001 From: johnabass Date: Tue, 19 Sep 2023 14:34:16 -0700 Subject: [PATCH] refactored into a PacketConn-based server with a custom wire format --- go.mod | 3 + go.sum | 6 ++ pkg/hashy/config.go | 11 +++ pkg/hashy/handler.go | 23 +++++ pkg/hashy/hasher.go | 36 +++++++ pkg/hashy/message.go | 58 ++++++++++++ pkg/hashy/request.go | 28 ++++++ pkg/hashy/responseWriter.go | 52 +++++++++++ pkg/hashy/server.go | 182 ++++++++++++++++++++++++++++++++++++ 9 files changed, 399 insertions(+) create mode 100644 pkg/hashy/config.go create mode 100644 pkg/hashy/handler.go create mode 100644 pkg/hashy/hasher.go create mode 100644 pkg/hashy/message.go create mode 100644 pkg/hashy/request.go create mode 100644 pkg/hashy/responseWriter.go create mode 100644 pkg/hashy/server.go diff --git a/go.mod b/go.mod index 32801b4..c67122a 100644 --- a/go.mod +++ b/go.mod @@ -10,16 +10,19 @@ require ( ) require ( + github.com/billhathaway/consistentHash v0.0.0-20140718022140-addea16d2229 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/magiconair/properties v1.8.7 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/pelletier/go-toml/v2 v2.0.8 // indirect + github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/spf13/afero v1.9.5 // indirect github.com/spf13/cast v1.5.1 // indirect github.com/spf13/jwalterweatherman v1.1.0 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/subosito/gotenv v1.4.2 // indirect + github.com/ugorji/go/codec v1.2.11 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/dig v1.17.0 // indirect go.uber.org/multierr v1.11.0 // indirect diff --git a/go.sum b/go.sum index a363dd1..9871142 100644 --- a/go.sum +++ b/go.sum @@ -39,6 +39,8 @@ dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= +github.com/billhathaway/consistentHash v0.0.0-20140718022140-addea16d2229 h1:w1t+UCLwxXgpUcXAlm3IkvWHGJDfhIyNrzJmCUkJq7s= +github.com/billhathaway/consistentHash v0.0.0-20140718022140-addea16d2229/go.mod h1:YTos5xiYv+RiIsYn3pqdwe5OULySucMqiPes1OgC5pM= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= @@ -147,6 +149,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= +github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= +github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spf13/afero v1.9.5 h1:stMpOSZFs//0Lv29HduCmli3GUfpFoF3Y1Q/aXj/wVM= github.com/spf13/afero v1.9.5/go.mod h1:UBogFpq8E9Hx+xc5CNTTEpTnuHVmXDwZcZcE1eb/UhQ= github.com/spf13/cast v1.5.1 h1:R+kOtfhWQE6TVQzY+4D7wJLBgkdVasCEFxSUBYBYIlA= @@ -170,6 +174,8 @@ github.com/stretchr/testify v1.8.3 h1:RP3t2pwF7cMEbC1dqtB6poj3niw/9gnV4Cjg5oW5gt github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/subosito/gotenv v1.4.2 h1:X1TuBLAMDFbaTAChgCBLu3DU3UPyELpnF2jjJ2cz/S8= github.com/subosito/gotenv v1.4.2/go.mod h1:ayKnFf/c6rvx/2iiLrJUk1e6plDbT3edrFNGqEflhK0= +github.com/ugorji/go/codec v1.2.11 h1:BMaWp1Bb6fHwEtbplGBGJ498wD+LKlNSl25MjdZY4dU= +github.com/ugorji/go/codec v1.2.11/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= diff --git a/pkg/hashy/config.go b/pkg/hashy/config.go new file mode 100644 index 0000000..386a2cc --- /dev/null +++ b/pkg/hashy/config.go @@ -0,0 +1,11 @@ +package hashy + +type Config struct { + Address string + Network string + MaxPacketSize int + MaxConcurrentRequests int + + Datacenters map[Datacenter][]string + Vnodes int +} diff --git a/pkg/hashy/handler.go b/pkg/hashy/handler.go new file mode 100644 index 0000000..45a5d84 --- /dev/null +++ b/pkg/hashy/handler.go @@ -0,0 +1,23 @@ +package hashy + +type Handler interface { + ServeHash(ResponseWriter, *Request) +} + +type HandlerFunc func(ResponseWriter, *Request) + +func (hf HandlerFunc) ServeHash(rw ResponseWriter, r *Request) { hf(rw, r) } + +type DefaultHandler struct { + Datacenters map[Datacenter]Hasher +} + +func (dh *DefaultHandler) ServeHash(rw ResponseWriter, r *Request) { + for _, name := range r.DeviceNames { + for dc, hasher := range dh.Datacenters { + // TODO: error handling + value, _ := hasher.Get(name.GetHashBytes()) + rw.Add(name, dc, value) + } + } +} diff --git a/pkg/hashy/hasher.go b/pkg/hashy/hasher.go new file mode 100644 index 0000000..80bae61 --- /dev/null +++ b/pkg/hashy/hasher.go @@ -0,0 +1,36 @@ +package hashy + +import ( + "github.com/billhathaway/consistentHash" + "go.uber.org/multierr" +) + +type Hasher interface { + Get([]byte) (string, error) +} + +func NewHasher(vnodes int, values []string) (Hasher, error) { + ch := consistentHash.New() + if err := ch.SetVnodeCount(vnodes); err != nil { + return nil, err + } + + for _, v := range values { + ch.Add(v) + } + + return ch, nil +} + +func NewDatacenterHashers(cfg Config) (m map[Datacenter]Hasher, err error) { + m = make(map[Datacenter]Hasher, len(cfg.Datacenters)) + for dc, values := range cfg.Datacenters { + h, hErr := NewHasher(cfg.Vnodes, values) + err = multierr.Append(err, hErr) + if hErr == nil { + m[dc] = h + } + } + + return +} diff --git a/pkg/hashy/message.go b/pkg/hashy/message.go new file mode 100644 index 0000000..8212c2c --- /dev/null +++ b/pkg/hashy/message.go @@ -0,0 +1,58 @@ +package hashy + +import ( + "strings" + + "github.com/ugorji/go/codec" +) + +var msgpackHandle = codec.MsgpackHandle{} + +type Datacenter string + +type DeviceName string + +func (dn DeviceName) GetHashBytes() []byte { + i := strings.IndexRune(string(dn), ':') + if i >= 0 { + return []byte(dn[i+1:]) + } + + return []byte(dn) +} + +type DeviceNames []DeviceName + +type DeviceHashes map[DeviceName]map[Datacenter][]string + +func (dh *DeviceHashes) Add(name DeviceName, dc Datacenter, value string) { + if *dh == nil { + *dh = DeviceHashes{ + name: map[Datacenter][]string{ + dc: []string{value}, + }, + } + + return + } + + if datacenters, exists := (*dh)[name]; exists { + datacenters[dc] = append(datacenters[dc], value) + } else { + (*dh)[name] = map[Datacenter][]string{ + dc: []string{value}, + } + } +} + +func UnmarshalBytes[T any](b []byte) (t T, err error) { + decoder := codec.NewDecoderBytes(b, &msgpackHandle) + err = decoder.Decode(&t) + return +} + +func MarshalBytes[T any](t T) (b []byte, err error) { + encoder := codec.NewEncoderBytes(&b, &msgpackHandle) + err = encoder.Encode(t) + return +} diff --git a/pkg/hashy/request.go b/pkg/hashy/request.go new file mode 100644 index 0000000..bfd8edc --- /dev/null +++ b/pkg/hashy/request.go @@ -0,0 +1,28 @@ +package hashy + +import ( + "context" + "net" +) + +type Request struct { + RemoteAddr net.Addr + DeviceNames DeviceNames + + ctx context.Context +} + +func (r *Request) Context() context.Context { + if r.ctx == nil { + return context.Background() + } + + return r.ctx +} + +func NewRequest(names DeviceNames) *Request { + return &Request{ + DeviceNames: names, + ctx: context.Background(), + } +} diff --git a/pkg/hashy/responseWriter.go b/pkg/hashy/responseWriter.go new file mode 100644 index 0000000..1db4211 --- /dev/null +++ b/pkg/hashy/responseWriter.go @@ -0,0 +1,52 @@ +package hashy + +import ( + "net" + "sync" +) + +type Flusher interface { + Flush() error +} + +// Writer defines the behavior of a packet-oriented writer. +type Writer interface { + // WriteTo has the same contract as net.PacketConn.WriteTo. + WriteTo([]byte, net.Addr) (int, error) +} + +type syncWriter struct { + lock sync.Mutex + w Writer +} + +func (sw *syncWriter) WriteTo(b []byte, a net.Addr) (int, error) { + defer sw.lock.Unlock() + sw.lock.Lock() + return sw.w.WriteTo(b, a) +} + +type ResponseWriter interface { + Flusher + Add(DeviceName, Datacenter, string) +} + +type responseWriter struct { + remoteAddr net.Addr + writer Writer + hashes DeviceHashes +} + +func (rw *responseWriter) Add(name DeviceName, dc Datacenter, value string) { + rw.hashes.Add(name, dc, value) +} + +func (rw *responseWriter) Flush() (err error) { + var message []byte + message, err = MarshalBytes(rw.hashes) + if err == nil { + _, err = rw.writer.WriteTo(message, rw.remoteAddr) + } + + return +} diff --git a/pkg/hashy/server.go b/pkg/hashy/server.go new file mode 100644 index 0000000..31618f4 --- /dev/null +++ b/pkg/hashy/server.go @@ -0,0 +1,182 @@ +package hashy + +import ( + "context" + "errors" + "net" + "sync/atomic" +) + +const ( + DefaultServerAddr = ":8080" + DefaultServerNetwork = "udp" + DefaultServerMaxConcurrentRequests = 10 + DefaultServerMaxPacketSize = 1500 +) + +const ( + serverStateNotStarted uint64 = iota + serverStateRunning + serverStateClosed +) + +var ( + ErrServerNotStarted = errors.New("hashy: Server not started") + ErrServerRunning = errors.New("hashy: Server already running") + ErrServerClosed = errors.New("hashy: Server closed") +) + +type Server struct { + Addr string + Network string + Handler Handler + MaxPacketSize int + MaxConcurrentRequests int + + state atomic.Uint64 + ctx context.Context + cancel context.CancelFunc + conn net.PacketConn + writer Writer +} + +func NewServer(cfg Config) (*Server, error) { + return &Server{ + Addr: cfg.Address, + Network: cfg.Network, + MaxPacketSize: cfg.MaxPacketSize, + MaxConcurrentRequests: cfg.MaxConcurrentRequests, + }, nil +} + +func (s *Server) listen() error { + var ( + address = s.Addr + network = s.Network + ) + + if len(address) == 0 { + address = DefaultServerAddr + } + + if len(network) == 0 { + network = DefaultServerNetwork + } + + conn, err := net.ListenPacket(network, address) + if err != nil { + return err + } + + s.conn = conn + s.writer = &syncWriter{ + w: s.conn, + } + + return nil +} + +func (s *Server) newSemaphore() chan struct{} { + depth := s.MaxConcurrentRequests + if depth < 1 { + depth = DefaultServerMaxConcurrentRequests + } + + return make(chan struct{}, depth) +} + +func (s *Server) newPacketBuffer() []byte { + size := s.MaxPacketSize + if size < 1 { + size = DefaultServerMaxPacketSize + } + + return make([]byte, size) +} + +func (s *Server) serve() error { + var ( + semaphore = s.newSemaphore() + packet = s.newPacketBuffer() + ) + + for { + select { + case <-s.ctx.Done(): + return s.ctx.Err() + + case semaphore <- struct{}{}: + // continue + } + + n, remoteAddr, err := s.conn.ReadFrom(packet) + if err != nil { + return err + } + + deviceNames, err := UnmarshalBytes[DeviceNames](packet[0:n]) + if err != nil { + return err + } + + request := NewRequest(deviceNames) + request.RemoteAddr = remoteAddr + request.ctx = s.ctx + rw := &responseWriter{ + remoteAddr: remoteAddr, + writer: s.writer, + } + + go s.handle(semaphore, rw, request) + } + + return ErrServerClosed +} + +func (s *Server) handle(semaphore <-chan struct{}, rw ResponseWriter, request *Request) { + defer func() { + rw.Flush() + <-semaphore + }() + + s.Handler.ServeHash(rw, request) +} + +func (s *Server) ListenAndServe() error { + if !s.state.CompareAndSwap(serverStateNotStarted, serverStateRunning) { + switch s.state.Load() { + case serverStateRunning: + return ErrServerRunning + + default: + return ErrServerClosed + } + } + + err := s.listen() + if err != nil { + return err + } + + s.ctx, s.cancel = context.WithCancel(context.Background()) + return s.serve() +} + +func (s *Server) Close() error { + return s.Shutdown(context.Background()) +} + +func (s *Server) Shutdown(ctx context.Context) error { + if !s.state.CompareAndSwap(serverStateRunning, serverStateClosed) { + switch s.state.Load() { + case serverStateNotStarted: + return ErrServerNotStarted + + default: + return ErrServerClosed + } + } + + s.cancel() + return s.conn.Close() +}