Skip to content

Package: JbatchDriver

JbatchDriver

nameinstructionbranchcomplexitylinemethod
cleanJobData(KapuaId, KapuaId)
M: 19 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 6 C: 0
0%
M: 1 C: 0
0%
getJbatchJobExecutions(JobInstance)
M: 29 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 4 C: 0
0%
M: 1 C: 0
0%
getJbatchJobName(KapuaId, KapuaId)
M: 15 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
getJobExecutions(KapuaId, KapuaId)
M: 93 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 19 C: 0
0%
M: 1 C: 0
0%
getRunningJobExecutions(KapuaId, KapuaId)
M: 10 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
isRunningJob(KapuaId, KapuaId)
M: 9 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
lambda$getRunningJobExecutions$4(JobExecution)
M: 5 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
lambda$resumeJob$2(long, JobExecution)
M: 9 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
lambda$resumeJob$3(KapuaId, JobExecution)
M: 17 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 4 C: 0
0%
M: 1 C: 0
0%
lambda$stopJob$0(long, JobExecution)
M: 9 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
lambda$stopJob$1(JobExecution)
M: 5 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
resumeJob(KapuaId, KapuaId, KapuaId)
M: 58 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 14 C: 0
0%
M: 1 C: 0
0%
startJob(KapuaId, KapuaId, JobStartOptions)
M: 250 C: 0
0%
M: 15 C: 0
0%
M: 9 C: 0
0%
M: 51 C: 0
0%
M: 1 C: 0
0%
static {...}
M: 28 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 7 C: 0
0%
M: 1 C: 0
0%
stopJob(KapuaId, KapuaId, KapuaId)
M: 59 C: 0
0%
M: 4 C: 0
0%
M: 3 C: 0
0%
M: 15 C: 0
0%
M: 1 C: 0
0%

Coverage

