Skip to content

Package: QueuedJobExecutionCheckTask

QueuedJobExecutionCheckTask

name | instruction | branch | complexity | line | method
QueuedJobExecutionCheckTask(KapuaId, KapuaId, KapuaId)
M: 12 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 5 C: 0
0%
M: 1 C: 0
0%
lambda$run$0(QueuedJobExecutionQuery)
M: 4 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
lambda$run$1(QueuedJobExecution)
M: 9 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
lambda$run$2(QueuedJobExecution)
M: 5 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
lambda$run$3(QueuedJobExecution)
M: 5 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
run()
M: 167 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 26 C: 0
0%
M: 1 C: 0
0%
static {...}
M: 23 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 6 C: 0
0%
M: 1 C: 0
0%

Coverage

1: /*******************************************************************************
2: * Copyright (c) 2019, 2022 Eurotech and/or its affiliates and others
3: *
4: * This program and the accompanying materials are made
5: * available under the terms of the Eclipse Public License 2.0
6: * which is available at https://www.eclipse.org/legal/epl-2.0/
7: *
8: * SPDX-License-Identifier: EPL-2.0
9: *
10: * Contributors:
11: * Eurotech - initial API and implementation
12: *******************************************************************************/
13: package org.eclipse.kapua.job.engine.jbatch.listener;
14:
15: import org.eclipse.kapua.commons.model.query.predicate.AndPredicateImpl;
16: import org.eclipse.kapua.commons.model.query.predicate.AttributePredicateImpl;
17: import org.eclipse.kapua.commons.security.KapuaSecurityUtils;
18: import org.eclipse.kapua.job.engine.JobEngineService;
19: import org.eclipse.kapua.job.engine.jbatch.setting.JobEngineSetting;
20: import org.eclipse.kapua.job.engine.jbatch.setting.JobEngineSettingKeys;
21: import org.eclipse.kapua.job.engine.queue.QueuedJobExecution;
22: import org.eclipse.kapua.job.engine.queue.QueuedJobExecutionAttributes;
23: import org.eclipse.kapua.job.engine.queue.QueuedJobExecutionFactory;
24: import org.eclipse.kapua.job.engine.queue.QueuedJobExecutionListResult;
25: import org.eclipse.kapua.job.engine.queue.QueuedJobExecutionQuery;
26: import org.eclipse.kapua.job.engine.queue.QueuedJobExecutionService;
27: import org.eclipse.kapua.job.engine.queue.QueuedJobExecutionStatus;
28: import org.eclipse.kapua.locator.KapuaLocator;
29: import org.eclipse.kapua.model.id.KapuaId;
30: import org.slf4j.Logger;
31: import org.slf4j.LoggerFactory;
32:
33: import java.util.TimerTask;
34:
35: public class QueuedJobExecutionCheckTask extends TimerTask {
36:
37: private static final Logger LOG = LoggerFactory.getLogger(QueuedJobExecutionCheckTask.class);
38:
39: private static final JobEngineSetting JOB_ENGINE_SETTING = JobEngineSetting.getInstance();
40:
41: private static final KapuaLocator LOCATOR = KapuaLocator.getInstance();
42:
43: private static final JobEngineService JOB_ENGINE_SERVICE = LOCATOR.getService(JobEngineService.class);
44:
45: private static final QueuedJobExecutionService QUEUED_JOB_EXECUTION_SERVICE = LOCATOR.getService(QueuedJobExecutionService.class);
46: private static final QueuedJobExecutionFactory QUEUED_JOB_EXECUTION_FACTORY = LOCATOR.getFactory(QueuedJobExecutionFactory.class);
47:
48:
49: private KapuaId scopeId;
50: private KapuaId jobId;
51: private KapuaId jobExecutionId;
52:
53: public QueuedJobExecutionCheckTask(KapuaId scopeId, KapuaId jobId, KapuaId jobExecutionId) {
54: this.scopeId = scopeId;
55: this.jobId = jobId;
56: this.jobExecutionId = jobExecutionId;
57: }
58:
59: @Override
60: public void run() {
61: LOG.info("Checking Job Execution queue for: {}...", jobExecutionId);
62:
63: try {
64: QueuedJobExecutionQuery query = QUEUED_JOB_EXECUTION_FACTORY.newQuery(scopeId);
65:
66: query.setPredicate(
67: new AndPredicateImpl(
68: new AttributePredicateImpl<>(QueuedJobExecutionAttributes.JOB_ID, jobId),
69: new AttributePredicateImpl<>(QueuedJobExecutionAttributes.WAIT_FOR_JOB_EXECUTION_ID, jobExecutionId)
70: )
71: );
72:
73: QueuedJobExecutionListResult queuedJobExecutions = KapuaSecurityUtils.doPrivileged(() -> QUEUED_JOB_EXECUTION_SERVICE.query(query));
74:
75: int i = 0;
76: int failedToResumeExecution = 0;
77:• for (QueuedJobExecution qje : queuedJobExecutions.getItems()) {
78: Thread.sleep(JOB_ENGINE_SETTING.getInt(JobEngineSettingKeys.JOB_ENGINE_QUEUE_PROCESSING_RUN_DELAY));
79:
80: LOG.info("Resuming Job Execution ({}/{}): {}...", ++i, queuedJobExecutions.getSize(), qje.getJobExecutionId());
81:
82: try {
83: KapuaSecurityUtils.doPrivileged(() -> JOB_ENGINE_SERVICE.resumeJobExecution(qje.getScopeId(), qje.getJobId(), qje.getJobExecutionId()));
84:
85: qje.setStatus(QueuedJobExecutionStatus.PROCESSED);
86: KapuaSecurityUtils.doPrivileged(() -> QUEUED_JOB_EXECUTION_SERVICE.update(qje));
87: } catch (Exception e) {
88: LOG.error("Resuming Job Execution ({}/{}): {}... ERROR!", i, queuedJobExecutions.getSize(), qje.getJobExecutionId(), e);
89: failedToResumeExecution++;
90:
91: qje.setStatus(QueuedJobExecutionStatus.FAILED_TO_RESUME);
92: KapuaSecurityUtils.doPrivileged(() -> QUEUED_JOB_EXECUTION_SERVICE.update(qje));
93: continue;
94: }
95:
96: LOG.info("Resuming Job Execution ({}/{}): {}... DONE!", i, queuedJobExecutions.getSize(), qje.getJobExecutionId());
97: }
98:
99: LOG.info("Checking Job Execution queue for: {}... DONE! Queued job failed to resume: {}.", jobExecutionId, failedToResumeExecution);
100: } catch (Exception e) {
101: LOG.error("Checking Job Execution queue for: {}... ERROR!", jobExecutionId, e);
102: }
103: }
104: }