Skip to content

Commit 6c3eaf6

Browse files
authored
[JBPM-10209] Switching to CMT (#2362)
* [JBPM-10209] Switching to CMT * [JBPM-10209] A bit of refactor * [JBPM-10209] Gonzalos comments * [JBPM-10209] Disposable handling
1 parent 0d7951c commit 6c3eaf6

File tree

6 files changed

+115
-154
lines changed

6 files changed

+115
-154
lines changed
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Copyright 2023 Red Hat, Inc. and/or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.jbpm.process.core;
18+
19+
import org.kie.internal.runtime.manager.Disposable;
20+
import org.kie.internal.runtime.manager.InternalRuntimeEngine;
21+
22+
public interface DisposableRuntimeEngine extends InternalRuntimeEngine, Disposable {
23+
boolean isDisposed();
24+
}

jbpm-flow/src/main/java/org/jbpm/process/core/timer/impl/GlobalTimerService.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.drools.core.time.impl.DefaultJobHandle;
3838
import org.drools.core.time.impl.TimerJobFactoryManager;
3939
import org.drools.core.time.impl.TimerJobInstance;
40+
import org.jbpm.process.core.DisposableRuntimeEngine;
4041
import org.jbpm.process.core.timer.GlobalSchedulerService;
4142
import org.jbpm.process.core.timer.NamedJobContext;
4243
import org.jbpm.process.instance.timer.TimerManager.ProcessJobContext;
@@ -420,7 +421,10 @@ public Environment getEnvironment() {
420421

421422
return runtime.getKieSession().getEnvironment();
422423
}
423-
424+
425+
public boolean isDisposed() {
426+
return runtime instanceof DisposableRuntimeEngine && ((DisposableRuntimeEngine)runtime).isDisposed();
427+
}
424428
}
425429