1: /*******************************************************************************
2: * Copyright (c) 2018, 2022 Eurotech and/or its affiliates and others
3: *
4: * This program and the accompanying materials are made
5: * available under the terms of the Eclipse Public License 2.0
6: * which is available at https://www.eclipse.org/legal/epl-2.0/
7: *
8: * SPDX-License-Identifier: EPL-2.0
9: *
10: * Contributors:
11: * Eurotech - initial API and implementation
12: *******************************************************************************/
13: package org.eclipse.kapua.job.engine.jbatch.driver;
14:
15: import com.ibm.jbatch.container.jsl.ExecutionElement;
16: import com.ibm.jbatch.container.jsl.ModelSerializerFactory;
17: import com.ibm.jbatch.container.servicesmanager.ServicesManagerImpl;
18: import com.ibm.jbatch.jsl.model.JSLJob;
19: import com.ibm.jbatch.jsl.model.Step;
20: import org.apache.commons.io.IOUtils;
21: import org.apache.commons.lang3.SystemUtils;
22: import org.eclipse.kapua.KapuaException;
23: import org.eclipse.kapua.KapuaIllegalArgumentException;
24: import org.eclipse.kapua.job.engine.JobStartOptions;
25: import org.eclipse.kapua.job.engine.commons.wrappers.JobContextPropertyNames;
26: import org.eclipse.kapua.job.engine.jbatch.driver.exception.CannotBuildJobDefDriverException;
27: import org.eclipse.kapua.job.engine.jbatch.driver.exception.CannotCleanJobDefFileDriverException;
28: import org.eclipse.kapua.job.engine.jbatch.driver.exception.CannotCreateTmpDirDriverException;
29: import org.eclipse.kapua.job.engine.jbatch.driver.exception.CannotWriteJobDefFileDriverException;
30: import org.eclipse.kapua.job.engine.jbatch.driver.exception.CleanJobDataDriverException;
31: import org.eclipse.kapua.job.engine.jbatch.driver.exception.ExecutionNotFoundDriverException;
32: import org.eclipse.kapua.job.engine.jbatch.driver.exception.ExecutionNotRunningDriverException;
33: import org.eclipse.kapua.job.engine.jbatch.driver.exception.JbatchDriverException;
34: import org.eclipse.kapua.job.engine.jbatch.driver.exception.JobExecutionIsRunningDriverException;
35: import org.eclipse.kapua.job.engine.jbatch.driver.exception.JobStartingDriverException;
36: import org.eclipse.kapua.job.engine.jbatch.driver.utils.JobDefinitionBuildUtils;
37: import org.eclipse.kapua.job.engine.jbatch.persistence.JPAPersistenceManagerImpl;
38: import org.eclipse.kapua.job.engine.jbatch.setting.JobEngineSettingKeys;
39: import org.eclipse.kapua.locator.KapuaLocator;
40: import org.eclipse.kapua.model.id.KapuaId;
41: import org.eclipse.kapua.service.job.Job;
42: import org.eclipse.kapua.service.job.execution.JobExecutionService;
43: import org.eclipse.kapua.service.job.step.JobStep;
44: import org.eclipse.kapua.service.job.step.JobStepAttributes;
45: import org.eclipse.kapua.service.job.step.JobStepFactory;
46: import org.eclipse.kapua.service.job.step.JobStepListResult;
47: import org.eclipse.kapua.service.job.step.JobStepQuery;
48: import org.eclipse.kapua.service.job.step.JobStepService;
49: import org.eclipse.kapua.service.job.step.definition.JobStepDefinition;
50: import org.eclipse.kapua.service.job.step.definition.JobStepDefinitionService;
51: import org.slf4j.Logger;
52: import org.slf4j.LoggerFactory;
53:
54: import javax.batch.operations.JobExecutionNotRunningException;
55: import javax.batch.operations.JobOperator;
56: import javax.batch.operations.JobSecurityException;
57: import javax.batch.operations.NoSuchJobException;
58: import javax.batch.operations.NoSuchJobExecutionException;
59: import javax.batch.operations.NoSuchJobInstanceException;
60: import javax.batch.runtime.BatchRuntime;
61: import javax.batch.runtime.JobExecution;
62: import javax.batch.runtime.JobInstance;
63: import javax.validation.constraints.NotNull;
64: import java.io.File;
65: import java.io.FileOutputStream;
66: import java.io.IOException;
67: import java.util.ArrayList;
68: import java.util.Collections;
69: import java.util.Comparator;
70: import java.util.Iterator;
71: import java.util.List;
72: import java.util.Properties;
73: import java.util.stream.Collectors;
74:
75: /**
76: * Driver class for Java Batch API
77: *
78: * @since 1.0.0
79: */
80: public class JbatchDriver {
81:
82: private static final Logger LOG = LoggerFactory.getLogger(JbatchDriver.class);
83:
84: private static final String JBATCH_EXECUTION_ID = "JBATCH_EXECUTION_ID";
85:
86: private static final JobOperator JOB_OPERATOR = BatchRuntime.getJobOperator();
87:
88: private static final KapuaLocator LOCATOR = KapuaLocator.getInstance();
89:
90: private static final JobExecutionService JOB_EXECUTION_SERVICE = LOCATOR.getService(JobExecutionService.class);
91:
92: private static final JobStepService JOB_STEP_SERVICE = LOCATOR.getService(JobStepService.class);
93: private static final JobStepFactory JOB_STEP_FACTORY = LOCATOR.getFactory(JobStepFactory.class);
94:
95: private static final JobStepDefinitionService STEP_DEFINITION_SERVICE = LOCATOR.getService(JobStepDefinitionService.class);
96:
97: private JbatchDriver() {
98: }
99:
100: /**
101: * Builds the jBatch job name from the {@link Job#getScopeId()} and the {@link Job#getId()}.
102: * <p>
103: * Format is: job-{scopeIdShort}-{jobIdShort}
104: *
105: * @param scopeId The scopeId of the {@link Job}
106: * @param jobId The id of the {@link Job}
107: * @return The jBatch {@link Job} name
108: * @since 1.0.0
109: */
110: public static String getJbatchJobName(@NotNull KapuaId scopeId, @NotNull KapuaId jobId) {
111: return String.format("job-%s-%s", scopeId.toCompactId(), jobId.toCompactId());
112: }
113:
114: /**
115: * Starts a jBatch job with data sourced from the Kapua {@link Job} definition.
116: * <p>
117: * It builds the XML jBatch job definition using the {@link JSLJob} model definition.
118: * The generated XML is store in the {@link SystemUtils#getJavaIoTmpDir()} since the default configuration of jBatch requires a path name to start the jBatch job
119: *
120: * @param scopeId The scopeId of the {@link Job}
121: * @param jobId The id of the {@link Job}
122: * @param jobStartOptions The {@link JobStartOptions} for this start {@link org.eclipse.kapua.service.job.Job} request.
123: * @throws CannotBuildJobDefDriverException if the creation of the {@link JSLJob} fails
124: * @throws CannotCreateTmpDirDriverException if the temp directory for storing the XML job definition file cannot be created
125: * @throws CannotCleanJobDefFileDriverException if the XML job definition file cannot be deleted, when existing
126: * @throws CannotWriteJobDefFileDriverException if the XML job definition file cannot be created and written in the tmp directory
127: * @throws JobExecutionIsRunningDriverException if the jBatch job has another {@link JobExecution} running
128: * @throws JobStartingDriverException if invoking {@link JobOperator#start(String, Properties)} throws an {@link Exception}
129: * @since 1.0.0
130: */
131: public static void startJob(@NotNull KapuaId scopeId, @NotNull KapuaId jobId, @NotNull JobStartOptions jobStartOptions)
132: throws JbatchDriverException {
133:
134: String jobXmlDefinition;
135: String jobName = JbatchDriver.getJbatchJobName(scopeId, jobId);
136: try {
137: JobStepQuery query = JOB_STEP_FACTORY.newQuery(scopeId);
138: query.setPredicate(query.attributePredicate(JobStepAttributes.JOB_ID, jobId));
139:
140: JobStepListResult jobSteps = JOB_STEP_SERVICE.query(query);
141: jobSteps.sort(Comparator.comparing(JobStep::getStepIndex));
142:
143: List<ExecutionElement> jslExecutionElements = new ArrayList<>();
144: Iterator<JobStep> jobStepIterator = jobSteps.getItems().iterator();
145:• while (jobStepIterator.hasNext()) {
146: JobStep jobStep = jobStepIterator.next();
147:
148: Step jslStep = new Step();
149: JobStepDefinition jobStepDefinition = STEP_DEFINITION_SERVICE.find(KapuaId.ANY, jobStep.getJobStepDefinitionId());
150:• switch (jobStepDefinition.getStepType()) {
151: case GENERIC:
152: jslStep.setBatchlet(JobDefinitionBuildUtils.buildGenericStep(jobStepDefinition));
153: break;
154: case TARGET:
155: jslStep.setChunk(JobDefinitionBuildUtils.buildChunkStep(jobStepDefinition));
156: break;
157: default:
158: throw new KapuaIllegalArgumentException(jobStepDefinition.getStepType().name(), "jobStepDefinition.stepType");
159: }
160:
161: jslStep.setId("step-" + jobStep.getStepIndex());
162:
163:• if (jobStepIterator.hasNext()) {
164: jslStep.setNextFromAttribute("step-" + (jobStep.getStepIndex() + 1));
165: }
166:
167: jslStep.setProperties(JobDefinitionBuildUtils.buildStepProperties(jobStepDefinition, jobStep, jobStepIterator.hasNext(), jobStartOptions.getStepPropertiesOverrides()));
168:
169: jslExecutionElements.add(jslStep);
170: }
171:
172: JSLJob jslJob = new JSLJob();
173: jslJob.setRestartable("true");
174: jslJob.setId(jobName);
175: jslJob.setVersion("1.0");
176: jslJob.setProperties(JobDefinitionBuildUtils.buildJobProperties(scopeId, jobId, jobStartOptions));
177: jslJob.setListeners(JobDefinitionBuildUtils.buildListener());
178: jslJob.getExecutionElements().addAll(jslExecutionElements);
179:
180: jobXmlDefinition = ModelSerializerFactory.createJobModelSerializer().serializeModel(jslJob);
181: } catch (Exception e) {
182: throw new CannotBuildJobDefDriverException(e, jobName);
183: }
184:
185: //
186: // Retrieve temporary directory for job XML definition
187: String tmpDirectory = SystemUtils.getJavaIoTmpDir().getAbsolutePath();
188: File jobTempDirectory = new File(tmpDirectory, "kapua-job/" + scopeId.toCompactId());
189:• if (!jobTempDirectory.exists() && !jobTempDirectory.mkdirs()) {
190: throw new CannotCreateTmpDirDriverException(jobName, jobTempDirectory.getAbsolutePath());
191: }
192:
193: //
194: // Retrieve job XML definition file. Delete it if exist
195: File jobXmlDefinitionFile = new File(jobTempDirectory, jobId.toCompactId().concat("-").concat(String.valueOf(System.nanoTime())).concat(".xml"));
196:• if (jobXmlDefinitionFile.exists() && !jobXmlDefinitionFile.delete()) {
197: throw new CannotCleanJobDefFileDriverException(jobName, jobXmlDefinitionFile.getAbsolutePath());
198: }
199:
200: try (FileOutputStream tmpStream = new FileOutputStream(jobXmlDefinitionFile)) {
201: IOUtils.write(jobXmlDefinition, tmpStream);
202: } catch (IOException e) {
203: throw new CannotWriteJobDefFileDriverException(e, jobName, jobXmlDefinitionFile.getAbsolutePath());
204: }
205:
206: //
207: // Start job
208: try {
209: JOB_OPERATOR.start(jobXmlDefinitionFile.getAbsolutePath().replaceAll("\\.xml$", ""), new Properties());
210: } catch (NoSuchJobExecutionException | NoSuchJobException | JobSecurityException e) {
211: throw new JobStartingDriverException(e, jobName);
212: }
213: }
214:
215: /**
216: * Stops completely the jBatch job.
217: * <p>
218: * First invokes the {@link JobOperator#stop(long)} on the running execution, which stop the running execution.
219: * Secondly, according to the {@link JobEngineSettingKeys#JOB_ENGINE_STOP_WAIT_CHECK} value, it waits asynchronously the complete stop of the job
220: * to be able to invoke {@link JobOperator#abandon(long)} which terminate the jBatch Job.
221: * <p>
222: * A jBatch job cannot be resumed after this method is invoked on it.
223: *
224: * @param scopeId The scopeId of the {@link Job}
225: * @param jobId The id of the {@link Job}
226: * @param toStopJobExecutionId The optional {@link org.eclipse.kapua.service.job.execution.JobExecution#getId()} to stop.
227: * @throws ExecutionNotFoundDriverException when there isn't a corresponding job execution in jBatch tables
228: * @throws ExecutionNotRunningDriverException when the corresponding job execution is not running.
229: * @since 1.0.0
230: */
231: public static void stopJob(@NotNull KapuaId scopeId, @NotNull KapuaId jobId, KapuaId toStopJobExecutionId) throws JbatchDriverException, KapuaException {
232:
233: String jobName = getJbatchJobName(scopeId, jobId);
234:
235: //
236: // Check running
237: List<JobExecution> runningExecutions = getRunningJobExecutions(scopeId, jobId);
238:• if (runningExecutions.isEmpty()) {
239: throw new ExecutionNotRunningDriverException(jobName);
240: }
241:
242: //
243: // Filter execution to stop
244:• if (toStopJobExecutionId != null) {
245: org.eclipse.kapua.service.job.execution.JobExecution je = JOB_EXECUTION_SERVICE.find(scopeId, toStopJobExecutionId);
246:
247: long toStopJbatchExecutionId = Long.parseLong((String) je.getEntityAttributes().get(JBATCH_EXECUTION_ID));
248:
249:• runningExecutions = runningExecutions.stream().filter(re -> re.getExecutionId() == toStopJbatchExecutionId).collect(Collectors.toList());
250: }
251:
252: //
253: // Do stop
254: try {
255: runningExecutions.forEach((runningExecution -> {
256: JOB_OPERATOR.stop(runningExecution.getExecutionId());
257: }));
258: } catch (NoSuchJobExecutionException e) {
259: throw new ExecutionNotFoundDriverException(e, jobName);
260: } catch (JobExecutionNotRunningException e) {
261: throw new ExecutionNotRunningDriverException(e, jobName);
262: }
263: }
264:
265: public static void resumeJob(@NotNull KapuaId scopeId, @NotNull KapuaId jobId, @NotNull KapuaId toResumeJobExecutionId) throws JbatchDriverException, KapuaException {
266:
267: String jobName = getJbatchJobName(scopeId, jobId);
268:
269: //
270: // Get list
271: List<JobExecution> stoppedJobExecutions = getJobExecutions(scopeId, jobId);
272:• if (stoppedJobExecutions.isEmpty()) {
273: throw new ExecutionNotFoundDriverException(jobName);
274: }
275:
276: //
277: // Filter execution to resume
278: org.eclipse.kapua.service.job.execution.JobExecution je = JOB_EXECUTION_SERVICE.find(scopeId, toResumeJobExecutionId);
279:
280: long toResumeJbatchExecutionId = Long.parseLong((String) je.getEntityAttributes().get(JBATCH_EXECUTION_ID));
281:
282:• stoppedJobExecutions = stoppedJobExecutions.stream().filter(re -> re.getExecutionId() == toResumeJbatchExecutionId).collect(Collectors.toList());
283:
284: //
285: // Do stop
286: try {
287: stoppedJobExecutions.forEach((stoppedExecution -> {
288: Properties properties = new Properties();
289:
290: properties.setProperty(JobContextPropertyNames.RESUMED_KAPUA_EXECUTION_ID, toResumeJobExecutionId.toCompactId());
291:
292: JOB_OPERATOR.restart(stoppedExecution.getExecutionId(), properties);
293: }));
294: } catch (NoSuchJobExecutionException e) {
295: throw new ExecutionNotFoundDriverException(e, jobName);
296: } catch (JobExecutionNotRunningException e) {
297: throw new ExecutionNotRunningDriverException(e, jobName);
298: }
299: }
300:
301:
302: /**
303: * Checks whether or not the {@link Job} identified by the parametersis in a running status.
304: * <p>
305: * jBatch {@link Job} running statuses are listed in {@link JbatchJobRunningStatuses}
306: *
307: * @param scopeId The scopeId of the {@link Job}
308: * @param jobId The id of the {@link Job}
309: * @return {@code true} if the jBatch {@link Job} is running, {@code false} otherwise.
310: * @since 1.0.0
311: */
312: public static boolean isRunningJob(@NotNull KapuaId scopeId, @NotNull KapuaId jobId) {
313:• return !getRunningJobExecutions(scopeId, jobId).isEmpty();
314: }
315:
316: /**
317: * Deletes jBatch internal data for the given {@link Job} id.
318: * <p>
319: * The cleanup is required when deleting a {@link Job} to avoid stale data to be left in the jBatch tables.
320: *
321: * @param scopeId The {@link Job#getScopeId()}
322: * @param jobId The {@link Job#getId()}
323: * @throws CleanJobDataDriverException if the cleanup produces an error
324: * @since 1.0.0
325: */
326: public static void cleanJobData(@NotNull KapuaId scopeId, @NotNull KapuaId jobId) throws CleanJobDataDriverException {
327: String jobName = getJbatchJobName(scopeId, jobId);
328: try {
329: ((JPAPersistenceManagerImpl) ServicesManagerImpl.getInstance().getPersistenceManagerService()).purgeByName(jobName);
330: } catch (Exception ex) {
331: throw new CleanJobDataDriverException(ex, jobName);
332: }
333: }
334:
335: //
336: // Private methods
337: //
338: private static List<JobExecution> getRunningJobExecutions(@NotNull KapuaId scopeId, @NotNull KapuaId jobId) {
339: return getJobExecutions(scopeId, jobId).stream().filter(je -> JbatchJobRunningStatuses.getStatuses().contains(je.getBatchStatus())).collect(Collectors.toList());
340: }
341:
342: private static List<JobExecution> getJobExecutions(@NotNull KapuaId scopeId, @NotNull KapuaId jobId) {
343: String jobName = getJbatchJobName(scopeId, jobId);
344:
345: // Get all JobInstances with this name
346: List<JobInstance> jobInstances;
347: try {
348: jobInstances = JOB_OPERATOR.getJobInstances(jobName, 0, Integer.MAX_VALUE);
349: } catch (NoSuchJobException nsje) {
350: LOG.warn("Error while getting JobInstance by name: {}. Exception: {}: {}", jobName, nsje.getClass().getSimpleName(), nsje.getMessage());
351: return Collections.emptyList();
352: } catch (NullPointerException npe) {
353: LOG.error("Unexpected NPE!", npe);
354: return Collections.emptyList();
355: }
356:
357: // For each JobInstance get its JobExecutions
358: List<JobExecution> jobExecutions = new ArrayList<>();
359:• for (JobInstance ji : jobInstances) {
360: try {
361: jobExecutions.addAll(getJbatchJobExecutions(ji));
362: } catch (NoSuchJobInstanceException nsjie) {
363: LOG.warn("Error while getting JobExecutions by JobInstance: {}. Exception: {}: {}. Continuing with other JobInstances", ji.getInstanceId(), nsjie.getClass().getSimpleName(), nsjie.getMessage());
364: } catch (NullPointerException npe) {
365: LOG.error("Unexpected NPE!", npe);
366: }
367: }
368:
369: return jobExecutions;
370: }
371:
372: private static List<JobExecution> getJbatchJobExecutions(@NotNull JobInstance jobInstance) {
373: try {
374: return JOB_OPERATOR.getJobExecutions(jobInstance);
375: } catch (NoSuchJobInstanceException nsjie) {
376: LOG.warn("Error while getting JobExecutions by JobInstance: {}. Exception {}: {}. Ignoring exception...", jobInstance.getInstanceId(), nsjie.getClass().getSimpleName(), nsjie.getMessage());
377: // This exception is thrown when there is no job instance, this means that the job never run before
378: // So we can ignore it and return an empty `List<>`
379: }
380:
381: return Collections.emptyList();
382: }
383: }