Skip to content

Commit b9ce901

Browse files
committed
improve session manager
1 parent cf3875c commit b9ce901

File tree

1 file changed

+20
-12
lines changed

1 file changed

+20
-12
lines changed

pkg/services/sessionstorage/service.go

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"container/heap"
55
"slices"
66
"sync"
7+
"sync/atomic"
78
"time"
89

910
"github.com/negrel/assert"
@@ -13,7 +14,7 @@ import (
1314
"github.com/rs/zerolog"
1415
)
1516

16-
// Service define an in memory session storage.
17+
// Service define an in-memory session storage.
1718
type Service interface {
1819
// InsertSession stores given session in memory. If number of visitor session
1920
// exceed configured max session per visitor, this function returns false and
@@ -26,7 +27,7 @@ type Service interface {
2627
// IdentifySession updates stored session visitor id. Updated session and
2728
// boolean found flag are returned.
2829
IdentifySession(deviceId uint64, pageUri uri.Uri, visitorId string) (event.Session, bool)
29-
// WaitForSession retrieves stored session and returns it. If session is not
30+
// WaitSession retrieves stored session and returns it. If session is not
3031
// found, it waits until it is created or timeout.
3132
// Returned boolean flag is false if wait timed out and returned an empty
3233
// session.
@@ -46,12 +47,12 @@ func (e *sessionEntry) hasWaiter() bool {
4647
return e.wait != nil
4748
}
4849

49-
func (e *sessionEntry) isExpired() bool {
50-
return uint32(time.Now().Unix()) >= e.expiry
50+
func (e *sessionEntry) isExpired(now time.Time) bool {
51+
return uint32(now.Unix()) >= e.expiry
5152
}
5253

53-
func (e *sessionEntry) isValid() bool {
54-
return !e.hasWaiter() && !e.isExpired()
54+
func (e *sessionEntry) isValid(now time.Time) bool {
55+
return !e.hasWaiter() && !e.isExpired(now)
5556
}
5657

5758
// deviceData holds sessions entries and gc metadata associated to a single
@@ -74,6 +75,9 @@ type service struct {
7475

7576
// GC priority queue.
7677
gcQueue gcQueue
78+
79+
// Internal clock.
80+
now atomic.Pointer[time.Time]
7781
}
7882

7983
// ProvideService is a wire provider for in memory session storage.
@@ -90,13 +94,15 @@ func ProvideService(
9094
Dur("session_inactive_ttl", cfg.sessionInactiveTtl).
9195
Logger()
9296

97+
now := time.Now()
9398
service := &service{
9499
logger: logger,
95100
cfg: cfg,
96101
metrics: newMetrics(promRegistry),
97102
mu: sync.Mutex{},
98103
devices: make(map[uint64]*deviceData),
99104
gcQueue: gcQueue{},
105+
now: &now
100106
}
101107
heap.Init(&service.gcQueue)
102108

@@ -145,7 +151,7 @@ func (s *service) getSession(deviceId uint64, latestPath string) (*sessionEntry,
145151
func (s *service) getValidSessionEntry(deviceId uint64, latestPath string) *sessionEntry {
146152
assert.Locked(&s.mu)
147153
session, device, i := s.getSession(deviceId, latestPath)
148-
if session == nil || !session.isValid() {
154+
if session == nil || !session.isValid(*s.now.Load()) {
149155
return nil
150156
}
151157

@@ -314,7 +320,7 @@ func (s *service) WaitSession(deviceId uint64, pageUri uri.Uri, timeout time.Dur
314320
currentSession, deviceData, sessionIndex := s.getSession(deviceId, pageUri.Path())
315321

316322
// Valid session.
317-
if currentSession != nil && currentSession.isValid() {
323+
if currentSession != nil && currentSession.isValid(*s.now.Load()) {
318324
s.mu.Unlock()
319325
return currentSession.Session, true
320326
} else if timeout == time.Duration(0) { // Entry not found and timeout is 0s.
@@ -377,18 +383,21 @@ func (s *service) WaitSession(deviceId uint64, pageUri uri.Uri, timeout time.Dur
377383

378384
// session garbage collector loop.
379385
func (s *service) gcLoop() {
386+
tick := time.NewTicker(s.cfg.gcInterval)
387+
380388
for {
389+
now := <-tick.C
390+
391+
s.now.Store(&now)
381392
s.metrics.gcCycle.Inc()
382393

383394
// Wait until there is job in gcQueue.
384395
s.mu.Lock()
385396
if len(s.gcQueue) == 0 {
386397
s.mu.Unlock()
387-
time.Sleep(s.cfg.gcInterval)
388398
continue
389399
}
390400

391-
now := time.Now()
392401
nowTs := uint32(now.Unix())
393402

394403
// Peek job.
@@ -397,7 +406,6 @@ func (s *service) gcLoop() {
397406
// Job hasn't expired yet.
398407
if job.pExpiry > nowTs {
399408
s.mu.Unlock()
400-
time.Sleep(s.cfg.gcInterval)
401409
continue
402410
}
403411

@@ -443,5 +451,5 @@ func (s *service) gcLoop() {
443451
}
444452

445453
func (s *service) newExpiry() uint32 {
446-
return uint32(time.Now().Add(s.cfg.sessionInactiveTtl).Unix())
454+
return uint32(s.now.Load().Add(s.cfg.sessionInactiveTtl).Unix())
447455
}

0 commit comments

Comments
 (0)