diff --git a/client.go b/client.go index 5abc186..7eb229f 100644 --- a/client.go +++ b/client.go @@ -40,11 +40,12 @@ type client struct { retryInterval time.Duration retrySize int httpClient *http.Client - msgs chan Event - events []Event + msgs chan *Event + events []*Event retries chan *Payload - quit chan struct{} - shutdown chan struct{} + quitCh chan struct{} + shutdownCh chan struct{} + flushCh chan struct{} mtx sync.Mutex } @@ -60,15 +61,16 @@ func New(key string, opts ...Option) Client { maxRetry: 3, retryInterval: time.Second * 1, retrySize: 1000, - quit: make(chan struct{}), - shutdown: make(chan struct{}), + quitCh: make(chan struct{}, 1), + shutdownCh: make(chan struct{}, 1), + flushCh: make(chan struct{}, 1), } c.httpClient = &http.Client{ Timeout: c.timeout, } - c.msgs = make(chan Event, c.bufferSize) - c.events = make([]Event, 0, c.bufferSize) + c.msgs = make(chan *Event, c.bufferSize) + c.events = []*Event{} /*make(, 0, c.bufferSize)*/ c.retries = make(chan *Payload, c.retrySize) for _, opt := range opts { @@ -80,14 +82,27 @@ func New(key string, opts ...Option) Client { return c } +func (c *client) addEvent(event *Event) { + c.mtx.Lock() + defer c.mtx.Unlock() + + c.events = append(c.events, event) + + if len(c.events) == c.bufferSize { + c.flushCh <- struct{}{} + } +} + func (c *client) loop() { - defer close(c.shutdown) + defer close(c.shutdownCh) tick := time.NewTicker(c.interval) defer tick.Stop() for { select { + case <-c.flushCh: + c.flush() case payload := <-c.retries: if err := c.sendBatch(payload); err != nil { if payload.Attempts > c.maxRetry { @@ -99,16 +114,12 @@ func (c *client) loop() { c.retries <- payload } case event := <-c.msgs: - c.events = append(c.events, event) - - if len(c.events) == c.bufferSize { - c.flush() - } + c.addEvent(event) case <-tick.C: c.flush() - case <-c.quit: + case <-c.quitCh: log.Debug().Msg("exit requested - draining messages") // Drain the msg channel, we have to close it first so no more @@ -116,11 +127,7 @@ func (c *client) loop() { close(c.msgs) for event := range c.msgs { - c.events = append(c.events, event) - - if len(c.events) == cap(c.events) { - c.flush() - } + c.addEvent(event) } c.flush() @@ -142,16 +149,16 @@ func (c *client) loop() { func (c *client) Close() (err error) { defer func() { - // Always recover, a panic could be raised if `c`.quit was closed which + // Always recover, a panic could be raised if `c`.quitCh was closed which // means the method was called more than once. if recover() != nil { err = ErrClosed } }() - close(c.quit) + close(c.quitCh) - <-c.shutdown + <-c.shutdownCh return } @@ -208,12 +215,12 @@ func (c *client) sendBatch(payload *Payload) error { return nil } -func (c *client) flush() error { +func (c *client) getBatchEvents() []*Event { c.mtx.Lock() defer c.mtx.Unlock() if len(c.events) == 0 { - return nil + return []*Event{} } end := c.batchSize @@ -221,10 +228,20 @@ func (c *client) flush() error { end = length } - var events []Event + var events []*Event events, c.events = c.events[0:end], c.events[end:] + return events +} + +func (c *client) flush() error { + events := c.getBatchEvents() + + if len(events) == 0 { + return nil + } + reqPayload := &RequestPayload{ APIKey: c.key, Events: events, @@ -262,13 +279,11 @@ func (c *client) Enqueue(event *Event) (err error) { } }() - c.msgs <- *event - - if len(c.msgs) == cap(c.msgs) { - go func() { - err = c.flush() - }() + if len(c.msgs) == (cap(c.msgs) - 1) { + c.flushCh <- struct{}{} } + c.msgs <- event + return } diff --git a/client_test.go b/client_test.go index 331a052..4e27d0c 100644 --- a/client_test.go +++ b/client_test.go @@ -44,8 +44,8 @@ func TestClient(t *testing.T) { c := New( "foo", WithURL(ts.URL), - WithTimeout(time.Second*2), - WithInterval(time.Second*5), + WithTimeout(time.Second*1), + WithInterval(time.Millisecond*100), WithBatchSize(2), WithBufferSize(2), WithMaxRetry(2), @@ -107,8 +107,8 @@ func TestClientWithRetry(t *testing.T) { c := New( "foo", WithURL(ts.URL), - WithTimeout(time.Second*2), - WithInterval(time.Second*5), + WithTimeout(time.Second*1), + WithInterval(time.Millisecond*100), WithBatchSize(2), WithBufferSize(2), WithMaxRetry(2), @@ -151,8 +151,8 @@ func TestClientDroppedMessage(t *testing.T) { c := New( "foo", WithURL(ts.URL), - WithTimeout(time.Second*2), - WithInterval(time.Second*5), + WithTimeout(time.Second*1), + WithInterval(time.Millisecond*100), WithBatchSize(2), WithBufferSize(2), WithMaxRetry(2), @@ -208,7 +208,7 @@ func TestClientWithMultipleEvents(t *testing.T) { case 0: assert.Equal(t, 2, len(msg.Events)) case 1: - assert.Equal(t, 1, len(msg.Events)) + assert.Equal(t, 2, len(msg.Events)) } })) defer ts.Close() @@ -216,12 +216,12 @@ func TestClientWithMultipleEvents(t *testing.T) { c := New( "foo", WithURL(ts.URL), - WithTimeout(time.Second*2), - WithInterval(time.Second*5), + WithTimeout(time.Second*1), + WithInterval(time.Millisecond*500), WithBatchSize(2), - WithBufferSize(2), + WithBufferSize(3), WithMaxRetry(2), - WithRetryInterval(time.Second*5), + WithRetryInterval(time.Millisecond*100), ) defer c.Close() @@ -267,6 +267,20 @@ func TestClientWithMultipleEvents(t *testing.T) { }) assert.NoError(t, err) + err = c.Enqueue(&Event{ + UserID: "f892be22-8f8e-445d-83b0-af199b9a5c72", + DeviceID: "0a16e988-8f70-4877-bdc6-08997832cfff", + Timestamp: 1643367217, + EventType: "user.created", + Platform: "ios", + OSName: "iOS", + OSVersion: "15.2.1", + DeviceModel: "iPhone13,3", + Language: "fr-FR", + InsertID: "a5461410-6b12-4a7a-905d-166cc00af4b2", + }) + assert.NoError(t, err) + wg.Wait() } @@ -298,12 +312,12 @@ func TestClientClose(t *testing.T) { c := New( "foo", WithURL(ts.URL), - WithTimeout(time.Second*2), - WithInterval(time.Second*10), + WithTimeout(time.Second*1), + WithInterval(time.Second*2), WithBatchSize(2), WithBufferSize(2), WithMaxRetry(2), - WithRetryInterval(time.Second*5), + WithRetryInterval(time.Second*2), ) err := c.Enqueue(&Event{ @@ -324,3 +338,81 @@ func TestClientClose(t *testing.T) { wg.Wait() } + +func TestClientGetBatchEvents(t *testing.T) { + + c := &client{ + timeout: time.Second * 1, + interval: time.Second * 10, + batchSize: 2, + bufferSize: 4, + maxRetry: 3, + retryInterval: time.Second * 1, + retrySize: 1000, + } + + c.events = []*Event{ + { + UserID: "f892be22-8f8e-445d-83b0-af199b9a5c71", + }, + { + UserID: "f892be22-8f8e-445d-83b0-af199b9a5c72", + }, + { + UserID: "f892be22-8f8e-445d-83b0-af199b9a5c73", + }, + } + + events := c.getBatchEvents() + + assert.Equal(t, 2, len(events)) + + events = c.getBatchEvents() + + assert.Equal(t, 1, len(events)) +} + +/* +func TestClientRace(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + + })) + defer ts.Close() + + c := New( + "foo", + WithURL(ts.URL), + WithTimeout(time.Second*2), + WithInterval(time.Second*10), + WithRetryInterval(time.Second*5), + ).(*client) + + for i := 0; i < 2000; i++ { + now := time.Now().Unix() + id := uuid.New().String() + + err := c.Enqueue(&Event{ + UserID: "f892be22-8f8e-445d-83b0-af199b9a5c72", + DeviceID: "0a16e988-8f70-4877-bdc6-08997832cfff", + Timestamp: now, + EventType: "user.created", + Platform: "ios", + OSName: "iOS", + OSVersion: "15.2.1", + DeviceModel: "iPhone13,3", + Language: "fr-FR", + InsertID: id, + }) + assert.NoError(t, err) + } + + assert.NoError(t, c.Close()) + +wait: + for { + if len(c.msgs) == 0 && len(c.events) == 0 && len(c.retries) == 0 { + break wait + } + } +} +*/ diff --git a/go.mod b/go.mod index d30c8ea..e47a068 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( require ( github.com/davecgh/go-spew v1.1.1 // indirect + github.com/google/uuid v1.3.0 // indirect github.com/mattn/go-colorable v0.1.12 // indirect github.com/mattn/go-isatty v0.0.14 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect diff --git a/go.sum b/go.sum index 71b76a0..da78856 100644 --- a/go.sum +++ b/go.sum @@ -3,6 +3,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/mattn/go-colorable v0.1.12 h1:jF+Du6AlPIjs2BiUiQlKOX0rt3SujHxPnksPKZbaA40= github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4= github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9Y= diff --git a/payload.go b/payload.go index b1de0b2..5265748 100644 --- a/payload.go +++ b/payload.go @@ -8,7 +8,7 @@ import "fmt" type RequestPayload struct { APIKey string `json:"api_key"` - Events []Event `json:"events"` + Events []*Event `json:"events"` Options *PayloadOptions `json:"options,omitempty"` }