Skip to content

Package: KuraDeviceCallImpl

KuraDeviceCallImpl

name | instruction | branch | complexity | line | method
KuraDeviceCallImpl()
M: 3 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
borrowClient(KuraRequestMessage)
M: 39 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 11 C: 0
0%
M: 1 C: 0
0%
cancel(KuraRequestMessage, Long)
M: 5 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
create(KuraRequestMessage, Long)
M: 5 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
delete(KuraRequestMessage, Long)
M: 5 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
execute(KuraRequestMessage, Long)
M: 5 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
getBaseMessageClass()
M: 2 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
getTranslator(Class, Class)
M: 24 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 5 C: 0
0%
M: 1 C: 0
0%
lambda$borrowClient$0(KuraRequestMessage)
M: 43 C: 0
0%
M: 4 C: 0
0%
M: 3 C: 0
0%
M: 7 C: 0
0%
M: 1 C: 0
0%
options(KuraRequestMessage, Long)
M: 5 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
read(KuraRequestMessage, Long)
M: 5 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
sendInternal(KuraRequestMessage, Long)
M: 91 C: 0
0%
M: 4 C: 0
0%
M: 3 C: 0
0%
M: 25 C: 0
0%
M: 1 C: 0
0%
static {...}
M: 20 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 5 C: 0
0%
M: 1 C: 0
0%
submit(KuraRequestMessage, Long)
M: 5 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
write(KuraRequestMessage, Long)
M: 5 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%

Coverage

1: /*******************************************************************************
2: * Copyright (c) 2016, 2022 Eurotech and/or its affiliates and others
3: *
4: * This program and the accompanying materials are made
5: * available under the terms of the Eclipse Public License 2.0
6: * which is available at https://www.eclipse.org/legal/epl-2.0/
7: *
8: * SPDX-License-Identifier: EPL-2.0
9: *
10: * Contributors:
11: * Eurotech - initial API and implementation
12: * Red Hat Inc
13: *******************************************************************************/
14: package org.eclipse.kapua.service.device.call.kura;
15:
import com.google.common.base.Strings;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.eclipse.kapua.KapuaEntityNotFoundException;
import org.eclipse.kapua.KapuaException;
import org.eclipse.kapua.commons.security.KapuaSecurityUtils;
import org.eclipse.kapua.commons.util.RandomUtils;
import org.eclipse.kapua.locator.KapuaLocator;
import org.eclipse.kapua.message.Message;
import org.eclipse.kapua.service.account.Account;
import org.eclipse.kapua.service.account.AccountService;
import org.eclipse.kapua.service.device.call.DeviceCall;
import org.eclipse.kapua.service.device.call.exception.DeviceCallSendException;
import org.eclipse.kapua.service.device.call.exception.DeviceCallTimeoutException;
import org.eclipse.kapua.service.device.call.kura.exception.KuraDeviceCallErrorCodes;
import org.eclipse.kapua.service.device.call.kura.exception.KuraDeviceCallException;
import org.eclipse.kapua.service.device.call.message.kura.KuraMessage;
import org.eclipse.kapua.service.device.call.message.kura.app.request.KuraRequestChannel;
import org.eclipse.kapua.service.device.call.message.kura.app.request.KuraRequestMessage;
import org.eclipse.kapua.service.device.call.message.kura.app.request.KuraRequestPayload;
import org.eclipse.kapua.service.device.call.message.kura.app.response.KuraResponseMessage;
import org.eclipse.kapua.service.device.registry.Device;
import org.eclipse.kapua.service.device.registry.DeviceRegistryService;
import org.eclipse.kapua.translator.Translator;
import org.eclipse.kapua.translator.exception.TranslatorNotFoundException;
import org.eclipse.kapua.transport.TransportClientFactory;
import org.eclipse.kapua.transport.TransportFacade;
import org.eclipse.kapua.transport.exception.TransportClientGetException;
import org.eclipse.kapua.transport.exception.TransportException;
import org.eclipse.kapua.transport.exception.TransportTimeoutException;
import org.eclipse.kapua.transport.message.TransportMessage;

