Skip to content

Package: DefaultTargetReader

DefaultTargetReader

name | instruction | branch | complexity | line | method
DefaultTargetReader()
M: 20 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 4 C: 0
0%
M: 1 C: 0
0%
lambda$open$0(JobTargetQuery)
M: 5 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
lambda$open$1(JobTarget)
M: 9 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
open(Serializable)
M: 70 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 15 C: 0
0%
M: 1 C: 0
0%
readItem()
M: 40 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 9 C: 0
0%
M: 1 C: 0
0%
static {...}
M: 6 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
stepIndexFiltering(JobContextWrapper, StepContextWrapper, KapuaQuery, AndPredicate)
M: 30 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 5 C: 0
0%
M: 1 C: 0
0%
targetSublistFiltering(JobContextWrapper, KapuaQuery, AndPredicate)
M: 14 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 3 C: 0
0%
M: 1 C: 0
0%

Coverage

1: /*******************************************************************************
2: * Copyright (c) 2017, 2020 Eurotech and/or its affiliates and others
3: *
4: * All rights reserved. This program and the accompanying materials
5: * are made available under the terms of the Eclipse Public License v1.0
6: * which accompanies this distribution, and is available at
7: * http://www.eclipse.org/legal/epl-v10.html
8: *
9: * Contributors:
10: * Eurotech - initial API and implementation
11: *******************************************************************************/
12: package org.eclipse.kapua.job.engine.commons.operation;
13:
14: import org.eclipse.kapua.commons.security.KapuaSecurityUtils;
15: import org.eclipse.kapua.job.engine.commons.logger.JobLogger;
16: import org.eclipse.kapua.job.engine.commons.wrappers.JobContextWrapper;
17: import org.eclipse.kapua.job.engine.commons.wrappers.JobTargetWrapper;
18: import org.eclipse.kapua.job.engine.commons.wrappers.StepContextWrapper;
19: import org.eclipse.kapua.locator.KapuaLocator;
20: import org.eclipse.kapua.model.query.KapuaQuery;
21: import org.eclipse.kapua.model.query.predicate.AndPredicate;
22: import org.eclipse.kapua.model.query.predicate.AttributePredicate;
23: import org.eclipse.kapua.service.job.operation.TargetReader;
24: import org.eclipse.kapua.service.job.step.JobStepIndex;
25: import org.eclipse.kapua.service.job.targets.JobTarget;
26: import org.eclipse.kapua.service.job.targets.JobTargetAttributes;
27: import org.eclipse.kapua.service.job.targets.JobTargetFactory;
28: import org.eclipse.kapua.service.job.targets.JobTargetListResult;
29: import org.eclipse.kapua.service.job.targets.JobTargetQuery;
30: import org.eclipse.kapua.service.job.targets.JobTargetService;
31: import org.eclipse.kapua.service.job.targets.JobTargetStatus;
32: import org.slf4j.Logger;
33: import org.slf4j.LoggerFactory;
34:
35: import javax.batch.api.chunk.AbstractItemReader;
36: import javax.batch.runtime.context.JobContext;
37: import javax.batch.runtime.context.StepContext;
38: import javax.inject.Inject;
39: import java.io.Serializable;
40: import java.util.ArrayList;
41: import java.util.List;
42:
43: /**
44: * Default {@link TargetReader} implementation.
45: * <p>
46: * All {@link org.eclipse.kapua.service.job.step.definition.JobStepDefinition} can use this {@link TargetReader} implementation or extend or provide one on their own.
47: *
48: * @since 1.0.0
49: */
50: public class DefaultTargetReader extends AbstractItemReader implements TargetReader {
51:
52: private static final Logger LOG = LoggerFactory.getLogger(DefaultTargetReader.class);
53:
54: private static final KapuaLocator LOCATOR = KapuaLocator.getInstance();
55:
56: private final JobTargetFactory jobTargetFactory = LOCATOR.getFactory(JobTargetFactory.class);
57: private final JobTargetService jobTargetService = LOCATOR.getService(JobTargetService.class);
58:
59: @Inject
60: private JobContext jobContext;
61:
62: @Inject
63: private StepContext stepContext;
64:
65: protected List<JobTargetWrapper> wrappedJobTargets = new ArrayList<>();
66: protected int jobTargetIndex;
67:
68: @Override
69: public void open(Serializable arg0) throws Exception {
70: JobContextWrapper jobContextWrapper = new JobContextWrapper(jobContext);
71: StepContextWrapper stepContextWrapper = new StepContextWrapper(stepContext);
72:
73: JobLogger jobLogger = jobContextWrapper.getJobLogger();
74: jobLogger.setClassLog(LOG);
75:
76: jobLogger.info("Opening cursor...");
77:
78: //
79: // Job Id and JobTarget status filtering
80: JobTargetQuery query = jobTargetFactory.newQuery(jobContextWrapper.getScopeId());
81:
82: AndPredicate andPredicate = query.andPredicate(
83: query.attributePredicate(JobTargetAttributes.JOB_ID, jobContextWrapper.getJobId())
84: );
85:
86: //
87: // Step index filtering
88: stepIndexFiltering(jobContextWrapper, stepContextWrapper, query, andPredicate);
89:
90: //
91: // Filter selected target
92: targetSublistFiltering(jobContextWrapper, query, andPredicate);
93:
94: //
95: // Query the targets
96: query.setPredicate(andPredicate);
97:
98: JobTargetListResult jobTargets = KapuaSecurityUtils.doPrivileged(() -> jobTargetService.query(query));
99:
100: //
101: // Wrap the JobTargets in a wrapper object to store additional informations
102: jobTargets.getItems().forEach(jt -> wrappedJobTargets.add(new JobTargetWrapper(jt)));
103:
104: jobLogger.info("Opening cursor... DONE!");
105: }
106:
107: @Override
108: public Object readItem() throws Exception {
109: JobContextWrapper jobContextWrapper = new JobContextWrapper(jobContext);
110:
111: JobLogger jobLogger = jobContextWrapper.getJobLogger();
112: jobLogger.setClassLog(LOG);
113:
114: jobLogger.info("Reading item...");
115:
116: JobTargetWrapper currentWrappedJobTarget = null;
117:• if (jobTargetIndex < wrappedJobTargets.size()) {
118: currentWrappedJobTarget = wrappedJobTargets.get(jobTargetIndex++);
119: }
120:
121: jobLogger.info("Reading item... DONE!");
122: return currentWrappedJobTarget;
123: }
124:
125: /**
126: * This method apply {@link AttributePredicate}s according to the parameters contained into the {@link JobContextWrapper} and {@link StepContextWrapper}.
127: * <p>
128: * When no {@link JobStepIndex} is specified, the methods selects all targets that are set to the current {@link StepContextWrapper#getStepIndex()} and that don't have the
129: * {@link JobTargetStatus} set to {@link JobTargetStatus#PROCESS_OK}.
130: * <p>
131: * When a {@link JobStepIndex} is specified, the methods ignores all targets until the {@link StepContextWrapper#getStepIndex()} doesn't match the {@link JobContextWrapper#getFromStepIndex()}.
132: * Then the {@link JobTarget}s will be selected as regularly.
133: *
134: * @param jobContextWrapper The {@link JobContextWrapper} from which extract data
135: * @param stepContextWrapper The {@link StepContextWrapper} from which extract data
136: * @param query The {@link KapuaQuery} to perform
137: * @param andPredicate The {@link org.eclipse.kapua.model.query.predicate.AndPredicate} where to apply {@link org.eclipse.kapua.model.query.predicate.QueryPredicate}
138: * @since 1.0.0
139: */
140: protected void stepIndexFiltering(JobContextWrapper jobContextWrapper, StepContextWrapper stepContextWrapper, KapuaQuery query, AndPredicate andPredicate) {
141:
142: // Select all targets that aren't in PROCESS_OK status
143: andPredicate.and(query.attributePredicate(JobTargetAttributes.STATUS, JobTargetStatus.PROCESS_OK, AttributePredicate.Operator.NOT_EQUAL));
144:
145: // Select all target that are at the current step
146: andPredicate.and(query.attributePredicate(JobTargetAttributes.STEP_INDEX, stepContextWrapper.getStepIndex()));
147:
148: // Select all targets at or after the given fromStepIndex (if specified)
149:• if (jobContextWrapper.getFromStepIndex() != null) {
150: andPredicate.and(query.attributePredicate(JobTargetAttributes.STEP_INDEX, jobContextWrapper.getFromStepIndex(), AttributePredicate.Operator.GREATER_THAN_OR_EQUAL));
151: }
152: }
153:
154: /**
155: * This method apply {@link AttributePredicate}s according to the parameters contained into the {@link JobContextWrapper#getTargetSublist()}.
156: * <p>
157: * If the {@link JobContextWrapper#getTargetSublist()} has one or more {@link org.eclipse.kapua.model.id.KapuaId}s they will be added to the
158: * {@link org.eclipse.kapua.model.query.predicate.AndPredicate} to select only given {@link JobTarget}.
159: *
160: * @param jobContextWrapper The {@link JobContextWrapper} from which extract data
161: * @param query The {@link KapuaQuery} to perform
162: * @param andPredicate The {@link org.eclipse.kapua.model.query.predicate.AndPredicate} where to apply {@link org.eclipse.kapua.model.query.predicate.QueryPredicate}
163: * @since 1.0.0
164: */
165: protected void targetSublistFiltering(JobContextWrapper jobContextWrapper, KapuaQuery query, AndPredicate andPredicate) {
166:• if (!jobContextWrapper.getTargetSublist().isEmpty()) {
167: andPredicate.and(query.attributePredicate(JobTargetAttributes.ENTITY_ID, jobContextWrapper.getTargetSublist().toArray()));
168: }
169: }
170:
171: }