Skip to content

Commit 5b3b241

Browse files
committed
feat: fix review comments
Signed-off-by: jyjiangkai <[email protected]>
1 parent 88c1479 commit 5b3b241

File tree

7 files changed

+61
-34
lines changed

7 files changed

+61
-34
lines changed

client/pkg/eventbus/eventbus.go

+8-8
Original file line numberDiff line numberDiff line change
@@ -182,21 +182,21 @@ func (b *eventbus) GetLog(ctx context.Context, logID uint64, opts ...api.LogOpti
182182
}
183183

184184
if op.Policy.AccessMode() == api.ReadOnly {
185+
b.readableMu.RLock()
186+
defer b.readableMu.RUnlock()
185187
if len(b.readableLogs) == 0 {
186188
b.refreshReadableLogs(ctx)
187189
}
188-
b.readableMu.RLock()
189-
defer b.readableMu.RUnlock()
190190
if l, ok := b.readableLogs[logID]; ok {
191191
return l, nil
192192
}
193193
return nil, errors.ErrResourceNotFound.WithMessage("eventlog not found")
194194
} else if op.Policy.AccessMode() == api.ReadWrite {
195+
b.writableMu.RLock()
196+
defer b.writableMu.RUnlock()
195197
if len(b.writableLogs) == 0 {
196198
b.refreshWritableLogs(ctx)
197199
}
198-
b.writableMu.RLock()
199-
defer b.writableMu.RUnlock()
200200
if l, ok := b.writableLogs[logID]; ok {
201201
return l, nil
202202
}
@@ -217,23 +217,23 @@ func (b *eventbus) ListLog(ctx context.Context, opts ...api.LogOption) ([]api.Ev
217217
}
218218

219219
if op.Policy.AccessMode() == api.ReadOnly {
220+
b.readableMu.RLock()
221+
defer b.readableMu.RUnlock()
220222
if len(b.readableLogs) == 0 {
221223
b.refreshReadableLogs(ctx)
222224
}
223225
eventlogs := make([]api.Eventlog, 0)
224-
b.readableMu.RLock()
225-
defer b.readableMu.RUnlock()
226226
for _, el := range b.readableLogs {
227227
eventlogs = append(eventlogs, el)
228228
}
229229
return eventlogs, nil
230230
} else if op.Policy.AccessMode() == api.ReadWrite {
231+
b.writableMu.RLock()
232+
defer b.writableMu.RUnlock()
231233
if len(b.writableLogs) == 0 {
232234
b.refreshWritableLogs(ctx)
233235
}
234236
eventlogs := make([]api.Eventlog, 0)
235-
b.writableMu.RLock()
236-
defer b.writableMu.RUnlock()
237237
for _, el := range b.writableLogs {
238238
eventlogs = append(eventlogs, el)
239239
}

client/pkg/eventlog/eventlog_impl.go

+30-13
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
"encoding/base64"
2121
"encoding/binary"
2222
"io"
23-
"sort"
2423
"sync"
2524
"time"
2625

@@ -142,9 +141,12 @@ func (l *eventlog) Close(ctx context.Context) {
142141
}
143142

144143
func (l *eventlog) Writer() LogWriter {
144+
l.writerMu.RLock()
145145
if l.logWriter != nil {
146+
l.writerMu.RUnlock()
146147
return l.logWriter
147148
}
149+
l.writerMu.RUnlock()
148150
l.writerMu.Lock()
149151
defer l.writerMu.Unlock()
150152
if l.logWriter != nil {
@@ -157,9 +159,12 @@ func (l *eventlog) Writer() LogWriter {
157159
}
158160

159161
func (l *eventlog) Reader(cfg ReaderConfig) LogReader {
162+
l.readerMu.RLock()
160163
if l.logReader != nil {
164+
l.readerMu.RUnlock()
161165
return l.logReader
162166
}
167+
l.readerMu.RUnlock()
163168
l.readerMu.Lock()
164169
defer l.readerMu.Unlock()
165170
if l.logWriter != nil {
@@ -214,15 +219,23 @@ func (l *eventlog) QueryOffsetByTime(ctx context.Context, timestamp int64) (int6
214219
return segs[0].startOffset, nil
215220
}
216221

222+
// LastEntryStime
223+
// time.UnixMilli
217224
tailSeg := fetchTailSegment(ctx, segs)
218-
if tailSeg.lastEventBornAt.Before(t) {
225+
if tailSeg.firstEventBornAt.Before(t) {
219226
// the target offset maybe in newer segment, refresh immediately
220227
l.refreshReadableSegments(ctx)
221228
segs = l.fetchReadableSegments(ctx)
222229
}
223230

224231
for idx := range segs {
225232
seg := segs[idx]
233+
// lastEventBornAt will be reported only when the segment is full,
234+
// if it is equal to 0, the current segment is the latest segment.
235+
if seg.lastEventBornAt == time.UnixMilli(0) {
236+
target = seg
237+
break
238+
}
226239
if !t.Before(seg.firstEventBornAt) && !t.After(seg.lastEventBornAt) {
227240
target = seg
228241
break
@@ -348,6 +361,10 @@ func (l *eventlog) selectReadableSegment(ctx context.Context, offset int64) (*se
348361
return nil, errors.ErrNotReadable
349362
}
350363
var target *segment
364+
target = fetchHeadSegment(ctx, segs)
365+
if offset < target.StartOffset() {
366+
return nil, errors.ErrOffsetUnderflow
367+
}
351368
target = fetchTailSegment(ctx, segs)
352369
if offset == target.EndOffset() {
353370
return nil, errors.ErrOffsetOnEnd
@@ -356,17 +373,17 @@ func (l *eventlog) selectReadableSegment(ctx context.Context, offset int64) (*se
356373
return nil, errors.ErrOffsetOverflow
357374
}
358375

359-
target = fetchHeadSegment(ctx, segs)
360-
if offset < target.StartOffset() {
361-
return nil, errors.ErrOffsetUnderflow
362-
}
363-
364-
segmentNum := len(l.readableSegments)
365-
n := sort.Search(segmentNum, func(i int) bool {
366-
return l.readableSegments[uint64(i)].EndOffset() > offset
367-
})
368-
if n < segmentNum {
369-
return l.readableSegments[uint64(n)], nil
376+
// look from back to front
377+
// TODO(jiangkai): implement a better search algorithm
378+
for {
379+
if target.StartOffset() <= offset && target.EndOffset() > offset {
380+
return target, nil
381+
} else {
382+
if _, ok := l.readableSegments[target.previousSegmentId]; !ok {
383+
break
384+
}
385+
target = l.readableSegments[target.previousSegmentId]
386+
}
370387
}
371388
return nil, errors.ErrNotReadable
372389
}

internal/store/block/raw.go

-11
Original file line numberDiff line numberDiff line change
@@ -63,17 +63,6 @@ const (
6363
StateArchived = State("archived")
6464
)
6565

66-
func (s State) ToSegmentState() string {
67-
switch s {
68-
case StateArchiving:
69-
return "freezing"
70-
case StateArchived:
71-
return "frozen"
72-
default:
73-
return string(s)
74-
}
75-
}
76-
7766
type Statistics struct {
7867
ID vanus.ID
7968
Capacity uint64

internal/store/segment/replica.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ func (r *replica) Status() *metapb.SegmentHealthInfo {
9494
Capacity: int64(stat.Capacity),
9595
Size: int64(stat.EntrySize),
9696
EventNumber: int32(stat.EntryNum),
97-
State: stat.State.ToSegmentState(),
97+
State: toSegmentState(stat.State),
9898
Leader: cs.Leader.Uint64(),
9999
Term: cs.Term,
100100
FirstEventBornTime: stat.FirstEntryStime,

internal/store/segment/server.go

+12-1
Original file line numberDiff line numberDiff line change
@@ -724,7 +724,7 @@ func (s *server) onBlockArchived(stat block.Statistics) {
724724
Capacity: int64(stat.Capacity),
725725
Size: int64(stat.EntrySize),
726726
EventNumber: int32(stat.EntryNum),
727-
State: stat.State.ToSegmentState(),
727+
State: toSegmentState(stat.State),
728728
FirstEventBornTime: stat.FirstEntryStime,
729729
}
730730
if stat.State == block.StateArchived {
@@ -837,3 +837,14 @@ func (s *server) checkState() error {
837837
}
838838
return nil
839839
}
840+
841+
func toSegmentState(state block.State) string {
842+
switch state {
843+
case block.StateArchiving:
844+
return "freezing"
845+
case block.StateArchived:
846+
return "frozen"
847+
default:
848+
return string(state)
849+
}
850+
}

internal/store/vsb/block_append_test.go

+9
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ func TestVSBlock_Append(t *testing.T) {
116116
So(archived, ShouldBeFalse)
117117

118118
stat = b.status()
119+
So(stat.State, ShouldEqual, block.StateWorking)
119120
So(stat.EntryNum, ShouldEqual, 1)
120121
So(stat.EntrySize, ShouldEqual, vsbtest.EntrySize0)
121122

@@ -130,6 +131,7 @@ func TestVSBlock_Append(t *testing.T) {
130131
So(full, ShouldBeTrue)
131132

132133
stat = b.status()
134+
So(stat.State, ShouldEqual, block.StateWorking)
133135
So(stat.EntryNum, ShouldEqual, 1)
134136
So(stat.EntrySize, ShouldEqual, vsbtest.EntrySize0)
135137

@@ -138,6 +140,7 @@ func TestVSBlock_Append(t *testing.T) {
138140
So(archived, ShouldBeFalse)
139141

140142
stat = b.status()
143+
So(stat.State, ShouldEqual, block.StateWorking)
141144
So(stat.EntryNum, ShouldEqual, 2)
142145
So(stat.EntrySize, ShouldEqual, vsbtest.EntrySize0+vsbtest.EntrySize1)
143146

@@ -158,6 +161,7 @@ func TestVSBlock_Append(t *testing.T) {
158161
So(full, ShouldBeFalse)
159162

160163
stat := b.status()
164+
So(stat.State, ShouldEqual, block.StateWorking)
161165
So(stat.EntryNum, ShouldEqual, 0)
162166
So(stat.EntrySize, ShouldEqual, 0)
163167

@@ -169,6 +173,7 @@ func TestVSBlock_Append(t *testing.T) {
169173
So(full, ShouldBeTrue)
170174

171175
stat = b.status()
176+
So(stat.State, ShouldEqual, block.StateWorking)
172177
So(stat.EntryNum, ShouldEqual, 0)
173178
So(stat.EntrySize, ShouldEqual, 0)
174179

@@ -177,6 +182,7 @@ func TestVSBlock_Append(t *testing.T) {
177182
So(archived, ShouldBeFalse)
178183

179184
stat = b.status()
185+
So(stat.State, ShouldEqual, block.StateWorking)
180186
So(stat.EntryNum, ShouldEqual, 2)
181187
So(stat.EntrySize, ShouldEqual, vsbtest.EntrySize0+vsbtest.EntrySize1)
182188

@@ -197,6 +203,7 @@ func TestVSBlock_Append(t *testing.T) {
197203
So(full, ShouldBeTrue)
198204

199205
stat := b.status()
206+
So(stat.State, ShouldEqual, block.StateWorking)
200207
So(stat.EntryNum, ShouldEqual, 0)
201208
So(stat.EntrySize, ShouldEqual, 0)
202209

@@ -205,6 +212,7 @@ func TestVSBlock_Append(t *testing.T) {
205212
So(archived, ShouldBeFalse)
206213

207214
stat = b.status()
215+
So(stat.State, ShouldEqual, block.StateWorking)
208216
So(stat.EntryNum, ShouldEqual, 2)
209217
So(stat.EntrySize, ShouldEqual, vsbtest.EntrySize0+vsbtest.EntrySize1)
210218

@@ -288,6 +296,7 @@ func TestVSBlock_Append(t *testing.T) {
288296
So(archived, ShouldBeTrue)
289297

290298
stat = b.status()
299+
So(stat.State, ShouldEqual, block.StateWorking)
291300
So(stat.EntryNum, ShouldEqual, 2)
292301
So(stat.EntrySize, ShouldEqual, vsbtest.EntrySize0+vsbtest.EntrySize1)
293302

internal/store/vsb/block_open_test.go

+1
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ func TestVSBlock_Open(t *testing.T) {
140140
So(err, ShouldBeNil)
141141

142142
stat := b.status()
143+
So(stat.State, ShouldEqual, block.StateWorking)
143144
So(stat.Capacity, ShouldEqual, vsbtest.EntrySize0+vsbtest.EntrySize1)
144145
So(stat.EntryNum, ShouldEqual, 2)
145146
So(stat.EntrySize, ShouldEqual, vsbtest.EntrySize0+vsbtest.EntrySize1)

0 commit comments

Comments
 (0)