Skip to content

Commit 2a12280

Browse files
author
gas
committed
check implementation of the HLC
1 parent deb7bba commit 2a12280

File tree

5 files changed

+81
-55
lines changed

5 files changed

+81
-55
lines changed

README.md

+6
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,9 @@ To test the clocks implementations run `make test`.
1111

1212
### API
1313
*TODO*
14+
15+
### References
16+
- [Lamport clock](https://lamport.azurewebsites.net/pubs/time-clocks.pdf)
17+
- [Vector clock](https://fileadmin.cs.lth.se/cs/Personal/Amr_Ergawy/dist-algos-papers/4.pdf)
18+
- [Hybrid logical clock](https://cse.buffalo.edu/tech-reports/2014-04.pdf)
19+
- [Martin Fowler blog](https://martinfowler.com/articles/patterns-of-distributed-systems/hybrid-clock.html)

hlc/hlc.go

+28-17
Original file line numberDiff line numberDiff line change
@@ -5,23 +5,27 @@ import (
55
"time"
66
)
77

8+
func getWallTimeMs() int64 {
9+
return time.Now().UnixNano() / 1e6
10+
}
11+
812
type Hybrid struct {
9-
mu sync.Mutex
10-
Time int64 // Usually time in ms
11-
Ticks int64
13+
mu sync.RWMutex
14+
WallClockTimeMs int64
15+
Ticks int64
1216
}
1317

1418
func New(systemTime, ticks int64) *Hybrid {
1519
return &Hybrid{
16-
Time: systemTime,
17-
Ticks: ticks,
20+
WallClockTimeMs: systemTime,
21+
Ticks: ticks,
1822
}
1923
}
2024

2125
func NewNow(ticks int64) *Hybrid {
2226
return &Hybrid{
23-
Time: time.Now().UnixNano() / 1000,
24-
Ticks: ticks,
27+
WallClockTimeMs: getWallTimeMs(),
28+
Ticks: ticks,
2529
}
2630
}
2731

@@ -38,25 +42,26 @@ func (h *Hybrid) AddTicks(ticks int64) {
3842
func (h *Hybrid) Now() Hybrid {
3943
h.mu.Lock()
4044
defer h.mu.Unlock()
41-
currentTime := time.Now().UnixNano() / 1000
42-
if h.Time >= currentTime {
45+
currentTime := getWallTimeMs()
46+
// NOTE: check in case clock goes backwards fue to synchronization
47+
if h.WallClockTimeMs >= currentTime {
4348
h.addTicks(1)
4449
} else {
45-
h.Time = currentTime
50+
h.WallClockTimeMs = currentTime
4651
h.Ticks = 0
4752
}
4853
return Hybrid{
49-
Time: h.Time,
50-
Ticks: h.Ticks,
54+
WallClockTimeMs: h.WallClockTimeMs,
55+
Ticks: h.Ticks,
5156
}
5257
}
5358

5459
func Compare(a, b *Hybrid) int {
55-
if (a.Time == b.Time) && (a.Ticks == b.Ticks) {
60+
if (a.WallClockTimeMs == b.WallClockTimeMs) && (a.Ticks == b.Ticks) {
5661
return 0
5762
}
58-
if (a.Time == b.Time && a.Ticks > b.Ticks) ||
59-
(a.Time > b.Time) {
63+
if (a.WallClockTimeMs == b.WallClockTimeMs && a.Ticks > b.Ticks) ||
64+
(a.WallClockTimeMs > b.WallClockTimeMs) {
6065
return 1
6166
}
6267
return -1
@@ -65,7 +70,7 @@ func Compare(a, b *Hybrid) int {
6570
func max(times ...*Hybrid) *Hybrid {
6671
maxTime := times[0]
6772
for _, time := range times {
68-
cmp := Compare(maxTime, time)
73+
cmp := Compare(time, maxTime)
6974
if cmp == 1 {
7075
maxTime = time
7176
}
@@ -79,6 +84,12 @@ func (h *Hybrid) Tick(requestTime *Hybrid) {
7984
hybridNow := NewNow(-1)
8085
latestTime := max(hybridNow, requestTime, h)
8186
latestTime.addTicks(1)
82-
h.Time = latestTime.Time
87+
h.WallClockTimeMs = latestTime.WallClockTimeMs
8388
h.Ticks = latestTime.Ticks
8489
}
90+
91+
func (h *Hybrid) GetCompactTimestampMs() int64 {
92+
h.mu.RLock()
93+
defer h.mu.RUnlock()
94+
return (h.WallClockTimeMs >> 16 << 16) | (h.Ticks << 48 >> 48)
95+
}

hlc/hlc_test.go

+36-35
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
package hlc
22

33
import (
4-
"time"
5-
// "sync"
4+
"sync"
65
"testing"
6+
"time"
77
)
88

99
func TestCompare(t *testing.T) {
@@ -26,37 +26,38 @@ func TestCompare(t *testing.T) {
2626
}
2727
}
2828

29-
// func TestConcurrentAdd(t *testing.T) {
30-
// lt := New("a", 0)
31-
// wg := sync.WaitGroup{}
32-
// wg.Add(10)
33-
// for i := 0; i < 10; i++ {
34-
// go func() {
35-
// defer wg.Done()
36-
// lt.AddTicks(1)
37-
// }()
38-
// }
39-
// wg.Wait()
40-
// now := lt.Now()
41-
// if now.Ticks < 10 {
42-
// t.Fatal("There must be exactly 10 ticks")
43-
// }
44-
// }
29+
func TestConcurrentAdd(t *testing.T) {
30+
currentWallTime := getWallTimeMs()
31+
c1 := New(currentWallTime, 0)
32+
c2 := New(currentWallTime, 1)
33+
wg := sync.WaitGroup{}
34+
wg.Add(10)
35+
for i := 0; i < 10; i++ {
36+
go func() {
37+
defer wg.Done()
38+
c1.AddTicks(1)
39+
}()
40+
}
41+
wg.Wait()
42+
cmp := Compare(c1, c2)
43+
if cmp < 1 {
44+
t.Fatal("Timestamp with more ticks must be greater")
45+
}
46+
}
4547

46-
// func TestConcurrentTick(t *testing.T) {
47-
// lt := New("a", 0)
48-
// req := New("b", 0)
49-
// wg := sync.WaitGroup{}
50-
// wg.Add(10)
51-
// for i := 0; i < 10; i++ {
52-
// go func() {
53-
// defer wg.Done()
54-
// lt.Tick(req)
55-
// }()
56-
// }
57-
// wg.Wait()
58-
// now := lt.Now()
59-
// if now.Ticks < 10 {
60-
// t.Fatal("There must be exactly 10 ticks")
61-
// }
62-
// }
48+
func TestConcurrentTick(t *testing.T) {
49+
c := NewNow(0)
50+
req := New(getWallTimeMs()+1000, 10)
51+
wg := sync.WaitGroup{}
52+
wg.Add(10)
53+
for i := 0; i < 10; i++ {
54+
go func() {
55+
defer wg.Done()
56+
c.Tick(req)
57+
}()
58+
}
59+
wg.Wait()
60+
if c.Ticks < 20 {
61+
t.Fatal("There must be exactly 20 ticks")
62+
}
63+
}

vector/vector.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -62,16 +62,18 @@ func (v *Vector) Tick(requestTime *Vector) error {
6262
}
6363

6464
func Compare(a, b *Vector) int {
65-
equal, less := 0, 0
65+
equal, less, greater := 0, 0, 0
6666
for i := range a.Ticks {
6767
if a.Ticks[i] == b.Ticks[i] {
6868
equal++
6969
} else if a.Ticks[i] < b.Ticks[i] {
7070
less++
71+
} else {
72+
greater++
7173
}
7274
}
73-
if len(a.Ticks) == equal {
74-
return 0
75+
if len(a.Ticks) == equal || (greater > 0 && less > 0 && greater == less) {
76+
return 0 // == concurrent
7577
}
7678
if len(a.Ticks) == (equal + less) {
7779
return -1

vector/vector_test.go

+6
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,12 @@ func TestCompare(t *testing.T) {
1717
if cmp < 1 {
1818
t.Fatal("Original timestamp now must be higher than it's copy")
1919
}
20+
_, c1 := New(1, 3)
21+
c1.AddTicks(1)
22+
cmp = Compare(c, c1)
23+
if cmp != 0 {
24+
t.Fatalf("Timestamps must be concurrent: %v != %v", c, c1)
25+
}
2026
}
2127

2228
func TestConcurrentAdd(t *testing.T) {

0 commit comments

Comments
 (0)