426430
public List<TimerServiceListener> getListeners() {

jbpm-persistence/jbpm-persistence-jpa/src/main/java/org/jbpm/persistence/timer/GlobalJpaTimerJobInstance.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public GlobalJpaTimerJobInstance(Job job, JobContext ctx, Trigger trigger,
6565
@Override
6666
public Void call() throws Exception {
6767
AsyncExecutionMarker.markAsync();
68-
ExecutableRunner runner = null;
68+
ExecutableRunner<?> runner = null;
6969
TransactionManager jtaTm = null;
7070
boolean success = false;
7171
try {
@@ -88,16 +88,16 @@ public Void call() throws Exception {
8888
success = true;
8989
return null;
9090
} catch( Exception e ) {
91-
e.printStackTrace();
91+
logger.error("Exception executing timer", e);
9292
success = false;
9393
throw e;
9494
} finally {
9595
AsyncExecutionMarker.reset();
96-
if (runner != null && runner instanceof DisposableCommandService) {
97-
if (allowedToDispose(((DisposableCommandService) runner).getEnvironment())) {
98-
logger.debug("Allowed to dispose command service from global timer job instance");
99-
((DisposableCommandService) runner).dispose();
100-
}
96+
if (runner != null && runner instanceof DisposableCommandService) {
97+
if (allowedToDispose(((DisposableCommandService) runner))) {
98+
logger.debug("Allowed to dispose command service from global timer job instance");
99+
((DisposableCommandService) runner).dispose();
100+
}
101101
}
102102
closeTansactionIfNeeded(jtaTm, success);
103103
}
@@ -123,7 +123,11 @@ public String toString() {
123123
return "GlobalJpaTimerJobInstance [timerServiceId=" + timerServiceId
124124
+ ", getJobHandle()=" + getJobHandle() + "]";
125125
}
126-
126+
127+
private boolean allowedToDispose(DisposableCommandService disposableCommandService) {
128+
return !disposableCommandService.isDisposed() && allowedToDispose (disposableCommandService.getEnvironment());
129+
}
130+
127131
protected boolean allowedToDispose(Environment environment) {
128132
if (hasEnvironmentEntry(environment, "IS_JTA_TRANSACTION", false)) {
129133
return true;

jbpm-runtime-manager/src/main/java/org/jbpm/runtime/manager/impl/RuntimeEngineImpl.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,13 @@
1919
import java.util.concurrent.CopyOnWriteArrayList;
2020

2121
import org.jbpm.process.audit.JPAAuditLogService;
22+
import org.jbpm.process.core.DisposableRuntimeEngine;
2223
import org.kie.api.runtime.KieSession;
2324
import org.kie.api.runtime.manager.Context;
2425
import org.kie.api.runtime.manager.RuntimeManager;
2526
import org.kie.api.runtime.manager.audit.AuditService;
2627
import org.kie.api.task.TaskService;
27-
import org.kie.internal.runtime.manager.Disposable;
2828
import org.kie.internal.runtime.manager.DisposeListener;
29-
import org.kie.internal.runtime.manager.InternalRuntimeEngine;
3029
import org.kie.internal.runtime.manager.InternalRuntimeManager;
3130
import org.kie.internal.runtime.manager.SessionNotFoundException;
3231

@@ -36,7 +35,7 @@
3635
* and work item handlers might be interested in receiving notification when the runtime engine is disposed of,
3736
* in order deactivate themselves too and not receive any other events.
3837
*/
39-
public class RuntimeEngineImpl implements InternalRuntimeEngine, Disposable {
38+
public class RuntimeEngineImpl implements DisposableRuntimeEngine {
4039

4140
private RuntimeEngineInitlializer initializer;
4241
private Context<?> context;
@@ -143,6 +142,7 @@ public void setManager(RuntimeManager manager) {
143142
this.manager = manager;
144143
}
145144

145+
@Override
146146
public boolean isDisposed() {
147147
return disposed;
148148
}

jbpm-services/jbpm-services-ejb/jbpm-services-ejb-timer/src/main/java/org/jbpm/services/ejb/timer/EJBTimerScheduler.java

Lines changed: 67 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -32,21 +32,18 @@
3232

3333
import javax.annotation.PostConstruct;
3434
import javax.annotation.Resource;
35-
import javax.ejb.ConcurrencyManagement;
36-
import javax.ejb.ConcurrencyManagementType;
3735
import javax.ejb.Lock;
3836
import javax.ejb.LockType;
3937
import javax.ejb.NoSuchObjectLocalException;
38+
import javax.ejb.SessionContext;
4039
import javax.ejb.Singleton;
4140
import javax.ejb.Startup;
4241
import javax.ejb.Timeout;
4342
import javax.ejb.Timer;
4443
import javax.ejb.TimerConfig;
4544
import javax.ejb.TimerHandle;
46-
import javax.ejb.TransactionManagement;
47-
import javax.ejb.TransactionManagementType;
48-
import javax.transaction.RollbackException;
49-
import javax.transaction.UserTransaction;
45+
import javax.ejb.TransactionAttribute;
46+
import javax.ejb.TransactionAttributeType;
5047

5148
import org.drools.core.time.JobHandle;
5249
import org.drools.core.time.impl.TimerJobInstance;
@@ -58,8 +55,6 @@
5855

5956
@Singleton
6057
@Startup
61-
@ConcurrencyManagement(ConcurrencyManagementType.CONTAINER)
62-
@TransactionManagement(TransactionManagementType.BEAN)
6358
@Lock(LockType.READ)
6459
public class EJBTimerScheduler {
6560

@@ -70,17 +65,19 @@ public class EJBTimerScheduler {
7065
private static final Integer TIMER_RETRY_LIMIT = Integer.parseInt(System.getProperty("org.kie.jbpm.timer.retry.limit", "3"));
7166

7267
private static final Integer OVERDUE_WAIT_TIME = Integer.parseInt(System.getProperty("org.jbpm.overdue.timer.wait", "20000"));
68+
69+
private static final Integer OVERDUE_CHECK_TIME = Integer.parseInt(System.getProperty("org.jbpm.overdue.timer.check", "200"));
7370

7471
private boolean useLocalCache = Boolean.parseBoolean(System.getProperty("org.jbpm.ejb.timer.local.cache", "false"));
7572

7673
private ConcurrentMap<String, TimerJobInstance> localCache = new ConcurrentHashMap<String, TimerJobInstance>();
7774

7875
@Resource
7976
protected javax.ejb.TimerService timerService;
80-
77+
8178
@Resource
82-
protected UserTransaction utx;
83-
79+
protected SessionContext ctx;
80+
8481
public void setUseLocalCache(boolean useLocalCache) {
8582
this.useLocalCache = useLocalCache;
8683
}
@@ -97,112 +94,78 @@ public void executeTimerJob(Timer timer) {
9794
EjbTimerJob timerJob = (EjbTimerJob) timer.getInfo();
9895
TimerJobInstance timerJobInstance = timerJob.getTimerJobInstance();
9996
logger.debug("About to execute timer for job {}", timerJob);
100-
101-
String timerServiceId = ((EjbGlobalJobHandle) timerJobInstance.getJobHandle()).getDeploymentId();
102-
10397
// handle overdue timers as ejb timer service might start before all deployments are ready
10498
long time = 0;
105-
while (TimerServiceRegistry.getInstance().get(timerServiceId) == null) {
106-
logger.debug("waiting for timer service to be available, elapsed time {} ms", time);
107-
try {
108-
Thread.sleep(500);
109-
} catch (InterruptedException e) {
110-
e.printStackTrace();
111-
}
112-
time += 500;
113-
114-
if (time > OVERDUE_WAIT_TIME) {
115-
logger.debug("No timer service found after waiting {} ms", time);
116-
break;
99+
try {
100+
while (TimerServiceRegistry.getInstance().get(((EjbGlobalJobHandle) timerJobInstance.getJobHandle()).getDeploymentId()) == null) {
101+
logger.debug("waiting for timer service to be available, elapsed time {} ms", time);
102+
Thread.sleep(OVERDUE_CHECK_TIME);
103+
time += OVERDUE_CHECK_TIME;
104+
if (time > OVERDUE_WAIT_TIME) {
105+
logger.debug("No timer service found after waiting {} ms", time);
106+
break;
107+
}
117108
}
118-
}
109+
} catch (InterruptedException e) {
110+
logger.warn("Thread has been interrupted", e);
111+
Thread.currentThread().interrupt();
112+
}
119113
try {
120-
transaction(this::executeTimerJobInstance, timerJobInstance);
121-
} catch (SessionNotFoundException e) {
122-
logger.warn("Process instance is not found. More likely already completed. Timer {} won't be recovered", timerJobInstance, e);
123-
removeUnrecoverableTimer(timerJob, timer);
114+
invokeTransaction(this::executeTimerJobInstance, timerJobInstance);
124115
} catch (Exception e) {
125116
recoverTimerJobInstance(timerJob, timer, e);
126117
}
127118
}
128119

129120
private void executeTimerJobInstance(TimerJobInstance timerJobInstance) throws Exception {
130-
try {
131-
((Callable<?>) timerJobInstance).call();
132-
} catch (Exception e) {
133-
throw e;
134-
}
121+
((Callable<?>) timerJobInstance).call();
135122
}
136123

137-
private void removeUnrecoverableTimer(EjbTimerJob ejbTimerJob, Timer timer) {
138-
try {
139-
Transaction<TimerJobInstance> tx = timerJobInstance -> {
140-
if (!this.removeJob(timerJobInstance.getJobHandle(), timer)) {
141-
logger.warn("Session not found for timer {}. Timer could not removed.", ejbTimerJob.getTimerJobInstance());
142-
}
143-
};
144-
transaction(tx, ejbTimerJob.getTimerJobInstance());
145-
} catch (Exception e1) {
146-
logger.warn("There was a problem during timer removal {}", ejbTimerJob.getTimerJobInstance(), e1);
147-
}
148-
}
149-
150-
private void recoverTimerJobInstance(EjbTimerJob ejbTimerJob, Timer timer, Exception e) {
151-
if (isSessionNotFound(e)) {
152-
logger.warn("Trying to recover timer. Not possible due to process instance is not found. More likely already completed. Timer {} won't be recovered", ejbTimerJob.getTimerJobInstance(), e);
124+
private void recoverTimerJobInstance(EjbTimerJob ejbTimerJob, Timer timer, Exception cause) {
125+
Transaction<TimerJobInstance> tx;
126+
if (isSessionNotFound(cause)) {
153127
// if session is not found means the process has already finished. In this case we just need to remove
154128
// the timer and avoid any recovery as it should not trigger any more timers.
155-
removeUnrecoverableTimer(ejbTimerJob, timer);
156-
return;
129+
tx = timerJobInstance -> {
130+
logger.warn("Trying to recover timer. Not possible due to process instance is not found. More likely already completed. Timer {} won't be recovered", timerJobInstance, cause);
131+
if (!removeJob(timerJobInstance.getJobHandle(), timer)) {
132+
logger.warn("Session not found for timer {}. Timer could not removed.", timerJobInstance);
133+
}
134+
};
157135
}
158-
159-
if (ejbTimerJob.getTimerJobInstance().getTrigger().hasNextFireTime() != null) {
136+
else if (ejbTimerJob.getTimerJobInstance().getTrigger().hasNextFireTime() != null) {
160137
// this is an interval trigger. Problem here is that the timer scheduled by DefaultTimerJobInstance is lost
161138
// because of the transaction, so we need to do this here.
162-
try {
163-
164-
logger.warn("Execution of time failed Interval Trigger failed. Skipping {}", ejbTimerJob.getTimerJobInstance());
165-
Transaction<TimerJobInstance> tx = timerJobInstance -> {
166-
if (this.removeJob(timerJobInstance.getJobHandle(), null)) {
167-
this.internalSchedule(timerJobInstance);
168-
} else {
169-
logger.debug("Interval trigger {} was removed before rescheduling", ejbTimerJob.getTimerJobInstance());
170-
}
171-
};
172-
transaction(tx, ejbTimerJob.getTimerJobInstance());
173-
} catch (Exception e1) {
174-
logger.warn("Could not schedule the interval trigger {}", ejbTimerJob.getTimerJobInstance(), e1);
175-
}
176-
return;
139+
tx = timerJobInstance -> {
140+
logger.warn("Execution of time failed Interval Trigger failed. Skipping {}", timerJobInstance);
141+
if (removeJob(timerJobInstance.getJobHandle(), null)) {
142+
internalSchedule(timerJobInstance);
143+
} else {
144+
logger.debug("Interval trigger {} was removed before rescheduling", timerJobInstance);
145+
}
146+
};
147+
}
148+
else {
149+
// if there is not next date to be fired, we need to apply policy otherwise will be lost
150+
tx = timerJobInstance -> {
151+
logger.warn("Execution of time failed. The timer will be retried {}", timerJobInstance);
152+
ZonedDateTime nextRetry = ZonedDateTime.now().plus(TIMER_RETRY_INTERVAL, ChronoUnit.MILLIS);
153+
EjbTimerJobRetry info = ejbTimerJob instanceof EjbTimerJobRetry ? ((EjbTimerJobRetry) ejbTimerJob).next() : new EjbTimerJobRetry(timerJobInstance);
154+
if (TIMER_RETRY_LIMIT > 0 && info.getRetry() > TIMER_RETRY_LIMIT) {
155+
logger.warn("The timer {} reached retry limit {}. It won't be retried again", timerJobInstance, TIMER_RETRY_LIMIT);
156+
} else {
157+
TimerConfig config = new TimerConfig(info, true);
158+
Timer newTimer = timerService.createSingleActionTimer(Date.from(nextRetry.toInstant()), config);
159+
((GlobalJpaTimerJobInstance) timerJobInstance).setTimerInfo(newTimer.getHandle());
160+
((GlobalJpaTimerJobInstance) timerJobInstance).setExternalTimerId(getPlatformTimerId(newTimer));
161+
}
162+
};
177163
}
178-
179-
// if there is not next date to be fired, we need to apply policy otherwise will be lost
180-
181-
logger.warn("Execution of time failed. The timer will be retried {}", ejbTimerJob.getTimerJobInstance());
182-
Transaction<TimerJobInstance> operation = (instance) -> {
183-
ZonedDateTime nextRetry = ZonedDateTime.now().plus(TIMER_RETRY_INTERVAL, ChronoUnit.MILLIS);
184-
EjbTimerJobRetry info = null;
185-
if(ejbTimerJob instanceof EjbTimerJobRetry) {
186-
info = ((EjbTimerJobRetry) ejbTimerJob).next();
187-
} else {
188-
info = new EjbTimerJobRetry(instance);
189-
}
190-
if (TIMER_RETRY_LIMIT > 0 && info.getRetry() > TIMER_RETRY_LIMIT) {
191-
logger.warn("The timer {} reached retry limit {}. It won't be retried again", instance, TIMER_RETRY_LIMIT);
192-
return;
193-
}
194-
TimerConfig config = new TimerConfig(info, true);
195-
Timer newTimer = timerService.createSingleActionTimer(Date.from(nextRetry.toInstant()), config);
196-
TimerHandle handler = newTimer.getHandle();
197-
((GlobalJpaTimerJobInstance) ejbTimerJob.getTimerJobInstance()).setTimerInfo(handler);
198-
((GlobalJpaTimerJobInstance) ejbTimerJob.getTimerJobInstance()).setExternalTimerId(getPlatformTimerId(newTimer));
199-
};
200164
try {
201-
transaction(operation, ejbTimerJob.getTimerJobInstance());
202-
} catch (Exception e1) {
203-
logger.error("Failed to executed timer recovery {}", e1.getMessage(), e1);
165+
invokeTransaction (tx, ejbTimerJob.getTimerJobInstance());
166+
} catch (Exception e) {
167+
logger.error("Failed to executed timer recovery", e);
204168
}
205-
206169
}
207170

208171
private boolean isSessionNotFound(Exception e) {
@@ -218,30 +181,16 @@ private boolean isSessionNotFound(Exception e) {
218181

219182
@FunctionalInterface
220183
private interface Transaction<I> {
221-
222184
void doWork(I item) throws Exception;
223185
}
224186

225-
private <I> void transaction(Transaction<I> operation, I item) throws Exception {
226-
try {
227-
utx.begin();
228-
operation.doWork(item);
229-
utx.commit();
230-
} catch(RollbackException e) {
231-
logger.warn("Transaction was rolled back for {} with status {}", item, utx.getStatus());
232-
if(utx.getStatus() == javax.transaction.Status.STATUS_ACTIVE) {
233-
utx.rollback();
234-
}
235-
throw new RuntimeException("jbpm timer has been rolledback", e);
236-
} catch (Exception e) {
237-
try {
238-
utx.rollback();
239-
} catch (Exception re) {
240-
logger.error("transaction could not be rolled back", re);
241-
}
242-
243-
throw e;
244-
}
187+
@TransactionAttribute(value = TransactionAttributeType.REQUIRES_NEW)
188+
public <I> void transaction(Transaction<I> operation, I item) throws Exception {
189+
operation.doWork(item);
190+
}
191+
192+
private <I> void invokeTransaction (Transaction<I> operation, I item) throws Exception {
193+
ctx.getBusinessObject(EJBTimerScheduler.class).transaction(operation,item);
245194
}
246195

247196
public void internalSchedule(TimerJobInstance timerJobInstance) {

0 commit comments

Comments
 (0)