Skip to content

Package: MqttAsyncTransport

MqttAsyncTransport

nameinstructionbranchcomplexitylinemethod
MqttAsyncTransport(GatewayConfiguration)
M: 34 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 7 C: 0
0%
M: 1 C: 0
0%
close()
M: 9 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 3 C: 0
0%
M: 1 C: 0
0%
connect()
M: 18 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 5 C: 0
0%
M: 1 C: 0
0%
disconnect()
M: 16 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 5 C: 0
0%
M: 1 C: 0
0%
handleConnected()
M: 8 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 4 C: 0
0%
M: 1 C: 0
0%
handleDisconnected()
M: 8 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 4 C: 0
0%
M: 1 C: 0
0%
sendMessage(Topic, byte[])
M: 39 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 10 C: 0
0%
M: 1 C: 0
0%
static {...}
M: 4 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
subscribe(Topic, Consumer)
M: 31 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 7 C: 0
0%
M: 1 C: 0
0%
unsubscribe(Topic)
M: 20 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 6 C: 0
0%
M: 1 C: 0
0%
whenConnected(Runnable)
M: 4 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
whenDisconnected(Runnable)
M: 4 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%

Coverage

1: /*******************************************************************************
2: * Copyright (c) 2017, 2022 Red Hat Inc and others
3: *
4: * This program and the accompanying materials are made
5: * available under the terms of the Eclipse Public License 2.0
6: * which is available at https://www.eclipse.org/legal/epl-2.0/
7: *
8: * SPDX-License-Identifier: EPL-2.0
9: *
10: * Contributors:
11: * Red Hat Inc - initial API and implementation
12: *******************************************************************************/
13: package org.eclipse.kapua.kura.simulator;
14:
15: import java.util.Objects;
16: import java.util.function.Consumer;
17:
18: import org.eclipse.kapua.kura.simulator.payload.Message;
19: import org.eclipse.kapua.kura.simulator.topic.Topic;
20: import org.eclipse.kapua.kura.simulator.util.Hex;
21: import org.eclipse.paho.client.mqttv3.IMqttActionListener;
22: import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
23: import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
24: import org.eclipse.paho.client.mqttv3.IMqttToken;
25: import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
26: import org.eclipse.paho.client.mqttv3.MqttCallback;
27: import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
28: import org.eclipse.paho.client.mqttv3.MqttException;
29: import org.eclipse.paho.client.mqttv3.MqttMessage;
30: import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
31: import org.slf4j.Logger;
32: import org.slf4j.LoggerFactory;
33:
34: /**
35: * A transport implementation based on MQTT
36: */
37: public class MqttAsyncTransport extends AbstractMqttTransport implements AutoCloseable {
38:
39: private static final Logger logger = LoggerFactory.getLogger(MqttAsyncTransport.class);
40:
41: private final MqttAsyncClient client;
42:
43: private final MqttConnectOptions connectOptions;
44:
45: private Runnable onConnected;
46:
47: private Runnable onDisconnected;
48:
49: public MqttAsyncTransport(final GatewayConfiguration configuration) throws MqttException {
50: super(configuration);
51:
52: final MemoryPersistence persistence = new MemoryPersistence();
53: final String plainBrokerUrl = plainUrl(configuration.getBrokerUrl());
54: this.client = new MqttAsyncClient(plainBrokerUrl, configuration.getClientId(), persistence);
55: this.client.setCallback(new MqttCallback() {
56:
57: @Override
58: public void messageArrived(final String topic, final MqttMessage message) throws Exception {
59: }
60:
61: @Override
62: public void deliveryComplete(final IMqttDeliveryToken token) {
63: }
64:
65: @Override
66: public void connectionLost(final Throwable cause) {
67: handleDisconnected();
68: }
69: });
70: this.connectOptions = createConnectOptions(configuration.getBrokerUrl());
71: }
72:
73: @Override
74: public void connect() {
75: try {
76: this.client.connect(this.connectOptions, null, new IMqttActionListener() {
77:
78: @Override
79: public void onSuccess(final IMqttToken asyncActionToken) {
80: handleConnected();
81: }
82:
83: @Override
84: public void onFailure(final IMqttToken asyncActionToken, final Throwable exception) {
85: logger.warn("Failed to connect", exception);
86: }
87: });
88: } catch (final MqttException e) {
89: logger.warn("Failed to initiate connect", e);
90: }
91: }
92:
93: @Override
94: public void disconnect() {
95: try {
96: this.client.disconnect(null, new IMqttActionListener() {
97:
98: @Override
99: public void onSuccess(final IMqttToken asyncActionToken) {
100: handleDisconnected();
101: }
102:
103: @Override
104: public void onFailure(final IMqttToken asyncActionToken, final Throwable exception) {
105: logger.warn("Failed to disconnect", exception);
106: }
107: });
108: } catch (final MqttException e) {
109: logger.warn("Failed to initiatate disconnect", e);
110: }
111: }
112:
113: @Override
114: public void close() throws MqttException {
115: try {
116: this.client.disconnect(0).waitForCompletion();
117: } finally {
118: this.client.close();
119: }
120: }
121:
122: @Override
123: public void subscribe(final Topic topic, final Consumer<Message> consumer) {
124: Objects.requireNonNull(consumer);
125:
126: try {
127: this.client.subscribe(topic.render(this.topicContext), 0, null, null, new IMqttMessageListener() {
128:
129: @Override
130: public void messageArrived(final String topic, final MqttMessage mqttMessage) throws Exception {
131: logger.debug("Received MQTT message from {}", topic);
132: consumer.accept(new Message(Topic.fromString(topic), mqttMessage.getPayload(),
133: MqttAsyncTransport.this.topicContext));
134: }
135: });
136: } catch (final MqttException e) {
137:• if (e.getReasonCode() != MqttException.REASON_CODE_CLIENT_NOT_CONNECTED) {
138: logger.warn("Failed to subscribe to: {}", topic, e);
139: }
140: }
141: }
142:
143: @Override
144: public void unsubscribe(final Topic topic) {
145: try {
146: this.client.unsubscribe(topic.render(this.topicContext));
147: } catch (final MqttException e) {
148:• if (e.getReasonCode() != MqttException.REASON_CODE_CLIENT_NOT_CONNECTED) {
149: logger.warn("Failed to unsubscribe: {}", topic, e);
150: }
151: }
152: }
153:
154: @Override
155: public void whenConnected(final Runnable runnable) {
156: this.onConnected = runnable;
157: }
158:
159: @Override
160: public void whenDisconnected(final Runnable runnable) {
161: this.onDisconnected = runnable;
162: }
163:
164: protected void handleConnected() {
165: final Runnable runnable = this.onConnected;
166:• if (runnable != null) {
167: runnable.run();
168: }
169: }
170:
171: protected void handleDisconnected() {
172: final Runnable runnable = this.onDisconnected;
173:• if (runnable != null) {
174: runnable.run();
175: }
176: }
177:
178: @Override
179: public void sendMessage(final Topic topic, final byte[] payload) {
180:• if (logger.isDebugEnabled()) {
181: logger.debug("Sending message - topic: {}, payload: {}", topic, Hex.toHex(payload, 256));
182: }
183:
184: try {
185: final String fullTopic = topic.render(this.topicContext);
186: logger.debug("Full topic: {}", fullTopic);
187:
188: this.client.publish(fullTopic, payload, 0, false);
189: } catch (final Exception e) {
190: logger.warn("Failed to send out message", e);
191: throw new RuntimeException(e);
192: }
193: }
194:
195: }