Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[JBPM-10209] Switching to CMT #2362

Merged
merged 4 commits into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright 2023 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.jbpm.process.core;

import org.kie.internal.runtime.manager.Disposable;
import org.kie.internal.runtime.manager.InternalRuntimeEngine;

public interface DisposableRuntimeEngine extends InternalRuntimeEngine, Disposable {
boolean isDisposed();
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.drools.core.time.impl.DefaultJobHandle;
import org.drools.core.time.impl.TimerJobFactoryManager;
import org.drools.core.time.impl.TimerJobInstance;
import org.jbpm.process.core.DisposableRuntimeEngine;
import org.jbpm.process.core.timer.GlobalSchedulerService;
import org.jbpm.process.core.timer.NamedJobContext;
import org.jbpm.process.instance.timer.TimerManager.ProcessJobContext;
Expand Down Expand Up @@ -420,7 +421,10 @@ public Environment getEnvironment() {

return runtime.getKieSession().getEnvironment();
}


public boolean isDisposed() {
return runtime instanceof DisposableRuntimeEngine && ((DisposableRuntimeEngine)runtime).isDisposed();
}
}

public List<TimerServiceListener> getListeners() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public GlobalJpaTimerJobInstance(Job job, JobContext ctx, Trigger trigger,
@Override
public Void call() throws Exception {
AsyncExecutionMarker.markAsync();
ExecutableRunner runner = null;
ExecutableRunner<?> runner = null;
TransactionManager jtaTm = null;
boolean success = false;
try {
Expand All @@ -88,16 +88,16 @@ public Void call() throws Exception {
success = true;
return null;
} catch( Exception e ) {
e.printStackTrace();
logger.error("Exception executing timer", e);
success = false;
throw e;
} finally {
AsyncExecutionMarker.reset();
if (runner != null && runner instanceof DisposableCommandService) {
if (allowedToDispose(((DisposableCommandService) runner).getEnvironment())) {
logger.debug("Allowed to dispose command service from global timer job instance");
((DisposableCommandService) runner).dispose();
}
if (runner != null && runner instanceof DisposableCommandService) {
if (allowedToDispose(((DisposableCommandService) runner))) {
logger.debug("Allowed to dispose command service from global timer job instance");
((DisposableCommandService) runner).dispose();
}
}
closeTansactionIfNeeded(jtaTm, success);
}
Expand All @@ -123,7 +123,11 @@ public String toString() {
return "GlobalJpaTimerJobInstance [timerServiceId=" + timerServiceId
+ ", getJobHandle()=" + getJobHandle() + "]";
}


private boolean allowedToDispose(DisposableCommandService disposableCommandService) {
return !disposableCommandService.isDisposed() && allowedToDispose (disposableCommandService.getEnvironment());
}

protected boolean allowedToDispose(Environment environment) {
if (hasEnvironmentEntry(environment, "IS_JTA_TRANSACTION", false)) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,13 @@
import java.util.concurrent.CopyOnWriteArrayList;

import org.jbpm.process.audit.JPAAuditLogService;
import org.jbpm.process.core.DisposableRuntimeEngine;
import org.kie.api.runtime.KieSession;
import org.kie.api.runtime.manager.Context;
import org.kie.api.runtime.manager.RuntimeManager;
import org.kie.api.runtime.manager.audit.AuditService;
import org.kie.api.task.TaskService;
import org.kie.internal.runtime.manager.Disposable;
import org.kie.internal.runtime.manager.DisposeListener;
import org.kie.internal.runtime.manager.InternalRuntimeEngine;
import org.kie.internal.runtime.manager.InternalRuntimeManager;
import org.kie.internal.runtime.manager.SessionNotFoundException;

Expand All @@ -36,7 +35,7 @@
* and work item handlers might be interested in receiving notification when the runtime engine is disposed of,
* in order deactivate themselves too and not receive any other events.
*/
public class RuntimeEngineImpl implements InternalRuntimeEngine, Disposable {
public class RuntimeEngineImpl implements DisposableRuntimeEngine {

private RuntimeEngineInitlializer initializer;
private Context<?> context;
Expand Down Expand Up @@ -143,6 +142,7 @@ public void setManager(RuntimeManager manager) {
this.manager = manager;
}

@Override
public boolean isDisposed() {
return disposed;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,18 @@

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import javax.ejb.ConcurrencyManagement;
import javax.ejb.ConcurrencyManagementType;
import javax.ejb.Lock;
import javax.ejb.LockType;
import javax.ejb.NoSuchObjectLocalException;
import javax.ejb.SessionContext;
import javax.ejb.Singleton;
import javax.ejb.Startup;
import javax.ejb.Timeout;
import javax.ejb.Timer;
import javax.ejb.TimerConfig;
import javax.ejb.TimerHandle;
import javax.ejb.TransactionManagement;
import javax.ejb.TransactionManagementType;
import javax.transaction.RollbackException;
import javax.transaction.UserTransaction;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;

import org.drools.core.time.JobHandle;
import org.drools.core.time.impl.TimerJobInstance;
Expand All @@ -58,8 +55,6 @@

@Singleton
@Startup
@ConcurrencyManagement(ConcurrencyManagementType.CONTAINER)
@TransactionManagement(TransactionManagementType.BEAN)
@Lock(LockType.READ)
public class EJBTimerScheduler {

Expand All @@ -70,17 +65,19 @@ public class EJBTimerScheduler {
private static final Integer TIMER_RETRY_LIMIT = Integer.parseInt(System.getProperty("org.kie.jbpm.timer.retry.limit", "3"));

private static final Integer OVERDUE_WAIT_TIME = Integer.parseInt(System.getProperty("org.jbpm.overdue.timer.wait", "20000"));

private static final Integer OVERDUE_CHECK_TIME = Integer.parseInt(System.getProperty("org.jbpm.overdue.timer.check", "200"));
fjtirado marked this conversation as resolved.
Show resolved Hide resolved

private boolean useLocalCache = Boolean.parseBoolean(System.getProperty("org.jbpm.ejb.timer.local.cache", "false"));

private ConcurrentMap<String, TimerJobInstance> localCache = new ConcurrentHashMap<String, TimerJobInstance>();

@Resource
protected javax.ejb.TimerService timerService;

@Resource
protected UserTransaction utx;

protected SessionContext ctx;
public void setUseLocalCache(boolean useLocalCache) {
this.useLocalCache = useLocalCache;
}
Expand All @@ -97,112 +94,78 @@ public void executeTimerJob(Timer timer) {
EjbTimerJob timerJob = (EjbTimerJob) timer.getInfo();
TimerJobInstance timerJobInstance = timerJob.getTimerJobInstance();
logger.debug("About to execute timer for job {}", timerJob);

String timerServiceId = ((EjbGlobalJobHandle) timerJobInstance.getJobHandle()).getDeploymentId();

// handle overdue timers as ejb timer service might start before all deployments are ready
long time = 0;
while (TimerServiceRegistry.getInstance().get(timerServiceId) == null) {
logger.debug("waiting for timer service to be available, elapsed time {} ms", time);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
time += 500;

if (time > OVERDUE_WAIT_TIME) {
logger.debug("No timer service found after waiting {} ms", time);
break;
try {
while (TimerServiceRegistry.getInstance().get(((EjbGlobalJobHandle) timerJobInstance.getJobHandle()).getDeploymentId()) == null) {
logger.debug("waiting for timer service to be available, elapsed time {} ms", time);
Thread.sleep(OVERDUE_CHECK_TIME);
time += OVERDUE_CHECK_TIME;
if (time > OVERDUE_WAIT_TIME) {
logger.debug("No timer service found after waiting {} ms", time);
break;
}
}
}
} catch (InterruptedException e) {
logger.warn("Thread has been interrupted", e);
Thread.currentThread().interrupt();
}
try {
transaction(this::executeTimerJobInstance, timerJobInstance);
} catch (SessionNotFoundException e) {
logger.warn("Process instance is not found. More likely already completed. Timer {} won't be recovered", timerJobInstance, e);
removeUnrecoverableTimer(timerJob, timer);
invokeTransaction(this::executeTimerJobInstance, timerJobInstance);
} catch (Exception e) {
recoverTimerJobInstance(timerJob, timer, e);
}
}

private void executeTimerJobInstance(TimerJobInstance timerJobInstance) throws Exception {
try {
((Callable<?>) timerJobInstance).call();
} catch (Exception e) {
throw e;
}
((Callable<?>) timerJobInstance).call();
}

private void removeUnrecoverableTimer(EjbTimerJob ejbTimerJob, Timer timer) {
try {
Transaction<TimerJobInstance> tx = timerJobInstance -> {
if (!this.removeJob(timerJobInstance.getJobHandle(), timer)) {
logger.warn("Session not found for timer {}. Timer could not removed.", ejbTimerJob.getTimerJobInstance());
}
};
transaction(tx, ejbTimerJob.getTimerJobInstance());
} catch (Exception e1) {
logger.warn("There was a problem during timer removal {}", ejbTimerJob.getTimerJobInstance(), e1);
fjtirado marked this conversation as resolved.
Show resolved Hide resolved
}
}

private void recoverTimerJobInstance(EjbTimerJob ejbTimerJob, Timer timer, Exception e) {
if (isSessionNotFound(e)) {
logger.warn("Trying to recover timer. Not possible due to process instance is not found. More likely already completed. Timer {} won't be recovered", ejbTimerJob.getTimerJobInstance(), e);
private void recoverTimerJobInstance(EjbTimerJob ejbTimerJob, Timer timer, Exception cause) {
Transaction<TimerJobInstance> tx;
fjtirado marked this conversation as resolved.
Show resolved Hide resolved
if (isSessionNotFound(cause)) {
// if session is not found means the process has already finished. In this case we just need to remove
// the timer and avoid any recovery as it should not trigger any more timers.
removeUnrecoverableTimer(ejbTimerJob, timer);
return;
tx = timerJobInstance -> {
logger.warn("Trying to recover timer. Not possible due to process instance is not found. More likely already completed. Timer {} won't be recovered", timerJobInstance, cause);
if (!removeJob(timerJobInstance.getJobHandle(), timer)) {
fjtirado marked this conversation as resolved.
Show resolved Hide resolved
logger.warn("Session not found for timer {}. Timer could not removed.", timerJobInstance);
}
};
}

if (ejbTimerJob.getTimerJobInstance().getTrigger().hasNextFireTime() != null) {
else if (ejbTimerJob.getTimerJobInstance().getTrigger().hasNextFireTime() != null) {
// this is an interval trigger. Problem here is that the timer scheduled by DefaultTimerJobInstance is lost
// because of the transaction, so we need to do this here.
try {

logger.warn("Execution of time failed Interval Trigger failed. Skipping {}", ejbTimerJob.getTimerJobInstance());
Transaction<TimerJobInstance> tx = timerJobInstance -> {
if (this.removeJob(timerJobInstance.getJobHandle(), null)) {
this.internalSchedule(timerJobInstance);
} else {
logger.debug("Interval trigger {} was removed before rescheduling", ejbTimerJob.getTimerJobInstance());
}
};
transaction(tx, ejbTimerJob.getTimerJobInstance());
} catch (Exception e1) {
logger.warn("Could not schedule the interval trigger {}", ejbTimerJob.getTimerJobInstance(), e1);
}
return;
tx = timerJobInstance -> {
logger.warn("Execution of time failed Interval Trigger failed. Skipping {}", timerJobInstance);
if (removeJob(timerJobInstance.getJobHandle(), null)) {
internalSchedule(timerJobInstance);
fjtirado marked this conversation as resolved.
Show resolved Hide resolved
} else {
logger.debug("Interval trigger {} was removed before rescheduling", timerJobInstance);
}
};
}
else {
// if there is not next date to be fired, we need to apply policy otherwise will be lost
tx = timerJobInstance -> {
logger.warn("Execution of time failed. The timer will be retried {}", timerJobInstance);
ZonedDateTime nextRetry = ZonedDateTime.now().plus(TIMER_RETRY_INTERVAL, ChronoUnit.MILLIS);
EjbTimerJobRetry info = ejbTimerJob instanceof EjbTimerJobRetry ? ((EjbTimerJobRetry) ejbTimerJob).next() : new EjbTimerJobRetry(timerJobInstance);
if (TIMER_RETRY_LIMIT > 0 && info.getRetry() > TIMER_RETRY_LIMIT) {
logger.warn("The timer {} reached retry limit {}. It won't be retried again", timerJobInstance, TIMER_RETRY_LIMIT);
} else {
TimerConfig config = new TimerConfig(info, true);
Timer newTimer = timerService.createSingleActionTimer(Date.from(nextRetry.toInstant()), config);
((GlobalJpaTimerJobInstance) timerJobInstance).setTimerInfo(newTimer.getHandle());
((GlobalJpaTimerJobInstance) timerJobInstance).setExternalTimerId(getPlatformTimerId(newTimer));
}
};
}

// if there is not next date to be fired, we need to apply policy otherwise will be lost

logger.warn("Execution of time failed. The timer will be retried {}", ejbTimerJob.getTimerJobInstance());
Transaction<TimerJobInstance> operation = (instance) -> {
ZonedDateTime nextRetry = ZonedDateTime.now().plus(TIMER_RETRY_INTERVAL, ChronoUnit.MILLIS);
EjbTimerJobRetry info = null;
if(ejbTimerJob instanceof EjbTimerJobRetry) {
info = ((EjbTimerJobRetry) ejbTimerJob).next();
} else {
info = new EjbTimerJobRetry(instance);
}
if (TIMER_RETRY_LIMIT > 0 && info.getRetry() > TIMER_RETRY_LIMIT) {
logger.warn("The timer {} reached retry limit {}. It won't be retried again", instance, TIMER_RETRY_LIMIT);
return;
}
TimerConfig config = new TimerConfig(info, true);
Timer newTimer = timerService.createSingleActionTimer(Date.from(nextRetry.toInstant()), config);
TimerHandle handler = newTimer.getHandle();
((GlobalJpaTimerJobInstance) ejbTimerJob.getTimerJobInstance()).setTimerInfo(handler);
((GlobalJpaTimerJobInstance) ejbTimerJob.getTimerJobInstance()).setExternalTimerId(getPlatformTimerId(newTimer));
};
try {
transaction(operation, ejbTimerJob.getTimerJobInstance());
} catch (Exception e1) {
logger.error("Failed to executed timer recovery {}", e1.getMessage(), e1);
invokeTransaction (tx, ejbTimerJob.getTimerJobInstance());
} catch (Exception e) {
logger.error("Failed to executed timer recovery", e);
}

}

private boolean isSessionNotFound(Exception e) {
Expand All @@ -218,30 +181,16 @@ private boolean isSessionNotFound(Exception e) {

@FunctionalInterface
private interface Transaction<I> {

void doWork(I item) throws Exception;
}

private <I> void transaction(Transaction<I> operation, I item) throws Exception {
try {
utx.begin();
operation.doWork(item);
utx.commit();
} catch(RollbackException e) {
logger.warn("Transaction was rolled back for {} with status {}", item, utx.getStatus());
if(utx.getStatus() == javax.transaction.Status.STATUS_ACTIVE) {
utx.rollback();
}
throw new RuntimeException("jbpm timer has been rolledback", e);
} catch (Exception e) {
try {
utx.rollback();
} catch (Exception re) {
logger.error("transaction could not be rolled back", re);
}

throw e;
}
@TransactionAttribute(value = TransactionAttributeType.REQUIRES_NEW)
fjtirado marked this conversation as resolved.
Show resolved Hide resolved
public <I> void transaction(Transaction<I> operation, I item) throws Exception {
operation.doWork(item);
}

private <I> void invokeTransaction (Transaction<I> operation, I item) throws Exception {
ctx.getBusinessObject(EJBTimerScheduler.class).transaction(operation,item);
}

public void internalSchedule(TimerJobInstance timerJobInstance) {
Expand Down
Loading
Loading