Skip to content

Commit 074e7f9

Browse files
committed
fix review
1 parent 9c505cc commit 074e7f9

File tree

12 files changed

+244
-263
lines changed

12 files changed

+244
-263
lines changed

Diff for: internal/controller/eventbus/controller.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -412,11 +412,16 @@ func (ctrl *controller) processHeartbeat(ctx context.Context, req *ctrlpb.Segmen
412412
Capacity: info.Capacity,
413413
EventLogID: block.EventlogID,
414414
Size: info.Size,
415-
State: eventlog.SegmentState(info.State),
416415
Number: info.EventNumber,
417416
FirstEventBornTime: time.UnixMilli(info.FirstEventBornTime),
418417
LastEventBornTime: time.UnixMilli(info.LastEventBornTime),
419418
}
419+
// block state transfer to segment state
420+
if info.State == "archiving" {
421+
seg.State = eventlog.StateFreezing
422+
} else if info.State == "archived" {
423+
seg.State = eventlog.StateFrozen
424+
}
420425
logArr = append(logArr, seg)
421426
segments[block.EventlogID.Key()] = logArr
422427
}

Diff for: internal/controller/eventbus/eventlog/segment.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,12 @@ import (
2727
type SegmentState string
2828

2929
const (
30-
StateCreated = SegmentState("created")
31-
StateWorking = SegmentState("working")
32-
StateFrozen = SegmentState("frozen")
33-
StatePreFrozen = SegmentState("prefrozen")
34-
StateArchived = SegmentState("archived")
35-
StateExpired = SegmentState("expired")
30+
StateCreated = SegmentState("created")
31+
StateWorking = SegmentState("working")
32+
StateFrozen = SegmentState("frozen")
33+
StateFreezing = SegmentState("freezing")
34+
StateArchived = SegmentState("archived")
35+
StateExpired = SegmentState("expired")
3636
)
3737

3838
type Segment struct {
@@ -109,7 +109,7 @@ func (seg *Segment) isNeedUpdate(newSeg Segment) bool {
109109
}
110110

111111
func (seg *Segment) isPreFull() bool {
112-
return seg.State == StatePreFrozen
112+
return seg.State == StateFreezing
113113
}
114114

115115
func (seg *Segment) isFull() bool {

Diff for: internal/store/block/raft/appender.go

+7-2
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ type Appender interface {
7676

7777
Stop(ctx context.Context)
7878
Bootstrap(ctx context.Context, blocks []Peer) error
79-
Frozen(ctx context.Context)
79+
Archive(ctx context.Context)
8080
Delete(ctx context.Context)
8181
Status() ClusterStatus
8282
}
@@ -187,7 +187,12 @@ func (a *appender) Bootstrap(ctx context.Context, blocks []Peer) error {
187187
return a.node.Bootstrap(peers)
188188
}
189189

190-
func (a *appender) Frozen(ctx context.Context) {
190+
func (a *appender) Archive(ctx context.Context) {
191+
a.appendMu.Lock()
192+
defer a.appendMu.Unlock()
193+
if a.actx.Archived() {
194+
return
195+
}
191196
if frag, err := a.raw.PrepareArchive(ctx, a.actx); err == nil {
192197
data, _ := block.MarshalFragment(ctx, frag)
193198
_ = a.node.Propose(ctx, data)

Diff for: internal/store/block/raw.go

+5-6
Original file line numberDiff line numberDiff line change
@@ -55,19 +55,18 @@ type Raw interface {
5555
Delete(context.Context) error
5656
}
5757

58-
type SegmentState string
58+
type State string
5959

6060
const (
61-
StateWorking = SegmentState("working")
62-
StateFrozen = SegmentState("frozen")
63-
StatePreFrozen = SegmentState("prefrozen")
61+
StateWorking = State("working")
62+
StateArchiving = State("archiving")
63+
StateArchived = State("archived")
6464
)
6565

6666
type Statistics struct {
6767
ID vanus.ID
6868
Capacity uint64
69-
Archived bool
70-
State SegmentState
69+
State State
7170
EntryNum uint32
7271
EntrySize uint64
7372
// FirstEntryStime is the millisecond timestamp when the first Entry will be written to Block.

Diff for: internal/store/segment/replica.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -94,13 +94,12 @@ func (r *replica) Status() *metapb.SegmentHealthInfo {
9494
Capacity: int64(stat.Capacity),
9595
Size: int64(stat.EntrySize),
9696
EventNumber: int32(stat.EntryNum),
97-
IsFull: stat.Archived,
9897
State: string(stat.State),
9998
Leader: cs.Leader.Uint64(),
10099
Term: cs.Term,
101100
FirstEventBornTime: stat.FirstEntryStime,
102101
}
103-
if stat.Archived {
102+
if stat.State == block.StateArchived {
104103
info.LastEventBornTime = stat.LastEntryStime
105104
}
106105
return info

Diff for: internal/store/segment/server.go

+3-4
Original file line numberDiff line numberDiff line change
@@ -386,14 +386,14 @@ func (s *server) runHeartbeat(_ context.Context) error {
386386
})
387387
}
388388

389-
if segment.State != string(block.StatePreFrozen) {
389+
if segment.State != "freezing" {
390390
break
391391
}
392392

393393
s.replicas.Range(func(key, value interface{}) bool {
394394
b, _ := value.(replica)
395395
if b.appender.Status().Leader.Equals(vanus.ID(segment.LeaderBlockId)) {
396-
b.appender.Frozen(context.Background())
396+
b.appender.Archive(context.Background())
397397
return false
398398
}
399399
return true
@@ -716,11 +716,10 @@ func (s *server) onBlockArchived(stat block.Statistics) {
716716
Capacity: int64(stat.Capacity),
717717
Size: int64(stat.EntrySize),
718718
EventNumber: int32(stat.EntryNum),
719-
IsFull: stat.Archived,
720719
State: string(stat.State),
721720
FirstEventBornTime: stat.FirstEntryStime,
722721
}
723-
if stat.Archived {
722+
if stat.State == block.StateArchived {
724723
info.LastEventBornTime = stat.LastEntryStime
725724
}
726725

Diff for: internal/store/vsb/block.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -110,11 +110,10 @@ func (b *vsBlock) status() block.Statistics {
110110
return b.stat(m, indexes, block.StateWorking)
111111
}
112112

113-
func (b *vsBlock) stat(m meta, indexes []index.Index, state block.SegmentState) block.Statistics {
113+
func (b *vsBlock) stat(m meta, indexes []index.Index, state block.State) block.Statistics {
114114
s := block.Statistics{
115115
ID: b.id,
116116
Capacity: uint64(b.capacity),
117-
Archived: m.archived,
118117
State: state,
119118
EntryNum: uint32(m.entryNum),
120119
EntrySize: uint64(m.entryLength),

Diff for: internal/store/vsb/block_append.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ func (b *vsBlock) PrepareAppend(
105105
full := actx.size(b.dataOffset) >= b.capacity
106106
if full && b.lis != nil {
107107
m, indexes := makeSnapshot(b.actx, b.indexes)
108-
b.lis.OnArchived(b.stat(m, indexes, block.StatePreFrozen))
108+
b.lis.OnArchived(b.stat(m, indexes, block.StateArchiving))
109109
}
110110

111111
return seqs, frag, full, nil
@@ -203,7 +203,7 @@ func (b *vsBlock) CommitAppend(ctx context.Context, frags ...block.Fragment) (bo
203203
}()
204204

205205
if b.lis != nil {
206-
b.lis.OnArchived(b.stat(m, i, block.StateFrozen))
206+
b.lis.OnArchived(b.stat(m, i, block.StateArchived))
207207
}
208208
}
209209

Diff for: internal/store/vsb/block_append_test.go

-11
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,6 @@ func TestVSBlock_Append(t *testing.T) {
107107
So(full, ShouldBeFalse)
108108

109109
stat := b.status()
110-
So(stat.Archived, ShouldBeFalse)
111110
So(stat.EntryNum, ShouldEqual, 0)
112111
So(stat.EntrySize, ShouldEqual, 0)
113112

@@ -116,7 +115,6 @@ func TestVSBlock_Append(t *testing.T) {
116115
So(archived, ShouldBeFalse)
117116

118117
stat = b.status()
119-
So(stat.Archived, ShouldBeFalse)
120118
So(stat.EntryNum, ShouldEqual, 1)
121119
So(stat.EntrySize, ShouldEqual, vsbtest.EntrySize0)
122120

@@ -131,7 +129,6 @@ func TestVSBlock_Append(t *testing.T) {
131129
So(full, ShouldBeTrue)
132130

133131
stat = b.status()
134-
So(stat.Archived, ShouldBeFalse)
135132
So(stat.EntryNum, ShouldEqual, 1)
136133
So(stat.EntrySize, ShouldEqual, vsbtest.EntrySize0)
137134

@@ -140,7 +137,6 @@ func TestVSBlock_Append(t *testing.T) {
140137
So(archived, ShouldBeFalse)
141138

142139
stat = b.status()
143-
So(stat.Archived, ShouldBeFalse)
144140
So(stat.EntryNum, ShouldEqual, 2)
145141
So(stat.EntrySize, ShouldEqual, vsbtest.EntrySize0+vsbtest.EntrySize1)
146142

@@ -161,7 +157,6 @@ func TestVSBlock_Append(t *testing.T) {
161157
So(full, ShouldBeFalse)
162158

163159
stat := b.status()
164-
So(stat.Archived, ShouldBeFalse)
165160
So(stat.EntryNum, ShouldEqual, 0)
166161
So(stat.EntrySize, ShouldEqual, 0)
167162

@@ -173,7 +168,6 @@ func TestVSBlock_Append(t *testing.T) {
173168
So(full, ShouldBeTrue)
174169

175170
stat = b.status()
176-
So(stat.Archived, ShouldBeFalse)
177171
So(stat.EntryNum, ShouldEqual, 0)
178172
So(stat.EntrySize, ShouldEqual, 0)
179173

@@ -182,7 +176,6 @@ func TestVSBlock_Append(t *testing.T) {
182176
So(archived, ShouldBeFalse)
183177

184178
stat = b.status()
185-
So(stat.Archived, ShouldBeFalse)
186179
So(stat.EntryNum, ShouldEqual, 2)
187180
So(stat.EntrySize, ShouldEqual, vsbtest.EntrySize0+vsbtest.EntrySize1)
188181

@@ -203,7 +196,6 @@ func TestVSBlock_Append(t *testing.T) {
203196
So(full, ShouldBeTrue)
204197

205198
stat := b.status()
206-
So(stat.Archived, ShouldBeFalse)
207199
So(stat.EntryNum, ShouldEqual, 0)
208200
So(stat.EntrySize, ShouldEqual, 0)
209201

@@ -212,7 +204,6 @@ func TestVSBlock_Append(t *testing.T) {
212204
So(archived, ShouldBeFalse)
213205

214206
stat = b.status()
215-
So(stat.Archived, ShouldBeFalse)
216207
So(stat.EntryNum, ShouldEqual, 2)
217208
So(stat.EntrySize, ShouldEqual, vsbtest.EntrySize0+vsbtest.EntrySize1)
218209

@@ -288,7 +279,6 @@ func TestVSBlock_Append(t *testing.T) {
288279
So(frag1.StartOffset(), ShouldEqual, vsbtest.EndEntryOffset)
289280

290281
stat := b.status()
291-
So(stat.Archived, ShouldBeFalse)
292282
So(stat.EntryNum, ShouldEqual, 0)
293283
So(stat.EntrySize, ShouldEqual, 0)
294284

@@ -297,7 +287,6 @@ func TestVSBlock_Append(t *testing.T) {
297287
So(archived, ShouldBeTrue)
298288

299289
stat = b.status()
300-
So(stat.Archived, ShouldBeTrue)
301290
So(stat.EntryNum, ShouldEqual, 2)
302291
So(stat.EntrySize, ShouldEqual, vsbtest.EntrySize0+vsbtest.EntrySize1)
303292

Diff for: internal/store/vsb/block_open_test.go

-3
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@ func TestVSBlock_Open(t *testing.T) {
6565

6666
stat := b.status()
6767
So(stat.Capacity, ShouldEqual, vsbtest.EntrySize0+vsbtest.EntrySize1)
68-
So(stat.Archived, ShouldBeTrue)
6968
So(stat.EntryNum, ShouldEqual, 2)
7069
So(stat.EntrySize, ShouldEqual, vsbtest.EntrySize0+vsbtest.EntrySize1)
7170

@@ -104,7 +103,6 @@ func TestVSBlock_Open(t *testing.T) {
104103

105104
stat := b.status()
106105
So(stat.Capacity, ShouldEqual, vsbtest.EntrySize0+vsbtest.EntrySize1)
107-
So(stat.Archived, ShouldBeTrue)
108106
So(stat.EntryNum, ShouldEqual, 2)
109107
So(stat.EntrySize, ShouldEqual, vsbtest.EntrySize0+vsbtest.EntrySize1)
110108

@@ -141,7 +139,6 @@ func TestVSBlock_Open(t *testing.T) {
141139

142140
stat := b.status()
143141
So(stat.Capacity, ShouldEqual, vsbtest.EntrySize0+vsbtest.EntrySize1)
144-
So(stat.Archived, ShouldBeFalse)
145142
So(stat.EntryNum, ShouldEqual, 2)
146143
So(stat.EntrySize, ShouldEqual, vsbtest.EntrySize0+vsbtest.EntrySize1)
147144

0 commit comments

Comments
 (0)