Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement onUncaughtError event for thread pools #1777

Open
wants to merge 6 commits into
base: 8.3.0-Dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 57 additions & 16 deletions src/lime/system/ThreadPool.hx
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ import neko.vm.Thread;
#elseif html5
import lime._internal.backend.html5.HTML5Thread as Thread;
#end
#if (haxe_ver >= 4.1)
import haxe.Exception;
#end

/**
A simple and thread-safe way to run a one or more asynchronous jobs. It
Expand Down Expand Up @@ -167,6 +170,15 @@ class ThreadPool extends WorkOutput
once per job.
**/
public var onRun(default, null) = new Event<State->Void>();
#if (haxe_ver >= 4.1)
/**
Dispatched on the main thread when `doWork` throws an error. Dispatched
at most once per job.

If no listeners have been added, instead the error will be rethrown.
**/
public var onUncaughtError(default, null) = new Event<Exception->Void>();
#end

/**
(Single-threaded mode only.) How important this pool's jobs are relative
Expand Down Expand Up @@ -428,9 +440,9 @@ class ThreadPool extends WorkOutput
event.job.doWork.dispatch(event.job.state, output);
}
}
catch (e:#if (haxe_ver >= 4.1) haxe.Exception #else Dynamic #end)
catch (e:#if (haxe_ver >= 4.1) Exception #else Dynamic #end)
{
output.sendError(e);
output.sendUncaughtError(e);
}

output.activeJob = null;
Expand Down Expand Up @@ -526,9 +538,9 @@ class ThreadPool extends WorkOutput
}
while (!__jobComplete.value && timeElapsed < maxTimeElapsed);
}
catch (e:#if (haxe_ver >= 4.1) haxe.Exception #else Dynamic #end)
catch (e:#if (haxe_ver >= 4.1) Exception #else Dynamic #end)
{
sendError(e);
sendUncaughtError(e);
}

activeJob.duration += timeElapsed;
Expand Down Expand Up @@ -562,16 +574,7 @@ class ThreadPool extends WorkOutput
case PROGRESS:
onProgress.dispatch(threadEvent.message);

case COMPLETE, ERROR:
if (threadEvent.event == COMPLETE)
{
onComplete.dispatch(threadEvent.message);
}
else
{
onError.dispatch(threadEvent.message);
}

case COMPLETE, ERROR, UNCAUGHT_ERROR:
__activeJobs.remove(activeJob);

#if lime_threads
Expand All @@ -593,13 +596,51 @@ class ThreadPool extends WorkOutput

completed = threadEvent.event == COMPLETE && activeJobs == 0 && __jobQueue.length == 0;

if (threadEvent.event == COMPLETE)
{
onComplete.dispatch(threadEvent.message);
}
else if (threadEvent.event == ERROR)
{
onError.dispatch(threadEvent.message);
}
else
{
var message:String;

#if (haxe_ver >= 4.1)
if (Std.isOfType(threadEvent.message, Exception))
{
if (onUncaughtError.__listeners.length > 0)
{
onUncaughtError.dispatch(threadEvent.message);
message = null;
}
else
{
message = (threadEvent.message:Exception).details();
}
}
else
#end
{
message = Std.string(threadEvent.message);
}

if (message != null)
{
activeJob = null;
Log.error(message);
}
}

default:
}

activeJob = null;
}

if (completed)
if (activeJobs == 0 && __jobQueue.length == 0)
{
Application.current.onUpdate.remove(__update);
}
Expand Down Expand Up @@ -753,7 +794,7 @@ class JobList
__addingWorkPriority = length > 0;
return true;
}
else if (removeByID(job.id))
else if (job != null && removeByID(job.id))
{
return true;
}
Expand Down
28 changes: 25 additions & 3 deletions src/lime/system/WorkOutput.hx
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,14 @@ class WorkOutput
private var __jobOutput:Deque<ThreadEvent> = new Deque();
/**
Thread-local storage. Tracks whether `sendError()` or `sendComplete()`
was called by this job.
was called by this job, or if the job threw an "uncaught" error.
**/
private var __jobComplete:Tls<Bool> = new Tls();

/**
The job that is currently running on this thread, or the job that
triggered the ongoing `onComplete`, `onError`, or `onProgress` event.
Will be null in all other cases.
triggered the ongoing `onComplete`, `onError`, `onUncaughtError`, or
`onProgress` event. Will be null in all other cases.
**/
public var activeJob(get, set):Null<JobData>;
@:noCompletion private var __activeJob:Tls<JobData> = new Tls();
Expand Down Expand Up @@ -139,6 +139,24 @@ class WorkOutput
}
}

private function sendUncaughtError(message:Dynamic):Void
{
if (!__jobComplete.value)
{
__jobComplete.value = true;

#if (lime_threads && html5)
if (mode == MULTI_THREADED)
{
activeJob.doWork.makePortable();
Thread.returnMessage(new ThreadEvent(UNCAUGHT_ERROR, message, activeJob));
}
else
#end
__jobOutput.add(new ThreadEvent(UNCAUGHT_ERROR, message, activeJob));
}
}

/**
Dispatches `onProgress` on the main thread, with the given message. This
can be called any number of times per job.
Expand Down Expand Up @@ -351,6 +369,10 @@ class JobData
Sent by the background thread, indicating failure.
**/
var ERROR = "ERROR";
/**
Sent by the background thread, indicating failure.
**/
var UNCAUGHT_ERROR = "UNCAUGHT_ERROR";
/**
Sent by the background thread.
**/
Expand Down
Loading