Skip to content

Package: MqttDevice$1

MqttDevice$1

nameinstructionbranchcomplexitylinemethod
connectionLost(Throwable)
M: 21 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
deliveryComplete(IMqttDeliveryToken)
M: 4 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
messageArrived(String, MqttMessage)
M: 49 C: 0
0%
M: 6 C: 0
0%
M: 4 C: 0
0%
M: 8 C: 0
0%
M: 1 C: 0
0%
{...}
M: 6 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%

Coverage

1: /*******************************************************************************
2: * Copyright (c) 2017, 2022 Eurotech and/or its affiliates and others
3: *
4: * This program and the accompanying materials are made
5: * available under the terms of the Eclipse Public License 2.0
6: * which is available at https://www.eclipse.org/legal/epl-2.0/
7: *
8: * SPDX-License-Identifier: EPL-2.0
9: *
10: * Contributors:
11: * Eurotech
12: *******************************************************************************/
13: package org.eclipse.kapua.service.device.registry.steps;
14:
15: import org.eclipse.kapua.qa.common.Suppressed;
16: import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
17: import org.eclipse.paho.client.mqttv3.MqttCallback;
18: import org.eclipse.paho.client.mqttv3.MqttClient;
19: import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
20: import org.eclipse.paho.client.mqttv3.MqttException;
21: import org.eclipse.paho.client.mqttv3.MqttMessage;
22: import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
23: import org.slf4j.Logger;
24: import org.slf4j.LoggerFactory;
25:
26: import java.util.HashMap;
27: import java.util.Map;
28: import java.util.concurrent.atomic.AtomicInteger;
29:
30: /**
31: * Device that connects to MQTT broker and listens for messages as kapua-sys user
32: * and allows another client to connect and publish and subscribe to messages.
33: * <p>
34: * Used in Cucumber for writing Gherkin scenarios for broker service.
35: */
36: public class MqttDevice {
37:
38: private static final AtomicInteger COUNT = new AtomicInteger(0);
39:
40: /**
41: * Logger.
42: */
43: private static final Logger logger = LoggerFactory.getLogger(MqttDevice.class);
44:
45: /**
46: * URI of mqtt broker.
47: */
48: private static final String BROKER_URI = "tcp://localhost:1883";
49:
50: /**
51: * Listening mqtt client name.
52: */
53: private static final java.lang.String LISTENER_NAME = "ListenerClient_";
54:
55: /**
56: * System user under which Device is listening for messages.
57: */
58: private static final String SYS_USER = "kapua-sys";
59:
60: /**
61: * System user password while connecting to broker.
62: */
63: private static final String SYS_PASSWORD = "kapua-password";
64:
65: /**
66: * Default quality of service - mqtt.
67: */
68: // TODO switch to qos 1????
69: private static final int DEFAULT_QOS = 0;
70:
71: /**
72: * Default retain flag is false.
73: */
74: private static final boolean DEFAULT_RETAIN = false;
75:
76: /**
77: * No filter on topic.
78: */
79: private static final String NO_TOPIC_FILTER = "#";
80:
81: /**
82: * Map of Mqtt client for sending messages.
83: */
84: private Map<String, MqttClient> mqttClients;
85:
86: /**
87: * Mqtt client for listening from messages.
88: */
89: private MqttClient subscribedClient;
90:
91: private String clientId;
92:
93: /**
94: * Map for storing received messages that clients are listening to.
95: * It is Map of Maps, first key is clientId. Key in second map is
96: * topic on which message was received.
97: */
98: private Map<String, Map<String, String>> clientReceivedMqttMessage;
99:
100: /**
101: * Map for storing received messages that Listener is listening to.
102: */
103: private Map<String, String> listenerReceivedMqttMessage;
104:
105: public MqttDevice() {
106:
107: mqttClients = new HashMap<>();
108: clientId = LISTENER_NAME + COUNT.incrementAndGet();
109: }
110:
111: /**
112: * Connect subscriber to mqtt broker.
113: */
114: public void mqttSubscriberConnect() {
115:
116: MqttConnectOptions subscriberOpts = new MqttConnectOptions();
117: subscriberOpts.setUserName(SYS_USER);
118: subscriberOpts.setPassword(SYS_PASSWORD.toCharArray());
119: try {
120: subscribedClient = new MqttClient(BROKER_URI, clientId,
121: new MemoryPersistence());
122: subscribedClient.connect(subscriberOpts);
123: subscribedClient.subscribe(NO_TOPIC_FILTER, DEFAULT_QOS);
124: } catch (MqttException e) {
125: e.printStackTrace();
126: }
127:
128: subscribedClient.setCallback(new MqttCallback() {
129: @Override
130: public void connectionLost(Throwable throwable) {
131: logger.info("(Client {}) Listener connection to broker lost. {}", clientId, throwable.getMessage(), throwable);
132: }
133:
134: @Override
135: public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
136: logger.info("(Client {}) - Message arrived in Listener with topic: {}", clientId, topic);
137: // exclude the connect messages sent by the broker (that may affect the tests)
138: // this messages can be received by this callback before the listenerReceivedMqttMessage is properly initialized. So a check for null should be performed
139: // TODO manage this client in a better way, so the list of the received messages should be internal and exposed as getter to the caller.
140:• if (listenerReceivedMqttMessage != null) {
141:• if (!topic.contains("MQTT/CONNECT") || topic.contains("MQTT/DISCONNECT")) {
142: listenerReceivedMqttMessage.clear();
143: listenerReceivedMqttMessage.put(topic, new String(mqttMessage.getPayload()));
144: } else {
145: logger.info("(Client {}) - Received CONNECT/DISCONNECT message. The message will be discarded!", clientId);
146: }
147: } else {
148: logger.info("(Client {}) - Received message map is null. The message is not stored!", clientId);
149: }
150: }
151:
152: @Override
153: public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
154: logger.info("Listener message delivery complete.");
155: }
156: });
157: }
158:
159: /**
160: * Disconnect Device mqtt subscriber that listens on mqtt broker.
161: */
162: public void mqttSubscriberDisconnect() {
163: logger.info("(Client {}) - Unsubscribing", clientId);
164: try {
165: try (final Suppressed<Exception> s = Suppressed.withException()) {
166: s.run(subscribedClient::disconnect);
167: s.run(subscribedClient::close);
168: }
169: } catch (final Exception e) {
170: logger.warn("Failed during cleanup of subscriber Paho resources", e);
171: }
172: }
173:
174:
175: /**
176: * Connect mqtt client that sends and listens to messages.
177: *
178: * @param clientId mqtt client identifier
179: * @param userName user with which client connects to broker
180: * @param password password for connection
181: * @param topicFilter filter for topics client is listening to
182: * @throws MqttException
183: */
184: public void mqttClientConnect(String clientId, String userName, String password, String topicFilter)
185: throws MqttException {
186:
187: MqttConnectOptions clientOpts = new MqttConnectOptions();
188: clientOpts.setUserName(userName);
189: clientOpts.setPassword(password.toCharArray());
190: MqttClient mqttClient = null;
191: mqttClient = new MqttClient(BROKER_URI, clientId,
192: new MemoryPersistence());
193: mqttClient.connect(clientOpts);
194: if ((topicFilter != null) && (topicFilter.length() > 0)) {
195: mqttClient.subscribe(topicFilter, DEFAULT_QOS);
196: }
197:
198: mqttClient.setCallback(new MqttCallback() {
199:
200: @Override
201: public void connectionLost(Throwable throwable) {
202: logger.info("Client connection to broker lost.");
203: }
204:
205: @Override
206: public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
207: logger.info("Message arrived in client with topic: {}", topic);
208:
209: clientReceivedMqttMessage.clear();
210: Map<String, String> topicPayload = new HashMap<>();
211: topicPayload.put(topic, new String(mqttMessage.getPayload()));
212: clientReceivedMqttMessage.put(clientId, topicPayload);
213: }
214:
215: @Override
216: public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
217: logger.info("Client message delivery complete.");
218: }
219: });
220:
221: mqttClients.put(clientId, mqttClient);
222: }
223:
224: /**
225: * Disconnect Device mqtt client that listens and sends messages to mqtt broker.
226: */
227: public void mqttClientsDisconnect() {
228: logger.info("(Client {}) - Disconnecting", clientId);
229: for (Map.Entry<String, MqttClient> mqttClient : mqttClients.entrySet()) {
230: try {
231: try (final Suppressed<Exception> s = Suppressed.withException()) {
232: s.run(mqttClient.getValue()::disconnect);
233: s.run(mqttClient.getValue()::close);
234: }
235: } catch (final Exception e) {
236: logger.warn("Failed during cleanup of client Paho resources", e);
237: }
238: }
239: }
240:
241: /**
242: * Send string message to topic, also provide map where message received
243: * from broker is stored.
244: *
245: * @param clientId id of client with which message is sent.
246: * @param payload simple string payload
247: * @param topic topic described as string e.g. /foo/bar
248: * @param clientMqttMessage object where client received message is stored
249: * @param listenerMqttMessage object where listener received message is stored
250: */
251: public void mqttClientPublishString(String clientId, String payload, String topic,
252: Map<String, Map<String, String>> clientMqttMessage,
253: Map<String, String> listenerMqttMessage) {
254:
255: this.clientReceivedMqttMessage = clientMqttMessage;
256: this.listenerReceivedMqttMessage = listenerMqttMessage;
257: MqttClient mqttClient = mqttClients.get(clientId);
258: try {
259: mqttClient.publish(topic, payload.getBytes(), DEFAULT_QOS, DEFAULT_RETAIN);
260: } catch (MqttException e) {
261: e.printStackTrace();
262: }
263: }
264:
265: }