Package: AbstractBlockDriver

AbstractBlockDriver

nameinstructionbranchcomplexitylinemethod
AbstractBlockDriver()
M: 0 C: 3
100%
M: 0 C: 0
100%
M: 0 C: 1
100%
M: 0 C: 1
100%
M: 0 C: 1
100%
beforeAggregation(Object, Mode, BlockTaskAggregator)
M: 1 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
createPreparedRead(List, List)
M: 0 C: 7
100%
M: 0 C: 0
100%
M: 0 C: 1
100%
M: 0 C: 1
100%
M: 0 C: 1
100%
getReadMinimumGapSizeForDomain(Object)
M: 2 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
lambda$0(AbstractBlockDriver.Pair)
M: 0 C: 3
100%
M: 0 C: 0
100%
M: 0 C: 1
100%
M: 0 C: 1
100%
M: 0 C: 1
100%
lambda$1(HashSet, AbstractBlockDriver.Pair)
M: 0 C: 14
100%
M: 0 C: 2
100%
M: 0 C: 2
100%
M: 0 C: 3
100%
M: 0 C: 1
100%
lambda$2(AbstractBlockDriver.Pair)
M: 0 C: 4
100%
M: 0 C: 0
100%
M: 0 C: 1
100%
M: 0 C: 1
100%
M: 0 C: 1
100%
lambda$3(HashSet, Mode, ArrayList, Map.Entry)
M: 0 C: 58
100%
M: 0 C: 4
100%
M: 0 C: 3
100%
M: 0 C: 12
100%
M: 0 C: 1
100%
optimize(List, Mode)
M: 0 C: 49
100%
M: 0 C: 2
100%
M: 0 C: 2
100%
M: 0 C: 12
100%
M: 0 C: 1
100%
prepareRead(List)
M: 0 C: 37
100%
M: 0 C: 2
100%
M: 0 C: 2
100%
M: 0 C: 6
100%
M: 0 C: 1
100%
read(List)
M: 0 C: 39
100%
M: 0 C: 2
100%
M: 0 C: 2
100%
M: 0 C: 8
100%
M: 0 C: 1
100%
registerChannelListener(Map, ChannelListener)
M: 5 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
runTask(BlockTask)
M: 5 C: 4
44%
M: 0 C: 0
100%
M: 0 C: 1
100%
M: 1 C: 3
75%
M: 0 C: 1
100%
static {...}
M: 0 C: 4
100%
M: 0 C: 0
100%
M: 0 C: 1
100%
M: 0 C: 1
100%
M: 0 C: 1
100%
unregisterChannelListener(ChannelListener)
M: 5 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
write(List)
M: 0 C: 39
100%
M: 0 C: 2
100%
M: 0 C: 2
100%
M: 0 C: 8
100%
M: 0 C: 1
100%

Coverage

