diff --git a/Orange/widgets/data/owimpute.py b/Orange/widgets/data/owimpute.py index be9847837da..bc054a6ace2 100644 --- a/Orange/widgets/data/owimpute.py +++ b/Orange/widgets/data/owimpute.py @@ -287,6 +287,7 @@ def set_default_method(self, index): @Inputs.data @check_sql_input def set_data(self, data): + self.cancel() self.closeContext() self.varmodel[:] = [] self._variable_imputation_state = {} # type: VariableState @@ -304,6 +305,7 @@ def set_data(self, data): @Inputs.learner def set_learner(self, learner): + self.cancel() self.learner = learner or self.default_learner imputer = self.create_imputer(Method.Model) button = self.default_button_group.button(Method.Model) @@ -380,8 +382,8 @@ def impute_one(method, var, data): w.doneAll.connect(self.__commit_finish) w.progressChanged.connect(self.__progress_changed) self.__task = Task(futures, w) - self.progressBarInit(processEvents=False) - self.setBlocking(True) + self.progressBarInit() + self.setInvalidated(True) @Slot() def __commit_finish(self): @@ -430,7 +432,7 @@ def create_data(attributes, class_vars): return None self.__task = None - self.setBlocking(False) + self.setInvalidated(False) self.progressBarFinished() attributes = [] @@ -458,18 +460,22 @@ def __progress_changed(self, n, d): self.progressBarSet(100. * n / d) def cancel(self): + self.__cancel(wait=False) + + def __cancel(self, wait=False): if self.__task is not None: task, self.__task = self.__task, None task.cancel() task.watcher.doneAll.disconnect(self.__commit_finish) task.watcher.progressChanged.disconnect(self.__progress_changed) - concurrent.futures.wait(task.futures) - task.watcher.flush() + if wait: + concurrent.futures.wait(task.futures) + task.watcher.flush() self.progressBarFinished() - self.setBlocking(False) + self.setInvalidated(False) def onDeleteWidget(self): - self.cancel() + self.__cancel(wait=True) super().onDeleteWidget() def send_report(self): diff --git a/Orange/widgets/evaluate/owtestlearners.py b/Orange/widgets/evaluate/owtestlearners.py index 31e2866ac43..d82796b4894 100644 --- a/Orange/widgets/evaluate/owtestlearners.py +++ b/Orange/widgets/evaluate/owtestlearners.py @@ -7,7 +7,6 @@ import copy from functools import partial, reduce -import concurrent.futures from concurrent.futures import Future from collections import OrderedDict, namedtuple from typing import Any, Optional, List, Dict, Callable @@ -16,7 +15,7 @@ from AnyQt import QtGui from AnyQt.QtGui import QStandardItem -from AnyQt.QtCore import Qt, QSize, QThread, QMetaObject, Q_ARG +from AnyQt.QtCore import Qt, QSize, QThread from AnyQt.QtCore import pyqtSlot as Slot from Orange.base import Learner @@ -34,8 +33,8 @@ usable_scorers, ScoreTable, learner_name, scorer_caller from Orange.widgets.utils.itemmodels import DomainModel from Orange.widgets.utils.widgetpreview import WidgetPreview +from Orange.widgets.utils.concurrent import ThreadExecutor, TaskState from Orange.widgets.widget import OWWidget, Msg, Input, Output -from Orange.widgets.utils.concurrent import ThreadExecutor log = logging.getLogger(__name__) @@ -225,7 +224,7 @@ def __init__(self): # Do we need to [re]test any learners, set by _invalidate and # cleared by __update self.__needupdate = False - self.__task = None # type: Optional[Task] + self.__task = None # type: Optional[TaskState] self.__executor = ThreadExecutor() sbox = gui.vBox(self.controlArea, "Sampling") @@ -328,6 +327,7 @@ def set_train_data(self, data): ---------- data : Optional[Orange.data.Table] """ + self.cancel() self.Information.data_sampled.clear() self.Error.train_data_empty.clear() self.Error.class_required.clear() @@ -587,6 +587,7 @@ def _on_target_class_changed(self): self.update_stats_model() def _invalidate(self, which=None): + self.cancel() self.fold_feature_selected = \ self.resampling == OWTestLearners.FeatureFold # Invalidate learner results for `which` input keys @@ -802,59 +803,46 @@ def __submit(self, testfunc): """ assert self.__state != State.Running # Setup the task - task = Task() + task = TaskState() def progress_callback(finished): - if task.cancelled: + if task.is_interruption_requested(): raise UserInterrupt() - QMetaObject.invokeMethod( - self, "setProgressValue", Qt.QueuedConnection, - Q_ARG(float, 100 * finished) - ) - - def ondone(_): - QMetaObject.invokeMethod( - self, "__task_complete", Qt.QueuedConnection, - Q_ARG(object, task)) + task.set_progress_value(100 * finished) testfunc = partial(testfunc, callback=progress_callback) - task.future = self.__executor.submit(testfunc) - task.future.add_done_callback(ondone) + task.start(self.__executor, testfunc) + + task.progress_changed.connect(self.setProgressValue) + task.watcher.finished.connect(self.__task_complete) + self.Outputs.evaluations_results.invalidate() + self.Outputs.predictions.invalidate() self.progressBarInit() - self.setBlocking(True) self.setStatusMessage("Running") self.__state = State.Running self.__task = task @Slot(object) - def __task_complete(self, task): + def __task_complete(self, f: 'Future[Results]'): # handle a completed task assert self.thread() is QThread.currentThread() - if self.__task is not task: - assert task.cancelled - log.debug("Reaping cancelled task: %r", "<>") - return - - self.setBlocking(False) + assert self.__task is not None and self.__task.future is f self.progressBarFinished() self.setStatusMessage("") - result = task.future - assert result.done() + assert f.done() self.__task = None + self.__state = State.Done try: - results = result.result() # type: Results + results = f.result() # type: Results learners = results.learners # type: List[Learner] - except Exception as er: + except Exception as er: # pylint: disable=broad-except log.exception("testing error (in __task_complete):", exc_info=True) self.error("\n".join(traceback.format_exception_only(type(er), er))) - self.__state = State.Done return - self.__state = State.Done - learner_key = {slot.learner: key for key, slot in self.learners.items()} assert all(learner in learner_key for learner in learners) @@ -890,7 +878,10 @@ def cancel(self): self.__state = State.Cancelled task, self.__task = self.__task, None task.cancel() - assert task.future.done() + task.progress_changed.disconnect(self.setProgressValue) + task.watcher.finished.disconnect(self.__task_complete) + self.progressBarFinished() + self.setStatusMessage("") def onDeleteWidget(self): self.cancel() @@ -974,33 +965,6 @@ def results_one_vs_rest(results, pos_index): return res -class Task: - """ - A simple task state. - """ - #: A future holding the results. This field is set by the client. - future = ... # type: Future - #: True if the task was cancelled - cancelled = False # type: bool - #: A function to call. Filled by the client. - func = ... # type: Callable[[Callable[[float], None]], Results] - - def cancel(self): - """ - Cancel the task. - - Set the `cancelled` field to True and block until the future is done. - """ - log.debug("cancel task") - self.cancelled = True - cancelled = self.future.cancel() - if cancelled: - log.debug("Task cancelled before starting") - else: - log.debug("Attempting cooperative cancellation for task") - concurrent.futures.wait([self.future]) - - if __name__ == "__main__": # pragma: no cover filename = "iris" preview_data = Table(filename) diff --git a/Orange/widgets/unsupervised/owkmeans.py b/Orange/widgets/unsupervised/owkmeans.py index 11cc4a9f90d..f6f38f3bf9a 100644 --- a/Orange/widgets/unsupervised/owkmeans.py +++ b/Orange/widgets/unsupervised/owkmeans.py @@ -322,7 +322,7 @@ def __commit_finished(self): assert self.data is not None self.__task = None - self.setBlocking(False) + self.setInvalidated(False) self.progressBarFinished() if self.optimize_k: @@ -355,7 +355,7 @@ def __launch_tasks(self, ks): self.__task = Task(futures, watcher) self.progressBarInit() - self.setBlocking(True) + self.setInvalidated(True) def cancel(self): if self.__task is not None: @@ -368,7 +368,7 @@ def cancel(self): task.watcher.doneAll.disconnect(self.__commit_finished) self.progressBarFinished() - self.setBlocking(False) + self.setInvalidated(False) def run_optimization(self): if not self.enough_data_instances(self.k_from): diff --git a/Orange/widgets/unsupervised/owlouvainclustering.py b/Orange/widgets/unsupervised/owlouvainclustering.py index fc59219c03e..b550c8ec7de 100644 --- a/Orange/widgets/unsupervised/owlouvainclustering.py +++ b/Orange/widgets/unsupervised/owlouvainclustering.py @@ -313,12 +313,12 @@ def progressBarSet(self, value, *a, **kw): def __set_state_ready(self): self.progressBarFinished() - self.setBlocking(False) + self.setInvalidated(False) self.setStatusMessage("") def __set_state_busy(self): self.progressBarInit() - self.setBlocking(True) + self.setInvalidated(True) def __start_task(self, task, state): # type: (Callable[[], Any], TaskState) -> None @@ -407,10 +407,11 @@ def set_data(self, data): self.controls.pca_components.setEnabled(self.apply_pca) if prev_data and self.data and array_equal(prev_data.X, self.data.X): - if self.auto_commit: + if self.auto_commit and not self.isInvalidated(): self._send_data() return + self.cancel() # Clear the outputs self.Outputs.annotated_data.send(None) if Graph is not None: diff --git a/Orange/widgets/unsupervised/tests/test_owkmeans.py b/Orange/widgets/unsupervised/tests/test_owkmeans.py index 5c3fa3af4fd..cf3fffafe6c 100644 --- a/Orange/widgets/unsupervised/tests/test_owkmeans.py +++ b/Orange/widgets/unsupervised/tests/test_owkmeans.py @@ -73,7 +73,7 @@ def test_optimization_report_display(self): self.assertFalse(self.widget.mainArea.isHidden()) self.widget.apply_button.button.click() - self.wait_until_stop_blocking() + self.wait_until_finished() self.assertEqual(self.widget.table_view.model().rowCount() > 0, True) def test_changing_k_changes_radio(self): @@ -284,13 +284,13 @@ def test_optimization_fails(self): self.KMeansFail.fail_on = set(range(3, 9)) widget.invalidate() - self.wait_until_stop_blocking() + self.wait_until_finished() self.assertTrue(widget.Error.failed.is_shown()) self.assertIsNone(self.get_output(self.widget.Outputs.annotated_data)) self.KMeansFail.fail_on = set() widget.invalidate() - self.wait_until_stop_blocking() + self.wait_until_finished() self.assertFalse(widget.Error.failed.is_shown()) self.assertEqual(widget.selected_row(), 0) self.assertIsNotNone(self.get_output(self.widget.Outputs.annotated_data)) @@ -307,7 +307,7 @@ def test_run_fails(self): self.KMeansFail.fail_on = set() self.widget.invalidate() - self.wait_until_stop_blocking() + self.wait_until_finished() self.assertFalse(self.widget.Error.failed.is_shown()) self.assertIsNotNone(self.get_output(self.widget.Outputs.annotated_data)) @@ -429,7 +429,7 @@ def test_invalidate_clusterings_cancels_jobs(self): # Now, invalidate by changing max_iter widget.max_iterations = widget.max_iterations + 1 widget.invalidate() - self.wait_until_stop_blocking() + self.wait_until_finished() self.assertEqual(widget.clusterings, {}) @@ -513,7 +513,7 @@ def test_saved_selection(self): self.widget.send_data = Mock() self.widget.optimize_k = True self.send_signal(self.widget.Inputs.data, self.data) - self.wait_until_stop_blocking() + self.wait_until_finished() self.widget.table_view.selectRow(2) self.assertEqual(self.widget.selected_row(), 2) self.assertEqual(self.widget.send_data.call_count, 3) @@ -522,7 +522,7 @@ def test_saved_selection(self): w = self.create_widget(OWKMeans, stored_settings=settings) w.send_data = Mock() self.send_signal(w.Inputs.data, self.data, widget=w) - self.wait_until_stop_blocking(widget=w) + self.wait_until_finished(widget=w) self.assertEqual(w.send_data.call_count, 2) self.assertEqual(self.widget.selected_row(), w.selected_row()) diff --git a/requirements-gui.txt b/requirements-gui.txt index b53fd59bac4..20af12cb452 100644 --- a/requirements-gui.txt +++ b/requirements-gui.txt @@ -1,5 +1,5 @@ orange-canvas-core>=0.1.9,<0.2a -orange-widget-base>=4.1.0 +orange-widget-base>=4.2.0a # PyQt4/PyQt5 compatibility AnyQt>=0.0.8