Skip to content

Commit 9033026

Browse files
committed
improve session manager
1 parent cf3875c commit 9033026

File tree

1 file changed

+19
-12
lines changed

1 file changed

+19
-12
lines changed

pkg/services/sessionstorage/service.go

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"container/heap"
55
"slices"
66
"sync"
7+
"sync/atomic"
78
"time"
89

910
"github.com/negrel/assert"
@@ -13,7 +14,7 @@ import (
1314
"github.com/rs/zerolog"
1415
)
1516

16-
// Service define an in memory session storage.
17+
// Service define an in-memory session storage.
1718
type Service interface {
1819
// InsertSession stores given session in memory. If number of visitor session
1920
// exceed configured max session per visitor, this function returns false and
@@ -26,7 +27,7 @@ type Service interface {
2627
// IdentifySession updates stored session visitor id. Updated session and
2728
// boolean found flag are returned.
2829
IdentifySession(deviceId uint64, pageUri uri.Uri, visitorId string) (event.Session, bool)
29-
// WaitForSession retrieves stored session and returns it. If session is not
30+
// WaitSession retrieves stored session and returns it. If session is not
3031
// found, it waits until it is created or timeout.
3132
// Returned boolean flag is false if wait timed out and returned an empty
3233
// session.
@@ -46,12 +47,12 @@ func (e *sessionEntry) hasWaiter() bool {
4647
return e.wait != nil
4748
}
4849

49-
func (e *sessionEntry) isExpired() bool {
50-
return uint32(time.Now().Unix()) >= e.expiry
50+
func (e *sessionEntry) isExpired(now time.Time) bool {
51+
return uint32(now.Unix()) >= e.expiry
5152
}
5253

53-
func (e *sessionEntry) isValid() bool {
54-
return !e.hasWaiter() && !e.isExpired()
54+
func (e *sessionEntry) isValid(now time.Time) bool {
55+
return !e.hasWaiter() && !e.isExpired(now)
5556
}
5657

5758
// deviceData holds sessions entries and gc metadata associated to a single
@@ -74,6 +75,9 @@ type service struct {
7475

7576
// GC priority queue.
7677
gcQueue gcQueue
78+
79+
// Internal clock.
80+
now atomic.Pointer[time.Time]
7781
}
7882

7983
// ProvideService is a wire provider for in memory session storage.
@@ -100,6 +104,7 @@ func ProvideService(
100104
}
101105
heap.Init(&service.gcQueue)
102106

107+
go service.clock()
103108
go service.gcLoop()
104109

105110
logger.Info().Msg("in memory session storage configured")
@@ -145,7 +150,7 @@ func (s *service) getSession(deviceId uint64, latestPath string) (*sessionEntry,
145150
func (s *service) getValidSessionEntry(deviceId uint64, latestPath string) *sessionEntry {
146151
assert.Locked(&s.mu)
147152
session, device, i := s.getSession(deviceId, latestPath)
148-
if session == nil || !session.isValid() {
153+
if session == nil || !session.isValid(*s.now.Load()) {
149154
return nil
150155
}
151156

@@ -314,7 +319,7 @@ func (s *service) WaitSession(deviceId uint64, pageUri uri.Uri, timeout time.Dur
314319
currentSession, deviceData, sessionIndex := s.getSession(deviceId, pageUri.Path())
315320

316321
// Valid session.
317-
if currentSession != nil && currentSession.isValid() {
322+
if currentSession != nil && currentSession.isValid(*s.now.Load()) {
318323
s.mu.Unlock()
319324
return currentSession.Session, true
320325
} else if timeout == time.Duration(0) { // Entry not found and timeout is 0s.
@@ -377,18 +382,21 @@ func (s *service) WaitSession(deviceId uint64, pageUri uri.Uri, timeout time.Dur
377382

378383
// session garbage collector loop.
379384
func (s *service) gcLoop() {
385+
tick := time.NewTicker(s.cfg.gcInterval)
386+
380387
for {
388+
now := <-tick.C
389+
390+
s.now.Store(&now)
381391
s.metrics.gcCycle.Inc()
382392

383393
// Wait until there is job in gcQueue.
384394
s.mu.Lock()
385395
if len(s.gcQueue) == 0 {
386396
s.mu.Unlock()
387-
time.Sleep(s.cfg.gcInterval)
388397
continue
389398
}
390399

391-
now := time.Now()
392400
nowTs := uint32(now.Unix())
393401

394402
// Peek job.
@@ -397,7 +405,6 @@ func (s *service) gcLoop() {
397405
// Job hasn't expired yet.
398406
if job.pExpiry > nowTs {
399407
s.mu.Unlock()
400-
time.Sleep(s.cfg.gcInterval)
401408
continue
402409
}
403410

@@ -443,5 +450,5 @@ func (s *service) gcLoop() {
443450
}
444451

445452
func (s *service) newExpiry() uint32 {
446-
return uint32(time.Now().Add(s.cfg.sessionInactiveTtl).Unix())
453+
return uint32(s.now.Load().Add(s.cfg.sessionInactiveTtl).Unix())
447454
}

0 commit comments

Comments
 (0)