diff --git a/jberet-core/src/main/java/org/jberet/operations/JobOperatorImpl.java b/jberet-core/src/main/java/org/jberet/operations/DefaultJobOperatorImpl.java similarity index 87% rename from jberet-core/src/main/java/org/jberet/operations/JobOperatorImpl.java rename to jberet-core/src/main/java/org/jberet/operations/DefaultJobOperatorImpl.java index 2b26288b1..cc8dcfab1 100644 --- a/jberet-core/src/main/java/org/jberet/operations/JobOperatorImpl.java +++ b/jberet-core/src/main/java/org/jberet/operations/DefaultJobOperatorImpl.java @@ -21,7 +21,7 @@ import org.jberet.spi.BatchEnvironment; import org.wildfly.security.manager.WildFlySecurityManager; -public class JobOperatorImpl extends AbstractJobOperator implements JobOperator { +public class DefaultJobOperatorImpl extends AbstractJobOperator implements JobOperator { private static final PrivilegedAction loaderAction = new PrivilegedAction() { @Override @@ -37,11 +37,11 @@ public BatchEnvironment run() { final JobRepository repository; private final BatchEnvironment batchEnvironment; - public JobOperatorImpl() throws BatchRuntimeException { + public DefaultJobOperatorImpl() throws BatchRuntimeException { this(WildFlySecurityManager.isChecking() ? 
AccessController.doPrivileged(loaderAction) : loaderAction.run()); } - public JobOperatorImpl(final BatchEnvironment batchEnvironment) throws BatchRuntimeException { + public DefaultJobOperatorImpl(final BatchEnvironment batchEnvironment) throws BatchRuntimeException { if (batchEnvironment == null) { throw BatchMessages.MESSAGES.batchEnvironmentNotFound(); } diff --git a/jberet-core/src/main/java/org/jberet/operations/ForceStopJobOperatorImpl.java b/jberet-core/src/main/java/org/jberet/operations/ForceStopJobOperatorImpl.java new file mode 100644 index 000000000..572a76730 --- /dev/null +++ b/jberet-core/src/main/java/org/jberet/operations/ForceStopJobOperatorImpl.java @@ -0,0 +1,13 @@ +package org.jberet.operations; + +import jakarta.batch.runtime.BatchStatus; +import org.jberet.runtime.JobExecutionImpl; + +public class ForceStopJobOperatorImpl extends DefaultJobOperatorImpl { + public void forceStop(final long executionId) { + final JobExecutionImpl jobExecution = getJobExecutionImpl(executionId); + jobExecution.setBatchStatus(BatchStatus.STOPPED); + jobExecution.setLastUpdatedTime(System.currentTimeMillis()); + getJobRepository().updateJobExecution(jobExecution, false, false); + } +} diff --git a/jberet-core/src/main/java/org/jberet/repository/InMemoryRepository.java b/jberet-core/src/main/java/org/jberet/repository/InMemoryRepository.java index f720c7f44..f3cc82f6a 100644 --- a/jberet-core/src/main/java/org/jberet/repository/InMemoryRepository.java +++ b/jberet-core/src/main/java/org/jberet/repository/InMemoryRepository.java @@ -217,6 +217,12 @@ public List getJobExecutions(final JobInstance jobInstance) { } } + // todo + @Override + public List getTimeoutJobExecutions(Long timeoutSeconds) { + return List.of(); + } + @Override public List getStepExecutions(final long jobExecutionId, final ClassLoader classLoader) { final JobExecutionImpl jobExecution = getJobExecution(jobExecutionId); diff --git 
a/jberet-core/src/main/java/org/jberet/repository/JdbcRepository.java b/jberet-core/src/main/java/org/jberet/repository/JdbcRepository.java index 0c1644597..efdecd175 100644 --- a/jberet-core/src/main/java/org/jberet/repository/JdbcRepository.java +++ b/jberet-core/src/main/java/org/jberet/repository/JdbcRepository.java @@ -21,6 +21,7 @@ import java.sql.SQLException; import java.sql.Statement; import java.sql.Timestamp; +import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.Date; @@ -76,6 +77,7 @@ public final class JdbcRepository extends AbstractPersistentRepository { private static final String SELECT_ALL_JOB_EXECUTIONS = "select-all-job-executions"; private static final String SELECT_JOB_EXECUTIONS_BY_JOB_INSTANCE_ID = "select-job-executions-by-job-instance-id"; + private static final String SELECT_JOB_EXECUTIONS_BY_TIMEOUT_SECONDS = "select-job-executions-by-timeout-seconds"; private static final String SELECT_RUNNING_JOB_EXECUTIONS_BY_JOB_NAME = "select-running-job-executions-by-job-name"; private static final String SELECT_JOB_EXECUTIONS_BY_JOB_NAME = "select-job-executions-by-job-name"; private static final String SELECT_JOB_EXECUTION = "select-job-execution"; @@ -206,7 +208,7 @@ private void createTables(final Properties configProperties) { final String tablePrefix = configProperties.getProperty(DB_TABLE_PREFIX_KEY, "").trim(); final String tableSuffix = configProperties.getProperty(DB_TABLE_SUFFIX_KEY, "").trim(); final Pattern tableNamesPattern = tablePrefix.length() > 0 || tableSuffix.length() > 0 ? 
- Pattern.compile("JOB_INSTANCE|JOB_EXECUTION|STEP_EXECUTION|PARTITION_EXECUTION"): null; + Pattern.compile("JOB_INSTANCE|JOB_EXECUTION|STEP_EXECUTION|PARTITION_EXECUTION") : null; final InputStream sqlResource = getClassLoader(false).getResourceAsStream(sqlFile); try { @@ -289,7 +291,7 @@ private void createTables(final Properties configProperties) { countJobInstancesStatement.setString(1, "A"); rs = countJobInstancesStatement.executeQuery(); BatchLogger.LOGGER.tracef( - "This invocation needed to create tables since they didn't exit, but failed to create because they've been created by another concurrent invocation, so ignore the exception and return normally: %s", e1); + "This invocation needed to create tables since they didn't exit, but failed to create because they've been created by another concurrent invocation, so ignore the exception and return normally: %s", e1); } catch (final SQLException sqle) { //still cannot access the table, so fail it throw BatchMessages.MESSAGES.failToCreateTables(e1, databaseProductName, ddlFile); @@ -564,15 +566,15 @@ public JobExecutionImpl getJobExecution(final long jobExecutionId) { if (result.getEndTime() == null && rs.getTimestamp(TableColumns.ENDTIME) != null) { final Properties jobParameters1 = BatchUtil.stringToProperties(rs.getString(TableColumns.JOBPARAMETERS)); result = new JobExecutionImpl(getJobInstance(jobInstanceId), - jobExecutionId, - BatchUtil.stringToProperties(rs.getString(TableColumns.JOBPARAMETERS)), - rs.getTimestamp(TableColumns.CREATETIME), - rs.getTimestamp(TableColumns.STARTTIME), - rs.getTimestamp(TableColumns.ENDTIME), - rs.getTimestamp(TableColumns.LASTUPDATEDTIME), - rs.getString(TableColumns.BATCHSTATUS), - rs.getString(TableColumns.EXITSTATUS), - rs.getString(TableColumns.RESTARTPOSITION)); + jobExecutionId, + BatchUtil.stringToProperties(rs.getString(TableColumns.JOBPARAMETERS)), + rs.getTimestamp(TableColumns.CREATETIME), + rs.getTimestamp(TableColumns.STARTTIME), + 
rs.getTimestamp(TableColumns.ENDTIME), + rs.getTimestamp(TableColumns.LASTUPDATEDTIME), + rs.getString(TableColumns.BATCHSTATUS), + rs.getString(TableColumns.EXITSTATUS), + rs.getString(TableColumns.RESTARTPOSITION)); jobExecutions.replace(jobExecutionId, new SoftReference(result, jobExecutionReferenceQueue, jobExecutionId)); } @@ -631,10 +633,10 @@ public List getJobExecutions(final JobInstance jobInstance) { final Properties jobParameters1 = BatchUtil.stringToProperties(rs.getString(TableColumns.JOBPARAMETERS)); jobExecution1 = new JobExecutionImpl(getJobInstance(jobInstanceId), executionId, jobParameters1, - rs.getTimestamp(TableColumns.CREATETIME), rs.getTimestamp(TableColumns.STARTTIME), - rs.getTimestamp(TableColumns.ENDTIME), rs.getTimestamp(TableColumns.LASTUPDATEDTIME), - rs.getString(TableColumns.BATCHSTATUS), rs.getString(TableColumns.EXITSTATUS), - rs.getString(TableColumns.RESTARTPOSITION)); + rs.getTimestamp(TableColumns.CREATETIME), rs.getTimestamp(TableColumns.STARTTIME), + rs.getTimestamp(TableColumns.ENDTIME), rs.getTimestamp(TableColumns.LASTUPDATEDTIME), + rs.getString(TableColumns.BATCHSTATUS), rs.getString(TableColumns.EXITSTATUS), + rs.getString(TableColumns.RESTARTPOSITION)); jobExecutions.replace(executionId, new SoftReference(jobExecution1, jobExecutionReferenceQueue, executionId)); } @@ -650,12 +652,67 @@ public List getJobExecutions(final JobInstance jobInstance) { return result; } + @Override + public List getTimeoutJobExecutions(Long timeoutSeconds) { + final String query = sqls.getProperty(SELECT_JOB_EXECUTIONS_BY_TIMEOUT_SECONDS); + long jobInstanceId = 0; + final List result = new ArrayList(); + final Connection connection = getConnection(); + ResultSet rs = null; + PreparedStatement preparedStatement = null; + try { + preparedStatement = connection.prepareStatement(query); + + Timestamp timeoutTime = Timestamp.from(Instant.now().minusSeconds(timeoutSeconds)); + + preparedStatement.setTimestamp(1, timeoutTime); + rs =
preparedStatement.executeQuery(); + while (rs.next()) { + final long executionId = rs.getLong(TableColumns.JOBEXECUTIONID); + final SoftReference ref = jobExecutions.get(executionId); + JobExecutionImpl jobExecution1 = (ref != null) ? ref.get() : null; + if (jobExecution1 == null) { + jobInstanceId = rs.getLong(TableColumns.JOBINSTANCEID); + final Properties jobParameters1 = BatchUtil.stringToProperties(rs.getString(TableColumns.JOBPARAMETERS)); + jobExecution1 = + new JobExecutionImpl(getJobInstance(jobInstanceId), executionId, jobParameters1, + rs.getTimestamp(TableColumns.CREATETIME), rs.getTimestamp(TableColumns.STARTTIME), + rs.getTimestamp(TableColumns.ENDTIME), rs.getTimestamp(TableColumns.LASTUPDATEDTIME), + rs.getString(TableColumns.BATCHSTATUS), rs.getString(TableColumns.EXITSTATUS), + rs.getString(TableColumns.RESTARTPOSITION)); + + jobExecutions.put(executionId, + new SoftReference(jobExecution1, jobExecutionReferenceQueue, executionId)); + } else { + if (jobExecution1.getEndTime() == null && rs.getTimestamp(TableColumns.ENDTIME) != null) { + final Properties jobParameters1 = BatchUtil.stringToProperties(rs.getString(TableColumns.JOBPARAMETERS)); + jobExecution1 = + new JobExecutionImpl(getJobInstance(jobInstanceId), executionId, jobParameters1, + rs.getTimestamp(TableColumns.CREATETIME), rs.getTimestamp(TableColumns.STARTTIME), + rs.getTimestamp(TableColumns.ENDTIME), rs.getTimestamp(TableColumns.LASTUPDATEDTIME), + rs.getString(TableColumns.BATCHSTATUS), rs.getString(TableColumns.EXITSTATUS), + rs.getString(TableColumns.RESTARTPOSITION)); + jobExecutions.replace(executionId, + new SoftReference(jobExecution1, jobExecutionReferenceQueue, executionId)); + } + } + // jobExecution1 is either got from the cache, or created, now add it to the result list + result.add(jobExecution1); + } + } catch (final Exception e) { + throw BatchMessages.MESSAGES.failToRunQuery(e, query); + } finally { + close(connection, preparedStatement, null, rs); + } + return result; 
+ } + private boolean isExecutionStale(final JobExecutionImpl jobExecution) { final BatchStatus jobStatus = jobExecution.getBatchStatus(); if (jobStatus.equals(BatchStatus.COMPLETED) || - jobStatus.equals(BatchStatus.FAILED) || - jobStatus.equals(BatchStatus.STOPPED) || - jobStatus.equals(BatchStatus.ABANDONED) || jobExecution.getStepExecutions().size() >= 1) { + jobStatus.equals(BatchStatus.FAILED) || + jobStatus.equals(BatchStatus.STOPPED) || + jobStatus.equals(BatchStatus.ABANDONED) || jobExecution.getStepExecutions().size() >= 1) { return false; } @@ -878,8 +935,9 @@ public List getPartitionExecutions(final long stepExecut /** * Updates the partition execution in job repository, using the {@code updateSql} passed in. + * * @param partitionExecution the partition execution to update to job repository - * @param updateSql the update sql to use + * @param updateSql the update sql to use * @return the number of rows affected by this update sql execution */ private int updatePartitionExecution(final PartitionExecutionImpl partitionExecution, final String updateSql) { @@ -906,8 +964,9 @@ private int updatePartitionExecution(final PartitionExecutionImpl partitionExecu /** * Updates the step execution in job repository, using the {@code updateSql} passed in. 
+ * * @param stepExecution the step execution to update to job repository - * @param updateSql the update sql to use + * @param updateSql the update sql to use * @return the number of rows affected by this update sql execution */ private int updateStepExecution0(final StepExecution stepExecution, final String updateSql) { @@ -1058,7 +1117,7 @@ public void executeStatements(final String statements, final String statementsRe } private List getJobExecutions0(final String selectSql, final String jobName, final boolean runningExecutionsOnly, - final Integer limit) { + final Integer limit) { final List result = new ArrayList<>(); Connection connection = null; ResultSet rs = null; diff --git a/jberet-core/src/main/java/org/jberet/repository/JobRepository.java b/jberet-core/src/main/java/org/jberet/repository/JobRepository.java index db104f20b..efc33cf97 100644 --- a/jberet-core/src/main/java/org/jberet/repository/JobRepository.java +++ b/jberet-core/src/main/java/org/jberet/repository/JobRepository.java @@ -18,6 +18,7 @@ import jakarta.batch.runtime.StepExecution; import org.jberet.job.model.Job; +import org.jberet.operations.DefaultJobOperatorImpl; import org.jberet.runtime.AbstractStepExecution; import org.jberet.runtime.JobExecutionImpl; import org.jberet.runtime.JobInstanceImpl; @@ -58,6 +59,8 @@ public interface JobRepository { JobExecution getJobExecution(long jobExecutionId); List getJobExecutions(JobInstance jobInstance); + List getTimeoutJobExecutions(Long timeoutSeconds); + /** * Gets job execution ids belonging to the job identified by the {@code jobName}. * @param jobName the job name identifying the job @@ -66,6 +69,8 @@ public interface JobRepository { */ List getJobExecutionsByJob(String jobName); + + /** * Gets job execution ids belonging to the job identified by the {@code jobName}. 
* @@ -95,7 +100,7 @@ public interface JobRepository { * @return a list of job execution ids * * @since 1.1.0.Final - * @see org.jberet.operations.JobOperatorImpl#getRunningExecutions(java.lang.String) + * @see DefaultJobOperatorImpl#getRunningExecutions(java.lang.String) */ List getRunningExecutions(final String jobName); diff --git a/jberet-core/src/main/java/org/jberet/runtime/runner/ChunkRunner.java b/jberet-core/src/main/java/org/jberet/runtime/runner/ChunkRunner.java index 948babd18..c89a325f0 100644 --- a/jberet-core/src/main/java/org/jberet/runtime/runner/ChunkRunner.java +++ b/jberet-core/src/main/java/org/jberet/runtime/runner/ChunkRunner.java @@ -928,7 +928,6 @@ private enum ChunkState { TO_END_RETRY, //need to end retrying the current chunk TO_START_NEW, //the current chunk is done and need to start a new chunk next DEPLETED, //no more input items, the processing can still go to next iteration so this last item can be retried - JOB_STOPPING, //the job has been requested to stop JOB_STOPPED } diff --git a/jberet-core/src/main/java/org/jberet/spi/DefaultJobOperatorContextSelector.java b/jberet-core/src/main/java/org/jberet/spi/DefaultJobOperatorContextSelector.java index 0349f6c39..91feecc9c 100644 --- a/jberet-core/src/main/java/org/jberet/spi/DefaultJobOperatorContextSelector.java +++ b/jberet-core/src/main/java/org/jberet/spi/DefaultJobOperatorContextSelector.java @@ -10,7 +10,7 @@ package org.jberet.spi; -import org.jberet.operations.JobOperatorImpl; +import org.jberet.operations.DefaultJobOperatorImpl; /** * A default context selector. 
@@ -29,7 +29,7 @@ public class DefaultJobOperatorContextSelector implements JobOperatorContextSele * Creates a new default context selector */ public DefaultJobOperatorContextSelector() { - jobOperatorContext = JobOperatorContext.create(new JobOperatorImpl()); + jobOperatorContext = JobOperatorContext.create(new DefaultJobOperatorImpl()); } @Override diff --git a/jberet-core/src/main/java/org/jberet/spi/ForceStopJobOperatorContextSelector.java b/jberet-core/src/main/java/org/jberet/spi/ForceStopJobOperatorContextSelector.java new file mode 100644 index 000000000..e89b10128 --- /dev/null +++ b/jberet-core/src/main/java/org/jberet/spi/ForceStopJobOperatorContextSelector.java @@ -0,0 +1,19 @@ +package org.jberet.spi; + +import org.jberet.operations.ForceStopJobOperatorImpl; + +public class ForceStopJobOperatorContextSelector implements JobOperatorContextSelector { + private final JobOperatorContext jobOperatorContext; + + /** + * Creates a new default context selector + */ + public ForceStopJobOperatorContextSelector() { + jobOperatorContext = JobOperatorContext.create(new ForceStopJobOperatorImpl()); + } + + @Override + public JobOperatorContext getJobOperatorContext() { + return jobOperatorContext; + } +} diff --git a/jberet-core/src/main/java/org/jberet/spi/JobOperatorContext.java b/jberet-core/src/main/java/org/jberet/spi/JobOperatorContext.java index f74d81358..e6528172f 100644 --- a/jberet-core/src/main/java/org/jberet/spi/JobOperatorContext.java +++ b/jberet-core/src/main/java/org/jberet/spi/JobOperatorContext.java @@ -12,7 +12,7 @@ import jakarta.batch.operations.JobOperator; -import org.jberet.operations.JobOperatorImpl; +import org.jberet.operations.DefaultJobOperatorImpl; import org.jberet.util.Assertions; /** @@ -58,7 +58,7 @@ public static JobOperatorContext getJobOperatorContext() { * @return the new context */ public static JobOperatorContext create(final BatchEnvironment batchEnvironment) { - final JobOperator jobOperator = new 
JobOperatorImpl(Assertions.notNull(batchEnvironment, "batchEnvironment")); + final JobOperator jobOperator = new DefaultJobOperatorImpl(Assertions.notNull(batchEnvironment, "batchEnvironment")); return new JobOperatorContext() { @Override public JobOperator getJobOperator() { diff --git a/jberet-core/src/main/resources/sql/jberet-sql.properties b/jberet-core/src/main/resources/sql/jberet-sql.properties index 316e0f715..7b74f098e 100644 --- a/jberet-core/src/main/resources/sql/jberet-sql.properties +++ b/jberet-core/src/main/resources/sql/jberet-sql.properties @@ -6,6 +6,9 @@ insert-job-instance = INSERT INTO JOB_INSTANCE(JOBNAME, APPLICATIONNAME) VALUES( select-all-job-executions = SELECT * FROM JOB_EXECUTION select-job-executions-by-job-instance-id = SELECT * FROM JOB_EXECUTION WHERE JOBINSTANCEID=? ORDER BY JOBEXECUTIONID + +select-job-executions-by-timeout-seconds = SELECT * FROM JOB_EXECUTION WHERE lastupdatedtime < ? AND batchstatus in ('STOPPING', 'STARTED', 'STARTING') + select-job-execution = SELECT * FROM JOB_EXECUTION WHERE JOBEXECUTIONID=? 
select-running-job-executions-by-job-name = SELECT JOB_EXECUTION.JOBEXECUTIONID FROM JOB_EXECUTION \ INNER JOIN JOB_INSTANCE ON JOB_EXECUTION.JOBINSTANCEID=JOB_INSTANCE.JOBINSTANCEID \ diff --git a/jberet-core/src/test/java/org/jberet/test/JobRepositoryTest.java b/jberet-core/src/test/java/org/jberet/test/JobRepositoryTest.java index 7d0b6ee9f..f91d48426 100644 --- a/jberet-core/src/test/java/org/jberet/test/JobRepositoryTest.java +++ b/jberet-core/src/test/java/org/jberet/test/JobRepositoryTest.java @@ -15,6 +15,7 @@ import java.util.Properties; import javax.transaction.xa.XAResource; +import jakarta.batch.runtime.JobExecution; import jakarta.transaction.HeuristicMixedException; import jakarta.transaction.HeuristicRollbackException; import jakarta.transaction.InvalidTransactionException; diff --git a/jberet-job-repositories/infinispan-repository/src/main/java/org/jberet/repository/InfinispanRepository.java b/jberet-job-repositories/infinispan-repository/src/main/java/org/jberet/repository/InfinispanRepository.java index 616bd61bd..62ee4be61 100644 --- a/jberet-job-repositories/infinispan-repository/src/main/java/org/jberet/repository/InfinispanRepository.java +++ b/jberet-job-repositories/infinispan-repository/src/main/java/org/jberet/repository/InfinispanRepository.java @@ -183,6 +183,12 @@ public List getJobExecutions(final JobInstance jobInstance) { return result; } + // todo + @Override + public List getTimeoutJobExecutions(Long timeoutSeconds) { + return List.of(); + } + @Override public List getStepExecutions(final long jobExecutionId, final ClassLoader classLoader) { final JobExecutionImpl jobExecution = jobExecutionCache.get(jobExecutionId); diff --git a/jberet-job-repositories/mongo-repository/src/main/java/org/jberet/repository/MongoRepository.java b/jberet-job-repositories/mongo-repository/src/main/java/org/jberet/repository/MongoRepository.java index e308a7c42..87cfdb91a 100644 --- 
a/jberet-job-repositories/mongo-repository/src/main/java/org/jberet/repository/MongoRepository.java +++ b/jberet-job-repositories/mongo-repository/src/main/java/org/jberet/repository/MongoRepository.java @@ -16,11 +16,13 @@ import java.util.Date; import java.util.List; import java.util.Properties; + import jakarta.batch.runtime.BatchStatus; import jakarta.batch.runtime.JobExecution; import jakarta.batch.runtime.JobInstance; import jakarta.batch.runtime.Metric; import jakarta.batch.runtime.StepExecution; + import javax.naming.InitialContext; import javax.naming.NamingException; @@ -241,7 +243,7 @@ public List getJobExecutions(final JobInstance jobInstance) { long jobInstanceId = jobInstance == null ? 0 : jobInstance.getInstanceId(); FindIterable findIterable = jobInstance == null ? - db.getCollection(TableColumns.JOB_EXECUTION, DBObject.class).find(): + db.getCollection(TableColumns.JOB_EXECUTION, DBObject.class).find() : db.getCollection(TableColumns.JOB_EXECUTION, DBObject.class).find( new BasicDBObject(TableColumns.JOBINSTANCEID, jobInstance.getInstanceId())); @@ -275,6 +277,12 @@ public List getJobExecutions(final JobInstance jobInstance) { return result; } + // todo + @Override + public List getTimeoutJobExecutions(Long timeoutSeconds) { + return List.of(); + } + @Override public List getRunningExecutions(final String jobName) { final List result = new ArrayList(); @@ -471,7 +479,7 @@ public StepExecutionImpl findOriginalStepExecutionForRestart(final String stepNa final BasicDBObject keys = new BasicDBObject(TableColumns.JOBEXECUTIONID, 1); keys.put(TableColumns._id, 0); final MongoCursor cursor = db.getCollection(TableColumns.JOB_EXECUTION, DBObject.class).find( - new BasicDBObject(TableColumns.JOBINSTANCEID, jobExecutionToRestart.getJobInstance().getInstanceId())) + new BasicDBObject(TableColumns.JOBINSTANCEID, jobExecutionToRestart.getJobInstance().getInstanceId())) .projection(keys).iterator(); final BasicDBList basicDBList = new BasicDBList(); while 
(cursor.hasNext()) { diff --git a/jberet-se/src/test/java/org/jberet/se/test/ForceStopBatchletTest.java b/jberet-se/src/test/java/org/jberet/se/test/ForceStopBatchletTest.java new file mode 100644 index 000000000..d4c9a5b87 --- /dev/null +++ b/jberet-se/src/test/java/org/jberet/se/test/ForceStopBatchletTest.java @@ -0,0 +1,40 @@ +package org.jberet.se.test; + +import jakarta.batch.runtime.BatchStatus; +import org.jberet.operations.ForceStopJobOperatorImpl; +import org.jberet.runtime.JobExecutionImpl; +import org.jberet.spi.ForceStopJobOperatorContextSelector; +import org.jberet.spi.JobOperatorContext; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Properties; + +public class ForceStopBatchletTest { + private static final String jobName = "org.jberet.se.test.sleepBatchlet"; + + @Test + public void sleepForceStop() throws Exception { + + JobOperatorContext.setJobOperatorContextSelector(new ForceStopJobOperatorContextSelector()); + ForceStopJobOperatorImpl operator = (ForceStopJobOperatorImpl) JobOperatorContext.getJobOperatorContext().getJobOperator(); + + final int sleepMinutes = 6; + final Properties params = new Properties(); + params.setProperty("sleep.minutes", String.valueOf(sleepMinutes)); + final long jobExecutionId = operator.start(jobName, params); + final JobExecutionImpl jobExecution = (JobExecutionImpl) operator.getJobExecution(jobExecutionId); + + operator.forceStop(jobExecutionId); + // don't await. 
+// jobExecution.awaitTermination(1, TimeUnit.MINUTES); + + System.out.printf("jobExecution id=%s, batchStatus=%s, exitStatus=%s, jobParameters=%s, restartPosition=%s, " + + "createTime=%s, startTime=%s, lastUpdateTime=%s, endTime=%s%n", + jobExecutionId, jobExecution.getBatchStatus(), jobExecution.getExitStatus(), jobExecution.getJobParameters(), + jobExecution.getRestartPosition(), jobExecution.getCreateTime(), jobExecution.getStartTime(), + jobExecution.getLastUpdatedTime(), jobExecution.getEndTime()); + Assertions.assertEquals(BatchStatus.STOPPED, jobExecution.getBatchStatus()); + Assertions.assertEquals(BatchStatus.STOPPED.name(), jobExecution.getExitStatus()); + } +} diff --git a/test-apps/common/src/main/java/org/jberet/testapps/common/AbstractIT.java b/test-apps/common/src/main/java/org/jberet/testapps/common/AbstractIT.java index 48f1cd435..8762b3908 100644 --- a/test-apps/common/src/main/java/org/jberet/testapps/common/AbstractIT.java +++ b/test-apps/common/src/main/java/org/jberet/testapps/common/AbstractIT.java @@ -23,7 +23,7 @@ import jakarta.batch.runtime.StepExecution; import org.jberet.job.model.Job; -import org.jberet.operations.JobOperatorImpl; +import org.jberet.operations.DefaultJobOperatorImpl; import org.jberet.runtime.JobExecutionImpl; import org.jberet.runtime.StepExecutionImpl; import org.jberet.spi.JobOperatorContext; @@ -52,7 +52,7 @@ abstract public class AbstractIT { //delay bootstrapping JobOperator, since some tests may need to adjust jberet configuration, such as //infinispanRepository tests. - protected JobOperatorImpl jobOperator; + protected DefaultJobOperatorImpl jobOperator; /** * Initializes and bootstraps {@code JobOperator}. @@ -81,7 +81,7 @@ public void before() throws Exception { // Casting to JobOperatorImpl works in Java SE environment, but does not work in WildFly Jakarta EE // environment, which has a different JobOperator implementation class.
// - jobOperator = (JobOperatorImpl) JobOperatorContext.getJobOperatorContext().getJobOperator(); + jobOperator = (DefaultJobOperatorImpl) JobOperatorContext.getJobOperatorContext().getJobOperator(); } } diff --git a/test-apps/purgeJdbcRepository/pom.xml b/test-apps/purgeJdbcRepository/pom.xml index de5ad4eb7..051174ac9 100644 --- a/test-apps/purgeJdbcRepository/pom.xml +++ b/test-apps/purgeJdbcRepository/pom.xml @@ -49,5 +49,11 @@ chunkPartition ${project.version} + + + org.postgresql + postgresql + 42.7.4 + \ No newline at end of file diff --git a/test-apps/purgeJdbcRepository/src/main/resources/jberet.properties b/test-apps/purgeJdbcRepository/src/main/resources/jberet.properties index 986518ec3..06a34283a 100644 --- a/test-apps/purgeJdbcRepository/src/main/resources/jberet.properties +++ b/test-apps/purgeJdbcRepository/src/main/resources/jberet.properties @@ -1,3 +1,6 @@ # Optional, valid values are jdbc (default), mongodb, infinispan and in-memory job-repository-type = jdbc -db-url = jdbc:h2:./target/jberet-repo +#db-url = jdbc:h2:./target/jberet-repo +db-url = jdbc:postgresql://localhost:5432/batch_db +db-user = batch_user +db-password = 123 \ No newline at end of file diff --git a/test-apps/purgeJdbcRepository/src/test/java/org/jberet/testapps/purgeJdbcRepository/PurgeJdbcRepositoryIT.java b/test-apps/purgeJdbcRepository/src/test/java/org/jberet/testapps/purgeJdbcRepository/PurgeJdbcRepositoryIT.java index ae554a7ce..fccfe9e96 100644 --- a/test-apps/purgeJdbcRepository/src/test/java/org/jberet/testapps/purgeJdbcRepository/PurgeJdbcRepositoryIT.java +++ b/test-apps/purgeJdbcRepository/src/test/java/org/jberet/testapps/purgeJdbcRepository/PurgeJdbcRepositoryIT.java @@ -21,11 +21,13 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; + import jakarta.batch.operations.JobRestartException; import jakarta.batch.operations.NoSuchJobException; import 
jakarta.batch.operations.NoSuchJobExecutionException; import jakarta.batch.runtime.BatchStatus; +import jakarta.batch.runtime.JobExecution; import org.jberet.repository.JdbcRepository; import org.jberet.se.BatchSEEnvironment; import org.jberet.testapps.purgeInMemoryRepository.PurgeRepositoryTestBase; @@ -38,7 +40,7 @@ public class PurgeJdbcRepositoryIT extends PurgeRepositoryTestBase { static final String purgeJdbcRepositoryJobName = "purgeJdbcRepository"; - ///////////////////////////////////////////////////// + /// ////////////////////////////////////////////////// @Test @Disabled("run it manually, Ctrl-C before it completes") public void ctrlC_1() throws Exception { @@ -57,7 +59,7 @@ public void restartKilledStrict() throws Exception { assertThrows(JobRestartException.class, super::restartKilledStrict); } - ///////////////////////////////////////////////////// + /// ////////////////////////////////////////////////// @Test @Disabled("run it manually, Ctrl-C before it completes") public void ctrlC_2() throws Exception { @@ -70,7 +72,7 @@ public void restartKilled() throws Exception { super.restartKilled(); } - ///////////////////////////////////////////////////// + /// ////////////////////////////////////////////////// @Test @Disabled("run it manually, Ctrl-C before it completes") public void ctrlC_3() throws Exception { @@ -83,7 +85,7 @@ public void restartKilledDetect() throws Exception { super.restartKilledDetect(); } - ///////////////////////////////////////////////////// + /// ////////////////////////////////////////////////// @Test @Disabled("run it manually, Ctrl-C before it completes") public void ctrlC_4() throws Exception { @@ -96,7 +98,7 @@ public void restartKilledForce() throws Exception { super.restartKilledForce(); } - ///////////////////////////////////////////////////// + /// ////////////////////////////////////////////////// @Test @Disabled("run it manually, Ctrl-C before it completes") public void ctrlC_5() throws Exception { @@ -109,14 +111,14 
@@ public void restartKilledStopAbandon() throws Exception { super.restartKilledStopAbandon(); } - ///////////////////////////////////////////////////// + /// ////////////////////////////////////////////////// @Test @Disabled("run it manually") public void memoryTest() throws Exception { super.memoryTest(); } - ///////////////////////////////////////////////////// + /// ////////////////////////////////////////////////// @Test public void restartNoSuchJobExecutionException() { assertThrows(NoSuchJobExecutionException.class, () -> jobOperator.restart(-1, null)); @@ -153,6 +155,17 @@ public void getRunningExecutions() throws Exception { super.getRunningExecutions(); } + @Disabled + @Test + public void getTimeoutExecutions() throws Exception { + // todo: create a timeout job; + purgeJobExecutions(); + super.getRunningExecutions(); + List executions = jobOperator.getJobRepository().getTimeoutJobExecutions(Long.valueOf(0)); + System.out.println(executions.size()); + } + + @Test public void getRunningExecutions2() throws Exception { purgeJobExecutions(); @@ -306,7 +319,7 @@ public void withSqlDeleteJobExecutionsCascade() throws Exception { params.setProperty("sql", "delete from JOB_EXECUTION where JOBINSTANCEID in " + - "(select DISTINCT JOBINSTANCEID from JOB_INSTANCE where JOBNAME like 'prepurge%'); "); + "(select DISTINCT JOBINSTANCEID from JOB_INSTANCE where JOBNAME like 'prepurge%'); "); params.setProperty("jobExecutionsByJobNames", prepurgeAndPrepurge2JobNames); @@ -429,7 +442,7 @@ private void purgeJobExecutions() throws Exception { params.setProperty("purgeJobsByNames", prepurgeAndPrepurge2JobNames); startAndVerifyPurgeJob(purgeJdbcRepositoryJobName); final JdbcRepository jdbcRepository = (JdbcRepository) jobOperator.getJobRepository(); - jdbcRepository.executeStatements( "delete from PARTITION_EXECUTION; delete from STEP_EXECUTION; delete from JOB_EXECUTION; delete from JOB_INSTANCE", null); + jdbcRepository.executeStatements("delete from PARTITION_EXECUTION; 
delete from STEP_EXECUTION; delete from JOB_EXECUTION; delete from JOB_INSTANCE", null); } }