import javax.validation.constraints.NotNull;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
52:
53: /**
54: * {@link DeviceCall} {@link Kura} implementation.
55: *
56: * @since 1.0.0
57: */
58: public class KuraDeviceCallImpl implements DeviceCall<KuraRequestMessage, KuraResponseMessage> {
59:
60: private static final Random RANDOM = RandomUtils.getInstance();
61:
62: private static final KapuaLocator LOCATOR = KapuaLocator.getInstance();
63:
64: private static final AccountService ACCOUNT_SERVICE = LOCATOR.getService(AccountService.class);
65:
66: private static final DeviceRegistryService DEVICE_REGISTRY_SERVICE = LOCATOR.getService(DeviceRegistryService.class);
67:
68: private static final TransportClientFactory TRANSPORT_CLIENT_FACTORY = LOCATOR.getFactory(TransportClientFactory.class);
69:
70: @Override
71: public KuraResponseMessage create(@NotNull KuraRequestMessage requestMessage, @Nullable Long timeout)
72: throws DeviceCallTimeoutException, DeviceCallSendException, TransportException {
73: return sendInternal(requestMessage, timeout);
74: }
75:
76: @Override
77: public KuraResponseMessage read(@NotNull KuraRequestMessage requestMessage, @Nullable Long timeout)
78: throws DeviceCallTimeoutException, DeviceCallSendException, TransportException {
79: return sendInternal(requestMessage, timeout);
80: }
81:
82: @Override
83: public KuraResponseMessage options(@NotNull KuraRequestMessage requestMessage, @Nullable Long timeout)
84: throws DeviceCallTimeoutException, DeviceCallSendException, TransportException {
85: return sendInternal(requestMessage, timeout);
86: }
87:
88: @Override
89: public KuraResponseMessage delete(@NotNull KuraRequestMessage requestMessage, @Nullable Long timeout)
90: throws DeviceCallTimeoutException, DeviceCallSendException, TransportException {
91: return sendInternal(requestMessage, timeout);
92: }
93:
94: @Override
95: public KuraResponseMessage execute(@NotNull KuraRequestMessage requestMessage, @Nullable Long timeout)
96: throws DeviceCallTimeoutException, DeviceCallSendException, TransportException {
97: return sendInternal(requestMessage, timeout);
98: }
99:
100: @Override
101: public KuraResponseMessage write(@NotNull KuraRequestMessage requestMessage, @Nullable Long timeout)
102: throws DeviceCallTimeoutException, DeviceCallSendException, TransportException {
103: return sendInternal(requestMessage, timeout);
104: }
105:
106: @Override
107: public KuraResponseMessage submit(KuraRequestMessage requestMessage, @Nullable Long timeout)
108: throws DeviceCallTimeoutException, DeviceCallSendException, TransportException {
109: return sendInternal(requestMessage, timeout);
110: }
111:
112: @Override
113: public KuraResponseMessage cancel(KuraRequestMessage requestMessage, @Nullable Long timeout)
114: throws DeviceCallTimeoutException, DeviceCallSendException, TransportException {
115: return sendInternal(requestMessage, timeout);
116: }
117:
118: @Override
119: public Class<KuraMessage> getBaseMessageClass() {
120: return KuraMessage.class;
121: }
122:
123: //
124: // Private methods
125: //
126:
127: /**
128: * Sends the {@link KuraRequestMessage} and waits for the response if the {@code timeout} is given.
129: *
130: * @param requestMessage The {@link KuraRequestMessage} to send.
131: * @param timeout The timeout of waiting the {@link KuraResponseMessage}.
132: * @return The {@link KuraResponseMessage} received.
133: * @throws DeviceCallTimeoutException if waiting of the response goes on timeout.
134: * @throws DeviceCallSendException if sending the request produces any error.
135: * @since 1.0.0
136: */
137: protected KuraResponseMessage sendInternal(@NotNull KuraRequestMessage requestMessage, @Nullable Long timeout) throws DeviceCallTimeoutException, DeviceCallSendException, TransportException {
138:
139: KuraResponseMessage response = null;
140: try {
141: //
142: // Borrow a TransportClient
143: try (TransportFacade transportFacade = borrowClient(requestMessage)) {
144: //
145: // Get Kura to transport translator for the request and vice versa
146: Translator<KuraRequestMessage, TransportMessage<?, ?>> translatorKuraTransport = getTranslator(requestMessage.getClass(), transportFacade.getMessageClass());
147: Translator<TransportMessage<?, ?>, KuraResponseMessage> translatorTransportKura = getTranslator(transportFacade.getMessageClass(), KuraResponseMessage.class);
148:
149: //
150: // Make the request
151: // Add requestId and requesterClientId to both payload and channel if response is expected
152: // Note: Adding to both payload and channel to let the translator choose what to do base on the transport used.
153: KuraRequestChannel requestChannel = requestMessage.getChannel();
154: KuraRequestPayload requestPayload = requestMessage.getPayload();
155: requestPayload.setRequesterClientId(transportFacade.getClientId());
156:
157:• if (timeout != null) {
158: String requestId = String.valueOf(RANDOM.nextLong());
159: requestChannel.setRequestId(requestId);
160: requestChannel.setRequesterClientId(transportFacade.getClientId());
161:
162: requestPayload.setRequestId(requestId);
163: }
164:
165: //
166: // Do send
167: // Set current timestamp
168: requestMessage.setTimestamp(new Date());
169:
170: // Send
171: TransportMessage<?, ?> transportRequestMessage = translatorKuraTransport.translate(requestMessage);
172: TransportMessage<?, ?> transportResponseMessage = transportFacade.sendSync(transportRequestMessage, timeout);
173:
174: // Translate response
175:• if (timeout != null) {
176: response = translatorTransportKura.translate(transportResponseMessage);
177: }
178: }
179: } catch (TransportTimeoutException te) {
180: throw new DeviceCallTimeoutException(te, timeout);
181: } catch (TransportException te) {
182: throw te;
183: } catch (KapuaException se) {
184: throw new DeviceCallSendException(se, requestMessage);
185: }
186:
187: return response;
188: }
189:
190:
191: /**
192: * Picks a {@link TransportFacade} to send the {@link KuraResponseMessage}.
193: *
194: * @param kuraRequestMessage The {@link KuraRequestMessage} to send.
195: * @return The {@link TransportFacade} to use to send the {@link KuraResponseMessage}.
196: * @throws TransportClientGetException If getting the {@link TransportFacade} causes an {@link Exception}.
197: * @throws TransportException For all other errors
198: * @since 1.0.0
199: */
200: protected TransportFacade<?, ?, ?, ?> borrowClient(KuraRequestMessage kuraRequestMessage) throws TransportException {
201: String serverIp = null;
202: try {
203: serverIp = KapuaSecurityUtils.doPrivileged(() -> {
204: Account account = ACCOUNT_SERVICE.findByName(kuraRequestMessage.getChannel().getScope());
205:
206:• if (account == null) {
207: throw new KapuaEntityNotFoundException(Account.TYPE, kuraRequestMessage.getChannel().getScope());
208: }
209:
210: Device device = DEVICE_REGISTRY_SERVICE.findByClientId(account.getId(), kuraRequestMessage.getChannel().getClientId());
211:• if (device == null) {
212: throw new KapuaEntityNotFoundException(Device.TYPE, kuraRequestMessage.getChannel().getClientId());
213: }
214:
215: return device.getConnection().getServerIp();
216: });
217:
218:• if (Strings.isNullOrEmpty(serverIp)) {
219: throw new TransportClientGetException(serverIp);
220: }
221:
222: Map<String, Object> configParameters = new HashMap<>(1);
223: configParameters.put("serverAddress", serverIp);
224:
225: return TRANSPORT_CLIENT_FACTORY.getFacade(configParameters);
226: } catch (TransportException tce) {
227: throw tce;
228: } catch (Exception e) {
229: throw new TransportClientGetException(e, serverIp);
230: }
231: }
232:
233: /**
234: * Gets the translator for the given {@link Message} types.
235: *
236: * @param from The {@link Message} type from which to translate.
237: * @param to The {@link Message} type to which to translate.
238: * @param <F> The {@link Message} {@code class}from which to translate.
239: * @param <T> The {@link Message} {@code class} to which to translate.
240: * @return The {@link Translator} found.
241: * @throws KuraDeviceCallException If error occurs while looking for the {@link Translator}.
242: * @since 1.0.0
243: */
244: protected <F extends Message<?, ?>, T extends Message<?, ?>> Translator<F, T> getTranslator(Class<F> from, Class<T> to) throws KuraDeviceCallException {
245: Translator<F, T> translator;
246: try {
247: translator = Translator.getTranslatorFor(from, to);
248: } catch (TranslatorNotFoundException e) {
249: throw new KuraDeviceCallException(KuraDeviceCallErrorCodes.CALL_ERROR, e, from, to);
250: }
251: return translator;
252: }
253: }