1: /*******************************************************************************
2: * Copyright (c) 2017 Eurotech and/or its affiliates and others
3: *
4: * All rights reserved. This program and the accompanying materials
5: * are made available under the terms of the Eclipse Public License v1.0
6: * which accompanies this distribution, and is available at
7: * http://www.eclipse.org/legal/epl-v10.html
8: *
9: * Contributors:
10: * Eurotech
11: *******************************************************************************/
12:
13: package org.eclipse.kura.driver.block.task;
14:
15: import static java.util.Objects.requireNonNull;
16:
17: import java.util.ArrayList;
18: import java.util.Collections;
19: import java.util.HashMap;
20: import java.util.HashSet;
21: import java.util.List;
22: import java.util.Map;
23: import java.util.function.Function;
24: import java.util.stream.Collectors;
25: import java.util.stream.Stream;
26:
27: import org.eclipse.kura.KuraErrorCode;
28: import org.eclipse.kura.KuraException;
29: import org.eclipse.kura.KuraRuntimeException;
30: import org.eclipse.kura.channel.ChannelFlag;
31: import org.eclipse.kura.channel.ChannelRecord;
32: import org.eclipse.kura.channel.ChannelStatus;
33: import org.eclipse.kura.channel.listener.ChannelListener;
34: import org.eclipse.kura.driver.Driver;
35: import org.eclipse.kura.driver.PreparedRead;
36: import org.eclipse.kura.driver.block.Block;
37: import org.eclipse.kura.driver.block.BlockFactory;
38: import org.slf4j.Logger;
39: import org.slf4j.LoggerFactory;
40:
41: /**
42: * <p>
43: * Provides an helper class that can be extended for implementing a {@link Driver} that supports the aggregation of
44: * requests using a {@link BlockTaskAggregator}.
45: * </p>
46: * <p>
47: * This class introduces the concept of domain. In general tasks that operate on different domains cannot be aggregated
48: * together. Examples of such domains can be a (unit id, function code) pair in the Modbus protocol or a data block
49: * (DB) in the case of the S7Comm protocol.
50: * </p>
51: * <p>
52: * Implementors of this class must provide an implementation for the
53: * {@link #getTaskFactoryForDomain(Object, Mode)} and {@link #toTasks(List, Mode)}
54: * methods, see the description of the two methods for more details:
55: * </p>
56: * <ul>
57: * <li>{@link #getTaskFactoryForDomain(Object, Mode)} must provide a {@link BlockFactory} that can be used for creating
58: * {@link ToplevelBlockTask} instances responsible of implementing the data transfer operations for the specified
59: * domain</li>
60: * <li>{@link #toTasks(List, Mode)} must convert the configuration contained in the provided list of
61: * {@link ChannelRecord} instances into a {@link Stream} of (domain, {@link BlockTask}) pairs.</li>
62: * </ul>
63: * <p>
64: * This class provides a default implementation for the {@link #read(List)}, {@link #write(List)} and
65: * {@link #prepareRead(List)} methods of the {@link Driver} interface.
66: * </p>
67: *
68: * @param <T>
69: * the type of the domain, can be any type suitable for being used as an {@link HashMap} key
70: */
71: public abstract class AbstractBlockDriver<T> implements Driver {
72:
73: private static final Logger logger = LoggerFactory.getLogger(AbstractBlockDriver.class);
74:
75: /**
76: * This method must provide a {@link BlockFactory} that can be used for creating {@link ToplevelBlockTask} instances
77: * responsible of implementing the I/O operations for the specified domain.
78: *
79: * @param domain
80: * the domain
81: * @param mode
82: * the {@link Mode} for the returned {@link ToplevelBlockTask}
83: * @return the created {@link ToplevelBlockTask}
84: */
85: protected abstract BlockFactory<ToplevelBlockTask> getTaskFactoryForDomain(T domain, Mode mode);
86:
87: /**
88: * <p>
89: * This method must return a {@link Stream} that yields (domain, {@link BlockTask}) pairs obtained by converting the
90: * {@link ChannelRecord} instances contained in the provided list.
91: * </p>
92: * <p>
93: * The {@link BlockTask} instances obtained from the stream will be aggregated and assigned to a
94: * {@link ToplevelBlockTask} parent. Tasks generated by the {@link Stream} returned by this method should not
95: * generally perform I/O operation but use the {@link Buffer} of their parent to implement their operations instead.
96: * </p>
97: * <p>
98: * If the specified mode is {@link Mode#READ}, only tasks having {@link Mode#READ} must be produces, if the
99: * specified mode is {@link Mode#WRITE}, the returned stream is allowed to produce tasks that are either in
100: * {@link Mode#WRITE} or {@link Mode#UPDATE} modes.
101: * </p>
102: *
103: *
104: * @param records
105: * the list of {@link ChannelRecord} instances to be converted into {@link BlockTask} instances
106: * @param mode
107: * the mode, can be either {@link Mode#READ} or {@link Mode#WRITE}
108: * @return a {@link Stream} yielding the converted tasks
109: */
110: protected abstract Stream<Pair<T, BlockTask>> toTasks(List<ChannelRecord> records, Mode mode);
111:
112: /**
113: * Returns the minimum gap size parameter that will be used to aggregate tasks in {@link Mode#READ} for the
114: * specified domain. The default is 0. Tasks in {@link Mode#WRITE} mode will always be aggregated using 0 as the
115: * value of the {@code minimumGapSize} parameter.
116: *
117: * @param domain
118: * the domain
119: * @return the minimum gap size for the provided domain
120: */
121: protected int getReadMinimumGapSizeForDomain(T domain) {
122: return 0;
123: }
124:
125: /**
126: * This method is called immediately before an aggregation is performed for the specific domain and mode. This
127: * method can be overridden by implementors in order to customize the {@link BlockTaskAggregator} provided as
128: * argument, for example by adding prohibited blocks. The default implementation is no-op.
129: *
130: * @param domain
131: * the domain
132: * @param mode
133: * the {@link Mode} of the tasks to be aggregated, can be either {@link Mode#READ} or {@link Mode#WRITE}
134: * @param aggregator
135: * the {@link BlockTaskAggregator} that will perform the aggregation
136: */
137: protected void beforeAggregation(T domain, Mode mode, BlockTaskAggregator aggregator) {
138: }
139:
140: /**
141: * Perform the following operations:
142: * <ol>
143: * <li>The provided list of {@link ChannelRecods} instances are converted into {@link BlockTask} instances using the
144: * {@link #toTasks(List, Mode)} method.</li>
145: * <li>The resulting {@link BlockTask} instances are grouped by domain.</li>
146: * <li>An aggregation process is performed for each domain. If the {@link Mode#WRITE} is provided as parameter, and
147: * tasks in {@link Mode#UPDATE} mode are found in a domain, the aggregation will be performed using a
148: * {@link UpdateBlockTaskAggregator}. Otherwise a {@link BlockTaskAggregator} will be used.<br>
149: * The {@link BlockFactory} instances used for the aggregation will be obtained using the
150: * {@link #getTaskFactoryForDomain(Object, Mode)} method.</li>
151: * <li>The {@link ToplevelBlockTask} instances for all domains will be returned in the result list.</li>
152: * </ol>
153: *
154: * @param records
155: * the {@link ChannelRecord} instances to be converted to {@link BlockTask} instances.
156: * @param mode
157: * the mode
158: * @return the list of {@link BlockTask} instances resulting from the aggregation.
159: * @throws KuraException
160: * if any exception is thrown during the process
161: */
162: protected List<BlockTask> optimize(List<ChannelRecord> records, Mode mode) throws KuraException {
163: try {
164: final ArrayList<BlockTask> resultTasks = new ArrayList<>();
165: final HashSet<T> domainsWithUpdateTasks = new HashSet<>();
166:
167: final Function<Pair<T, BlockTask>, T> classifier;
168:• if (mode == Mode.READ) {
169: classifier = pair -> pair.first;
170: } else {
171: classifier = pair -> {
172:• if (pair.second.getMode() == Mode.UPDATE) {
173: domainsWithUpdateTasks.add(pair.first);
174: }
175: return pair.first;
176: };
177: }
178:
179: final Map<T, ArrayList<Block>> groupedTasks = toTasks(records, mode).collect(Collectors.groupingBy(
180: classifier, Collectors.mapping(pair -> pair.second, Collectors.toCollection(ArrayList::new))));
181:
182: groupedTasks.entrySet().forEach(entry -> {
183: final T domain = entry.getKey();
184: final BlockTaskAggregator aggregator;
185:• if (domainsWithUpdateTasks.contains(domain)) {
186: aggregator = new UpdateBlockTaskAggregator(entry.getValue(),
187: getTaskFactoryForDomain(domain, Mode.READ), getTaskFactoryForDomain(domain, Mode.WRITE));
188: aggregator.setMinimumGapSize(getReadMinimumGapSizeForDomain(domain));
189: } else {
190: aggregator = new BlockTaskAggregator(entry.getValue(), getTaskFactoryForDomain(domain, mode));
191:• if (mode == Mode.READ) {
192: aggregator.setMinimumGapSize(getReadMinimumGapSizeForDomain(domain));
193: }
194: }
195: beforeAggregation(domain, mode, aggregator);
196: aggregator.stream().forEach(resultTasks::add);
197: });
198:
199: return resultTasks;
200: } catch (Exception e) {
201: throw new KuraException(KuraErrorCode.INVALID_PARAMETER, e);
202: }
203: }
204:
205: /**
206: * Executes the provided {@link BlockTask}. Implementors can override this method, for example for catching any
207: * exception thrown by the task and implement error handling.
208: *
209: * @param task
210: * the {@link BlockTask} to be run
211: */
212: protected void runTask(BlockTask task) {
213: try {
214: task.run();
215: } catch (Exception e) {
216: logger.warn("Task execution failed", e);
217: }
218: }
219:
220: @Override
221: public void registerChannelListener(final Map<String, Object> channelConfig, final ChannelListener listener)
222: throws ConnectionException {
223: throw new KuraRuntimeException(KuraErrorCode.OPERATION_NOT_SUPPORTED);
224: }
225:
226: @Override
227: public void unregisterChannelListener(final ChannelListener listener) throws ConnectionException {
228: throw new KuraRuntimeException(KuraErrorCode.OPERATION_NOT_SUPPORTED);
229: }
230:
231: @Override
232: public synchronized void read(final List<ChannelRecord> records) throws ConnectionException {
233: connect();
234: try {
235: optimize(records, Mode.READ).forEach(this::runTask);
236: } catch (Exception e) {
237: logger.warn("Unexpected exception during read", e);
238:• for (ChannelRecord record : records) {
239: record.setChannelStatus(new ChannelStatus(ChannelFlag.FAILURE, e.getMessage(), e));
240: record.setTimestamp(System.currentTimeMillis());
241: }
242: }
243: }
244:
245: @Override
246: public synchronized void write(final List<ChannelRecord> records) throws ConnectionException {
247: connect();
248: try {
249: optimize(records, Mode.WRITE).forEach(this::runTask);
250: } catch (Exception e) {
251: logger.warn("Unexpected exception during write", e);
252:• for (ChannelRecord record : records) {
253: record.setChannelStatus(new ChannelStatus(ChannelFlag.FAILURE, e.getMessage(), e));
254: record.setTimestamp(System.currentTimeMillis());
255: }
256: }
257: }
258:
259: protected PreparedRead createPreparedRead(List<ChannelRecord> records, List<BlockTask> tasks) {
260: return new BlockPreparedRead(records, tasks);
261: }
262:
263: @Override
264: public synchronized PreparedRead prepareRead(List<ChannelRecord> records) {
265: try {
266: return createPreparedRead(records, optimize(records, Mode.READ));
267: } catch (KuraException e) {
268:• for (ChannelRecord record : records) {
269: record.setChannelStatus(new ChannelStatus(ChannelFlag.FAILURE, e.getMessage(), e));
270: record.setTimestamp(System.currentTimeMillis());
271: }
272: return createPreparedRead(records, Collections.emptyList());
273: }
274: }
275:
276: public class BlockPreparedRead implements PreparedRead {
277:
278: private final List<ChannelRecord> records;
279: private final List<BlockTask> tasks;
280:
281: public BlockPreparedRead(List<ChannelRecord> records, List<BlockTask> tasks) {
282: this.records = records;
283: this.tasks = tasks;
284: }
285:
286: @Override
287: public void close() throws Exception {
288: }
289:
290: @Override
291: public List<ChannelRecord> execute() throws ConnectionException, KuraException {
292: synchronized (AbstractBlockDriver.this) {
293: connect();
294: for (BlockTask task : this.tasks) {
295: runTask(task);
296: }
297: return this.records;
298: }
299: }
300:
301: @Override
302: public List<ChannelRecord> getChannelRecords() {
303: return this.records;
304: }
305:
306: }
307:
308: public static final class Pair<U, V> {
309:
310: private final U first;
311: private final V second;
312:
313: public Pair(U first, V second) {
314: requireNonNull(first);
315: requireNonNull(second);
316: this.first = first;
317: this.second = second;
318: }
319:
320: public U getFirst() {
321: return first;
322: }
323:
324: public V getSecond() {
325: return second;
326: }
327: }
328: }