Skip to content

Commit 12cc831

Browse files
Adrian Bradadrianbrad
Adrian Brad
authored andcommitted
feat(blocking): fix Reset() unbound memory growth potential
1 parent 7f65020 commit 12cc831

File tree

1 file changed

+42
-68
lines changed

1 file changed

+42
-68
lines changed

blocking.go

Lines changed: 42 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,9 @@ var _ Queue[any] = (*Blocking[any])(nil)
1818
// If there are no elements available the retrieve operations wait until
1919
// elements are added to the queue.
2020
type Blocking[T comparable] struct {
21-
// elements queue
22-
elements []T
23-
elementsIndex int
24-
25-
initialLen int
26-
27-
capacity *int
21+
initialElems []T
22+
elems []T
23+
capacity *int
2824

2925
// synchronization
3026
lock sync.RWMutex
@@ -45,20 +41,23 @@ func NewBlocking[T comparable](
4541
o.apply(&options)
4642
}
4743

44+
// Store initial elements
45+
initialElems := make([]T, len(elems))
46+
copy(initialElems, elems)
47+
4848
queue := &Blocking[T]{
49-
elements: elems,
50-
elementsIndex: 0,
51-
initialLen: len(elems),
52-
capacity: options.capacity,
53-
lock: sync.RWMutex{},
49+
elems: elems,
50+
initialElems: initialElems,
51+
capacity: options.capacity,
52+
lock: sync.RWMutex{},
5453
}
5554

5655
queue.notEmptyCond = sync.NewCond(&queue.lock)
5756
queue.notFullCond = sync.NewCond(&queue.lock)
5857

5958
if queue.capacity != nil {
60-
if len(queue.elements) > *queue.capacity {
61-
queue.elements = queue.elements[:*queue.capacity]
59+
if len(queue.elems) > *queue.capacity {
60+
queue.elems = queue.elems[:*queue.capacity]
6261
}
6362
}
6463

@@ -77,7 +76,7 @@ func (bq *Blocking[T]) OfferWait(elem T) {
7776
bq.notFullCond.Wait()
7877
}
7978

80-
bq.elements = append(bq.elements, elem)
79+
bq.elems = append(bq.elems, elem)
8180

8281
bq.notEmptyCond.Signal()
8382
}
@@ -92,22 +91,21 @@ func (bq *Blocking[T]) Offer(elem T) error {
9291
return ErrQueueIsFull
9392
}
9493

95-
bq.elements = append(bq.elements, elem)
94+
bq.elems = append(bq.elems, elem)
9695

9796
bq.notEmptyCond.Signal()
9897

9998
return nil
10099
}
101100

