Skip to content

Package: DefaultTargetReader

DefaultTargetReader

nameinstructionbranchcomplexitylinemethod
DefaultTargetReader()
M: 20 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 4 C: 0
0%
M: 1 C: 0
0%
getTargetDisplayName(JobTarget)
M: 12 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 4 C: 0
0%
M: 1 C: 0
0%
lambda$getTargetDisplayName$3(JobTarget)
M: 8 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
lambda$open$0(JobTargetQuery)
M: 5 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
lambda$open$1(JobTarget)
M: 9 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
lambda$readItem$2(JobContextWrapper, JobLogger)
M: 49 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 6 C: 0
0%
M: 1 C: 0
0%
open(Serializable)
M: 98 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 17 C: 0
0%
M: 1 C: 0
0%
readItem()
M: 18 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 4 C: 0
0%
M: 1 C: 0
0%
static {...}
M: 11 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 3 C: 0
0%
M: 1 C: 0
0%
stepIndexFiltering(JobContextWrapper, StepContextWrapper, KapuaQuery, AndPredicate)
M: 30 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 5 C: 0
0%
M: 1 C: 0
0%
targetSublistFiltering(JobContextWrapper, KapuaQuery, AndPredicate)
M: 14 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 3 C: 0
0%
M: 1 C: 0
0%

Coverage

