Skip to content

Commit

Permalink
Fix thread_pool items
Browse files Browse the repository at this point in the history
  • Loading branch information
gammasoft71 committed Nov 3, 2024
1 parent 7dd8e78 commit d7edfbf
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 18 deletions.
40 changes: 29 additions & 11 deletions src/xtd.core/include/xtd/threading/thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "wait_callback.h"
#include "wait_or_timer_callback.h"
#include "../core_export.h"
#include "../new_ptr.h"
#include "../static.h"
#include "../time_span.h"
#include "../types.h"
Expand Down Expand Up @@ -59,21 +60,38 @@ namespace xtd {
template<typename callback_t>
struct thread_item : public object {
thread_item() = default;
thread_item(const callback_t& callback) : callback(callback) {}
thread_item(const callback_t& callback, std::any state) : callback(callback), state(state) {}
thread_item(const callback_t& callback, std::any state, wait_handle& wait_object, int32 milliseconds_timeout_interval, bool execute_only_once) : callback(callback), state(state), wait_object(&wait_object), milliseconds_timeout_interval(milliseconds_timeout_interval), execute_only_once(execute_only_once) {}
thread_item(thread_item&&) = default;
thread_item(const thread_item&) = default;
thread_item& operator =(thread_item&&) = default;
thread_item& operator =(const thread_item&) = default;
thread_item(const callback_t& callback) : data {xtd::new_ptr<sdata>(callback)} {}
thread_item(const callback_t& callback, std::any state) : data {xtd::new_ptr<sdata>(callback, state)} {}
thread_item(const callback_t& callback, std::any state, wait_handle& wait_object, int32 milliseconds_timeout_interval, bool execute_only_once) : data {xtd::new_ptr<sdata>(callback, state, &wait_object, milliseconds_timeout_interval, execute_only_once)} {}

callback_t callback;
std::any state;
wait_handle* wait_object = null;
int32 milliseconds_timeout_interval;
bool execute_only_once = true;
bool unregistered = false;
struct sdata {
sdata() = default;
sdata(sdata&&) = default;
sdata(const sdata&) = default;
sdata& operator =(sdata&&) = default;
sdata& operator =(const sdata&) = default;
sdata(const callback_t& callback) : callback {callback} {}
sdata(const callback_t& callback, std::any state) : callback {callback}, state {state} {}
sdata(const callback_t& callback, std::any state, wait_handle* wait_object, int32 milliseconds_timeout_interval, bool execute_only_once) : callback {callback}, state {state}, wait_object {wait_object}, milliseconds_timeout_interval {milliseconds_timeout_interval}, execute_only_once {execute_only_once} {}

callback_t callback;
std::any state;
wait_handle* wait_object = null;
int32 milliseconds_timeout_interval;
bool execute_only_once = true;
bool unregistered = false;
};

ptr<sdata> data = xtd::new_ptr<sdata>();

void run() {
do {
this->callback(state);
} while (!execute_only_once);
this->callback(data->state);
} while (!data->execute_only_once);
}
};

Expand Down
6 changes: 3 additions & 3 deletions src/xtd.core/src/xtd/threading/registered_wait_handle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ bool registered_wait_handle::unregister() {
if (item_ == 0) return false;

auto item = reinterpret_cast<thread_pool::thread_pool_asynchronous_io_item*>(item_);
item->unregistered = true;
return item->wait_object->signal();
item->data->unregistered = true;
return item->data->wait_object->signal();
}

bool registered_wait_handle::unregister(wait_handle& wait_object) {
if (item_ == 0) return false;

auto item = reinterpret_cast<thread_pool::thread_pool_asynchronous_io_item*>(item_);
item->unregistered = true;
item->data->unregistered = true;
return wait_object.signal();
}
8 changes: 4 additions & 4 deletions src/xtd.core/src/xtd/threading/thread_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,11 @@ void thread_pool::asynchronous_io_run() {
}

do {
auto wait_result = wait_handle::wait_any({&static_data_.close_asynchronous_io_threads_manual_reset_event, item.wait_object}, item.milliseconds_timeout_interval);
auto wait_result = wait_handle::wait_any({&static_data_.close_asynchronous_io_threads_manual_reset_event, item.data->wait_object}, item.data->milliseconds_timeout_interval);
if (wait_result == 0) break;
auto timeout = wait_result == wait_handle::wait_timeout;
if (!item.unregistered) item.callback(item.state, timeout);
} while (!item.execute_only_once && !item.unregistered);
if (!item.data->unregistered) item.data->callback(item.data->state, timeout);
} while (!item.data->execute_only_once && !item.data->unregistered);
}
}

Expand Down Expand Up @@ -222,6 +222,6 @@ void thread_pool::run() {
item = static_data_.thread_pool_items.back();
static_data_.thread_pool_items.pop_back();
}
item.callback(item.state);
item.data->callback(item.data->state);
}
}

0 comments on commit d7edfbf

Please sign in to comment.