102-
// Reset sets the queue elements index to 0. The queue will be in its initial
103-
// state.
101+
// Reset sets the queue to its initial state with the original elements.
104102
func (bq *Blocking[T]) Reset() {
105103
bq.lock.Lock()
106104
defer bq.lock.Unlock()
107105

108-
bq.elementsIndex = 0
109-
110-
bq.elements = bq.elements[:bq.initialLen]
106+
// Restore initial elements
107+
bq.elems = make([]T, len(bq.initialElems))
108+
copy(bq.elems, bq.initialElems)
111109

112110
bq.notEmptyCond.Broadcast()
113111
}
@@ -117,29 +115,24 @@ func (bq *Blocking[T]) Reset() {
117115
// GetWait removes and returns the head of the elements queue.
118116
// If no element is available it waits until the queue
119117
// has an element available.
120-
//
121-
// It does not actually remove elements from the elements slice, but
122-
// it's incrementing the underlying index.
123118
func (bq *Blocking[T]) GetWait() (v T) {
124119
bq.lock.Lock()
125120
defer bq.lock.Unlock()
126121

127-
defer bq.notFullCond.Signal()
128-
129-
idx := bq.getNextIndexOrWait()
122+
for bq.isEmpty() {
123+
bq.notEmptyCond.Wait()
124+
}
130125

131-
elem := bq.elements[idx]
126+
elem := bq.elems[0]
127+
bq.elems = bq.elems[1:]
132128

133-
bq.elementsIndex++
129+
bq.notFullCond.Signal()
134130

135131
return elem
136132
}
137133

138134
// Get removes and returns the head of the elements queue.
139135
// If no element is available it returns an ErrNoElementsAvailable error.
140-
//
141-
// It does not actually remove elements from the elements slice, but
142-
// it's incrementing the underlying index.
143136
func (bq *Blocking[T]) Get() (v T, _ error) {
144137
bq.lock.Lock()
145138
defer bq.lock.Unlock()
@@ -154,9 +147,9 @@ func (bq *Blocking[T]) Clear() []T {
154147

155148
defer bq.notFullCond.Broadcast()
156149

157-
removed := bq.elements[bq.elementsIndex:]
158-
159-
bq.elementsIndex += len(removed)
150+
removed := make([]T, len(bq.elems))
151+
copy(removed, bq.elems)
152+
bq.elems = bq.elems[:0]
160153

161154
return removed
162155
}
@@ -199,9 +192,7 @@ func (bq *Blocking[T]) Peek() (v T, _ error) {
199192
return v, ErrNoElementsAvailable
200193
}
201194

202-
elem := bq.elements[bq.elementsIndex]
203-
204-
return elem, nil
195+
return bq.elems[0], nil
205196
}
206197

207198
// PeekWait retrieves but does not return the head of the queue.
@@ -215,7 +206,7 @@ func (bq *Blocking[T]) PeekWait() T {
215206
bq.notEmptyCond.Wait()
216207
}
217208

218-
elem := bq.elements[bq.elementsIndex]
209+
elem := bq.elems[0]
219210

220211
// send the not empty signal again in case any remove method waits.
221212
bq.notEmptyCond.Signal()
@@ -228,16 +219,16 @@ func (bq *Blocking[T]) Size() int {
228219
bq.lock.RLock()
229220
defer bq.lock.RUnlock()
230221

231-
return bq.size()
222+
return len(bq.elems)
232223
}
233224

234225
// Contains returns true if the queue contains the given element.
235226
func (bq *Blocking[T]) Contains(elem T) bool {
236227
bq.lock.RLock()
237228
defer bq.lock.RUnlock()
238229

239-
for i := range bq.elements[bq.elementsIndex:] {
240-
if bq.elements[i] == elem {
230+
for _, e := range bq.elems {
231+
if e == elem {
241232
return true
242233
}
243234
}
@@ -255,20 +246,9 @@ func (bq *Blocking[T]) IsEmpty() bool {
255246

256247
// ===================================Helpers==================================
257248

258-
// getNextIndexOrWait returns the next available index of the elements slice.
259-
func (bq *Blocking[T]) getNextIndexOrWait() int {
260-
if !bq.isEmpty() {
261-
return bq.elementsIndex
262-
}
263-
264-
bq.notEmptyCond.Wait()
265-
266-
return bq.getNextIndexOrWait()
267-
}
268-
269249
// isEmpty returns true if the queue is empty.
270250
func (bq *Blocking[T]) isEmpty() bool {
271-
return bq.elementsIndex >= len(bq.elements)
251+
return len(bq.elems) == 0
272252
}
273253

274254
// isFull returns true if the queue is full.
@@ -277,43 +257,37 @@ func (bq *Blocking[T]) isFull() bool {
277257
return false
278258
}
279259

280-
return len(bq.elements)-bq.elementsIndex >= *bq.capacity
260+
return len(bq.elems) >= *bq.capacity
281261
}
282262

283263
func (bq *Blocking[T]) size() int {
284-
return len(bq.elements) - bq.elementsIndex
264+
return len(bq.elems)
285265
}
286266

287267
func (bq *Blocking[T]) get() (v T, _ error) {
288-
defer bq.notFullCond.Signal()
289-
290268
if bq.isEmpty() {
291269
return v, ErrNoElementsAvailable
292270
}
293271

294-
elem := bq.elements[bq.elementsIndex]
272+
elem := bq.elems[0]
273+
bq.elems = bq.elems[1:]
295274

296-
bq.elementsIndex++
275+
bq.notFullCond.Signal()
297276

298277
return elem, nil
299278
}
300279

301280
// MarshalJSON serializes the Blocking queue to JSON.
302281
func (bq *Blocking[T]) MarshalJSON() ([]byte, error) {
303282
bq.lock.RLock()
283+
defer bq.lock.RUnlock()
304284

305-
if bq.IsEmpty() {
306-
bq.lock.RUnlock()
285+
if bq.isEmpty() {
307286
return []byte("[]"), nil
308287
}
309288

310-
// Extract elements from `elements` starting at `elementsIndex`.
311-
elements := bq.elements[bq.elementsIndex:]
312-
313-
bq.lock.RUnlock()
314-
315289
// Marshal the slice of elements into JSON.
316-
data, err := json.Marshal(elements)
290+
data, err := json.Marshal(bq.elems)
317291
if err != nil {
318292
return nil, fmt.Errorf("failed to marshal blocking queue: %w", err)
319293
}

0 commit comments

Comments
 (0)