1: /*******************************************************************************
2: * Copyright (c) 2017, 2022 Eurotech and/or its affiliates and others
3: *
4: * This program and the accompanying materials are made
5: * available under the terms of the Eclipse Public License 2.0
6: * which is available at https://www.eclipse.org/legal/epl-2.0/
7: *
8: * SPDX-License-Identifier: EPL-2.0
9: *
10: * Contributors:
11: * Eurotech - initial API and implementation
12: *******************************************************************************/
13: package org.eclipse.kapua.job.engine.commons.operation;
14:
15: import org.eclipse.kapua.KapuaException;
16: import org.eclipse.kapua.commons.security.KapuaSecurityUtils;
17: import org.eclipse.kapua.job.engine.commons.logger.JobLogger;
18: import org.eclipse.kapua.job.engine.commons.wrappers.JobContextWrapper;
19: import org.eclipse.kapua.job.engine.commons.wrappers.JobTargetWrapper;
20: import org.eclipse.kapua.job.engine.commons.wrappers.StepContextWrapper;
21: import org.eclipse.kapua.locator.KapuaLocator;
22: import org.eclipse.kapua.model.query.KapuaQuery;
23: import org.eclipse.kapua.model.query.predicate.AndPredicate;
24: import org.eclipse.kapua.model.query.predicate.AttributePredicate;
25: import org.eclipse.kapua.service.device.registry.Device;
26: import org.eclipse.kapua.service.device.registry.DeviceRegistryService;
27: import org.eclipse.kapua.service.job.operation.TargetReader;
28: import org.eclipse.kapua.service.job.step.JobStepIndex;
29: import org.eclipse.kapua.service.job.targets.JobTarget;
30: import org.eclipse.kapua.service.job.targets.JobTargetAttributes;
31: import org.eclipse.kapua.service.job.targets.JobTargetFactory;
32: import org.eclipse.kapua.service.job.targets.JobTargetListResult;
33: import org.eclipse.kapua.service.job.targets.JobTargetQuery;
34: import org.eclipse.kapua.service.job.targets.JobTargetService;
35: import org.eclipse.kapua.service.job.targets.JobTargetStatus;
36: import org.slf4j.Logger;
37: import org.slf4j.LoggerFactory;
38:
39: import javax.batch.api.chunk.AbstractItemReader;
40: import javax.batch.runtime.context.JobContext;
41: import javax.batch.runtime.context.StepContext;
42: import javax.inject.Inject;
43: import java.io.Serializable;
44: import java.util.ArrayList;
45: import java.util.List;
46:
47: /**
48: * Default {@link TargetReader} implementation.
49: * <p>
50: * All {@link org.eclipse.kapua.service.job.step.definition.JobStepDefinition} can use this {@link TargetReader} implementation or extend or provide one on their own.
51: *
52: * @since 1.0.0
53: */
54: public class DefaultTargetReader extends AbstractItemReader implements TargetReader {
55:
56: private static final Logger LOG = LoggerFactory.getLogger(DefaultTargetReader.class);
57:
58: private static final KapuaLocator LOCATOR = KapuaLocator.getInstance();
59:
60: private static final DeviceRegistryService DEVICE_REGISTRY_SERVICE = LOCATOR.getService(DeviceRegistryService.class);
61:
62: private final JobTargetFactory jobTargetFactory = LOCATOR.getFactory(JobTargetFactory.class);
63: private final JobTargetService jobTargetService = LOCATOR.getService(JobTargetService.class);
64:
65: @Inject
66: private JobContext jobContext;
67:
68: @Inject
69: private StepContext stepContext;
70:
71: protected List<JobTargetWrapper> wrappedJobTargets = new ArrayList<>();
72: protected int jobTargetIndex;
73:
74: @Override
75: public void open(Serializable arg0) throws Exception {
76: JobContextWrapper jobContextWrapper = new JobContextWrapper(jobContext);
77: StepContextWrapper stepContextWrapper = new StepContextWrapper(stepContext);
78:
79: JobLogger jobLogger = jobContextWrapper.getJobLogger();
80: jobLogger.setClassLog(LOG);
81:
82: int stepIndex = stepContextWrapper.getStepIndex();
83: String stepName = stepContextWrapper.getKapuaStepName();
84:
85: jobLogger.info("Reading target chunk. Step:{} (index:{})...", stepName, stepIndex);
86:
87: //
88: // Job Id and JobTarget status filtering
89: JobTargetQuery query = jobTargetFactory.newQuery(jobContextWrapper.getScopeId());
90:
91: AndPredicate andPredicate = query.andPredicate(
92: query.attributePredicate(JobTargetAttributes.JOB_ID, jobContextWrapper.getJobId())
93: );
94:
95: //
96: // Step index filtering
97: stepIndexFiltering(jobContextWrapper, stepContextWrapper, query, andPredicate);
98:
99: //
100: // Filter selected target
101: targetSublistFiltering(jobContextWrapper, query, andPredicate);
102:
103: //
104: // Query the targets
105: query.setPredicate(andPredicate);
106:
107: JobTargetListResult jobTargets = KapuaSecurityUtils.doPrivileged(() -> jobTargetService.query(query));
108:
109: //
110: // Wrap the JobTargets in a wrapper object to store additional informations
111: jobTargets.getItems().forEach(jt -> wrappedJobTargets.add(new JobTargetWrapper(jt)));
112:
113: jobLogger.info("Reading target chunk. Step:{} (index:{})...DONE", stepName, stepIndex);
114: }
115:
116: @Override
117: public Object readItem() throws Exception {
118: JobContextWrapper jobContextWrapper = new JobContextWrapper(jobContext);
119:
120: JobLogger jobLogger = jobContextWrapper.getJobLogger();
121: jobLogger.setClassLog(LOG);
122:
123: return KapuaSecurityUtils.doPrivileged(() -> {
124: JobTargetWrapper currentWrappedJobTarget = null;
125:• if (jobTargetIndex < wrappedJobTargets.size()) {
126: currentWrappedJobTarget = wrappedJobTargets.get(jobTargetIndex++);
127: JobTarget jobTarget = jobTargetService.find(jobContextWrapper.getScopeId(), currentWrappedJobTarget.getJobTarget().getId());
128: jobLogger.info("Read target: {} (id: {})", getTargetDisplayName(jobTarget), jobTarget.getId().toCompactId());
129: }
130: return currentWrappedJobTarget;
131: });
132: }
133:
134: /**
135: * This method apply {@link AttributePredicate}s according to the parameters contained into the {@link JobContextWrapper} and {@link StepContextWrapper}.
136: * <p>
137: * When no {@link JobStepIndex} is specified, the methods selects all targets that are set to the current {@link StepContextWrapper#getStepIndex()} and that don't have the
138: * {@link JobTargetStatus} set to {@link JobTargetStatus#PROCESS_OK}.
139: * <p>
140: * When a {@link JobStepIndex} is specified, the methods ignores all targets until the {@link StepContextWrapper#getStepIndex()} doesn't match the {@link JobContextWrapper#getFromStepIndex()}.
141: * Then the {@link JobTarget}s will be selected as regularly.
142: *
143: * @param jobContextWrapper The {@link JobContextWrapper} from which extract data
144: * @param stepContextWrapper The {@link StepContextWrapper} from which extract data
145: * @param query The {@link KapuaQuery} to perform
146: * @param andPredicate The {@link org.eclipse.kapua.model.query.predicate.AndPredicate} where to apply {@link org.eclipse.kapua.model.query.predicate.QueryPredicate}
147: * @since 1.0.0
148: */
149: protected void stepIndexFiltering(JobContextWrapper jobContextWrapper, StepContextWrapper stepContextWrapper, KapuaQuery query, AndPredicate andPredicate) {
150:
151: // Select all targets that aren't in PROCESS_OK status
152: andPredicate.and(query.attributePredicate(JobTargetAttributes.STATUS, JobTargetStatus.PROCESS_OK, AttributePredicate.Operator.NOT_EQUAL));
153:
154: // Select all target that are at the current step
155: andPredicate.and(query.attributePredicate(JobTargetAttributes.STEP_INDEX, stepContextWrapper.getStepIndex()));
156:
157: // Select all targets at or after the given fromStepIndex (if specified)
158:• if (jobContextWrapper.getFromStepIndex() != null) {
159: andPredicate.and(query.attributePredicate(JobTargetAttributes.STEP_INDEX, jobContextWrapper.getFromStepIndex(), AttributePredicate.Operator.GREATER_THAN_OR_EQUAL));
160: }
161: }
162:
163: /**
164: * This method apply {@link AttributePredicate}s according to the parameters contained into the {@link JobContextWrapper#getTargetSublist()}.
165: * <p>
166: * If the {@link JobContextWrapper#getTargetSublist()} has one or more {@link org.eclipse.kapua.model.id.KapuaId}s they will be added to the
167: * {@link org.eclipse.kapua.model.query.predicate.AndPredicate} to select only given {@link JobTarget}.
168: *
169: * @param jobContextWrapper The {@link JobContextWrapper} from which extract data
170: * @param query The {@link KapuaQuery} to perform
171: * @param andPredicate The {@link org.eclipse.kapua.model.query.predicate.AndPredicate} where to apply {@link org.eclipse.kapua.model.query.predicate.QueryPredicate}
172: * @since 1.0.0
173: */
174: protected void targetSublistFiltering(JobContextWrapper jobContextWrapper, KapuaQuery query, AndPredicate andPredicate) {
175:• if (!jobContextWrapper.getTargetSublist().isEmpty()) {
176: andPredicate.and(query.attributePredicate(JobTargetAttributes.ENTITY_ID, jobContextWrapper.getTargetSublist().toArray()));
177: }
178: }
179:
180: protected String getTargetDisplayName(JobTarget jobTarget) throws KapuaException {
181: Device device = KapuaSecurityUtils.doPrivileged(() -> DEVICE_REGISTRY_SERVICE.find(jobTarget.getScopeId(), jobTarget.getJobTargetId()));
182:• if (device == null) {
183: return "N/A";
184: }
185: return device.getClientId();
186: }
187:
188: }