Skip to content

Commit f90ea26

Browse files
authored
fix: trigger offset bug (#583)
Signed-off-by: xdlbdy <[email protected]>
1 parent a5859a8 commit f90ea26

File tree

1 file changed

+7
-10
lines changed

1 file changed

+7
-10
lines changed

internal/trigger/offset/offset.go

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -99,15 +99,13 @@ func (offset *SubscriptionOffset) GetCommit() info.ListOffsetInfo {
9999
}
100100

101101
type offsetTracker struct {
102-
maxOffset uint64
103-
initOffset uint64
104-
list *skiplist.SkipList
102+
maxOffset int64
103+
list *skiplist.SkipList
105104
}
106105

107106
func initOffset(initOffset uint64) *offsetTracker {
108107
return &offsetTracker{
109-
initOffset: initOffset,
110-
maxOffset: initOffset,
108+
maxOffset: int64(initOffset) - 1,
111109
list: skiplist.New(skiplist.GreaterThanFunc(func(lhs, rhs interface{}) int {
112110
v1, _ := lhs.(uint64)
113111
v2, _ := rhs.(uint64)
@@ -123,7 +121,9 @@ func initOffset(initOffset uint64) *offsetTracker {
123121

124122
func (o *offsetTracker) putOffset(offset uint64) {
125123
o.list.Set(offset, offset)
126-
o.maxOffset, _ = o.list.Back().Key().(uint64)
124+
if int64(offset) > o.maxOffset {
125+
o.maxOffset = int64(offset)
126+
}
127127
}
128128

129129
func (o *offsetTracker) commitOffset(offset uint64) {
@@ -132,10 +132,7 @@ func (o *offsetTracker) commitOffset(offset uint64) {
132132

133133
func (o *offsetTracker) offsetToCommit() uint64 {
134134
if o.list.Len() == 0 {
135-
if o.maxOffset == o.initOffset {
136-
return o.initOffset
137-
}
138-
return o.maxOffset + 1
135+
return uint64(o.maxOffset + 1)
139136
}
140137
return o.list.Front().Key().(uint64)
141138
}

0 commit comments

Comments
 (0)