Skip to content

Commit cc41549

Browse files
authored
Merge pull request #24 from StoneYunZhao/zhaoyun/peek
Add PeekChan function
2 parents 02dd623 + 2cb4338 commit cc41549

File tree

2 files changed

+90
-0
lines changed

2 files changed

+90
-0
lines changed

diskqueue.go

+13
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ func (l LogLevel) String() string {
4747
type Interface interface {
4848
Put([]byte) error
4949
ReadChan() <-chan []byte // this is expected to be an *unbuffered* channel
50+
PeekChan() <-chan []byte // this is expected to be an *unbuffered* channel
5051
Close() error
5152
Delete() error
5253
Depth() int64
@@ -91,6 +92,9 @@ type diskQueue struct {
9192
// exposed via ReadChan()
9293
readChan chan []byte
9394

95+
// exposed via PeekChan()
96+
peekChan chan []byte
97+
9498
// internal channels
9599
depthChan chan int64
96100
writeChan chan []byte
@@ -115,6 +119,7 @@ func New(name string, dataPath string, maxBytesPerFile int64,
115119
minMsgSize: minMsgSize,
116120
maxMsgSize: maxMsgSize,
117121
readChan: make(chan []byte),
122+
peekChan: make(chan []byte),
118123
depthChan: make(chan int64),
119124
writeChan: make(chan []byte),
120125
writeResponseChan: make(chan error),
@@ -152,6 +157,10 @@ func (d *diskQueue) ReadChan() <-chan []byte {
152157
return d.readChan
153158
}
154159

160+
func (d *diskQueue) PeekChan() <-chan []byte {
161+
return d.peekChan
162+
}
163+
155164
// Put writes a []byte to the queue
156165
func (d *diskQueue) Put(data []byte) error {
157166
d.RLock()
@@ -648,6 +657,7 @@ func (d *diskQueue) ioLoop() {
648657
var err error
649658
var count int64
650659
var r chan []byte
660+
var p chan []byte
651661

652662
syncTicker := time.NewTicker(d.syncTimeout)
653663

@@ -676,13 +686,16 @@ func (d *diskQueue) ioLoop() {
676686
}
677687
}
678688
r = d.readChan
689+
p = d.peekChan
679690
} else {
680691
r = nil
692+
p = nil
681693
}
682694

683695
select {
684696
// the Go channel spec dictates that nil channel operations (read or write)
685697
// in a select are skipped, we set r to d.readChan only when there is data to read
698+
case p <- dataRead:
686699
case r <- dataRead:
687700
count++
688701
// moveForward sets needSync flag if a file is removed

diskqueue_test.go

+77
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,83 @@ func TestDiskQueueRoll(t *testing.T) {
130130
}
131131
}
132132

133+
func TestDiskQueuePeek(t *testing.T) {
134+
l := NewTestLogger(t)
135+
dqName := "test_disk_queue_peek" + strconv.Itoa(int(time.Now().Unix()))
136+
tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
137+
if err != nil {
138+
panic(err)
139+
}
140+
defer os.RemoveAll(tmpDir)
141+
msg := bytes.Repeat([]byte{0}, 10)
142+
ml := int64(len(msg))
143+
dq := New(dqName, tmpDir, 10*(ml+4), int32(ml), 1<<10, 2500, 2*time.Second, l)
144+
defer dq.Close()
145+
NotNil(t, dq)
146+
Equal(t, int64(0), dq.Depth())
147+
148+
t.Run("roll", func(t *testing.T) {
149+
for i := 0; i < 10; i++ {
150+
err := dq.Put(msg)
151+
Nil(t, err)
152+
Equal(t, int64(i+1), dq.Depth())
153+
}
154+
155+
for i := 10; i > 0; i-- {
156+
Equal(t, msg, <-dq.PeekChan())
157+
Equal(t, int64(i), dq.Depth())
158+
159+
Equal(t, msg, <-dq.ReadChan())
160+
Equal(t, int64(i-1), dq.Depth())
161+
}
162+
163+
Nil(t, dq.Empty())
164+
})
165+
166+
t.Run("peek-read", func(t *testing.T) {
167+
for i := 0; i < 10; i++ {
168+
err := dq.Put(msg)
169+
Nil(t, err)
170+
Equal(t, int64(i+1), dq.Depth())
171+
}
172+
173+
for i := 10; i > 0; i-- {
174+
Equal(t, msg, <-dq.PeekChan())
175+
Equal(t, int64(i), dq.Depth())
176+
177+
Equal(t, msg, <-dq.PeekChan())
178+
Equal(t, int64(i), dq.Depth())
179+
180+
Equal(t, msg, <-dq.ReadChan())
181+
Equal(t, int64(i-1), dq.Depth())
182+
}
183+
184+
Nil(t, dq.Empty())
185+
})
186+
187+
t.Run("read-peek", func(t *testing.T) {
188+
for i := 0; i < 10; i++ {
189+
err := dq.Put(msg)
190+
Nil(t, err)
191+
Equal(t, int64(i+1), dq.Depth())
192+
}
193+
194+
for i := 10; i > 1; i-- {
195+
Equal(t, msg, <-dq.PeekChan())
196+
Equal(t, int64(i), dq.Depth())
197+
198+
Equal(t, msg, <-dq.ReadChan())
199+
Equal(t, int64(i-1), dq.Depth())
200+
201+
Equal(t, msg, <-dq.PeekChan())
202+
Equal(t, int64(i-1), dq.Depth())
203+
}
204+
205+
Nil(t, dq.Empty())
206+
})
207+
208+
}
209+
133210
func assertFileNotExist(t *testing.T, fn string) {
134211
f, err := os.OpenFile(fn, os.O_RDONLY, 0600)
135212
Equal(t, (*os.File)(nil), f)

0 commit comments

Comments
 (0)