@@ -71,15 +71,39 @@ void mpmc_destroy(MPMCQueue<T> *q) {
7171}
7272
7373
74+ template <typename T>
75+ bool mpmc_internal_grow (MPMCQueue<T> *q) {
76+ mutex_lock (&q->mutex );
77+ i32 old_size = q->mask +1 ;
78+ i32 new_size = old_size*2 ;
79+ resize_array_raw (&q->nodes , q->allocator , old_size, new_size);
80+ if (q->nodes == nullptr ) {
81+ GB_PANIC (" Unable to resize enqueue: %td -> %td" , old_size, new_size);
82+ mutex_unlock (&q->mutex );
83+ return false ;
84+ }
85+ resize_array_raw (&q->indices , q->allocator , old_size, new_size);
86+ if (q->indices == nullptr ) {
87+ GB_PANIC (" Unable to resize enqueue: %td -> %td" , old_size, new_size);
88+ mutex_unlock (&q->mutex );
89+ return false ;
90+ }
91+ mpmc_internal_init_indices (q->indices , old_size, new_size);
92+ q->mask = new_size-1 ;
93+ mutex_unlock (&q->mutex );
94+ return true ;
95+ }
96+
7497template <typename T>
7598i32 mpmc_enqueue (MPMCQueue<T> *q, T const &data) {
7699 GB_ASSERT (q->mask != 0 );
77100
78101 i32 head_idx = q->head_idx .load (std::memory_order_relaxed);
79102
80103 for (;;) {
81- auto node = &q->nodes [head_idx & q->mask ];
82- auto node_idx_ptr = &q->indices [head_idx & q->mask ];
104+ i32 index = head_idx & q->mask ;
105+ auto node = &q->nodes [index];
106+ auto node_idx_ptr = &q->indices [index];
83107 i32 node_idx = node_idx_ptr->load (std::memory_order_acquire);
84108 i32 diff = node_idx - head_idx;
85109
@@ -91,24 +115,9 @@ i32 mpmc_enqueue(MPMCQueue<T> *q, T const &data) {
91115 return q->count .fetch_add (1 , std::memory_order_release);
92116 }
93117 } else if (diff < 0 ) {
94- mutex_lock (&q->mutex );
95- i32 old_size = q->mask +1 ;
96- i32 new_size = old_size*2 ;
97- resize_array_raw (&q->nodes , q->allocator , old_size, new_size);
98- if (q->nodes == nullptr ) {
99- GB_PANIC (" Unable to resize enqueue: %td -> %td" , old_size, new_size);
100- mutex_unlock (&q->mutex );
101- return -1 ;
102- }
103- resize_array_raw (&q->indices , q->allocator , old_size, new_size);
104- if (q->indices == nullptr ) {
105- GB_PANIC (" Unable to resize enqueue: %td -> %td" , old_size, new_size);
106- mutex_unlock (&q->mutex );
118+ if (!mpmc_internal_grow (q)) {
107119 return -1 ;
108120 }
109- mpmc_internal_init_indices (q->indices , old_size, new_size);
110- q->mask = new_size-1 ;
111- mutex_unlock (&q->mutex );
112121 } else {
113122 head_idx = q->head_idx .load (std::memory_order_relaxed);
114123 }
0 commit comments