Skip to content

Package: PahoChannel$Builder

PahoChannel$Builder

name | instruction | branch | complexity | line | method
PahoChannel.Builder()
M: 6 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
build()
M: 63 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 12 C: 0
0%
M: 1 C: 0
0%
builder()
M: 2 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
persistentProvider()
M: 3 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
persistentProvider(Supplier)
M: 12 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 4 C: 0
0%
M: 1 C: 0
0%

Coverage

1: /*******************************************************************************
2: * Copyright (c) 2017, 2022 Red Hat Inc and others.
3: *
4: * This program and the accompanying materials are made
5: * available under the terms of the Eclipse Public License 2.0
6: * which is available at https://www.eclipse.org/legal/epl-2.0/
7: *
8: * SPDX-License-Identifier: EPL-2.0
9: *
10: * Contributors:
11: * Red Hat Inc - initial API and implementation
12: *******************************************************************************/
13: package org.eclipse.kapua.client.gateway.mqtt.paho;
14:
15: import java.net.URI;
16: import java.nio.ByteBuffer;
17: import java.util.ArrayList;
18: import java.util.HashMap;
19: import java.util.List;
20: import java.util.Map;
21: import java.util.Objects;
22: import java.util.Set;
23: import java.util.concurrent.CompletableFuture;
24: import java.util.concurrent.CompletionStage;
25: import java.util.concurrent.TimeUnit;
26: import java.util.function.Supplier;
27:
28: import org.eclipse.kapua.client.gateway.BinaryPayloadCodec;
29: import org.eclipse.kapua.client.gateway.Credentials.UserAndPassword;
30: import org.eclipse.kapua.client.gateway.TransmissionException;
31: import org.eclipse.kapua.client.gateway.mqtt.AbstractMqttChannel;
32: import org.eclipse.kapua.client.gateway.mqtt.MqttMessageHandler;
33: import org.eclipse.kapua.client.gateway.mqtt.MqttNamespace;
34: import org.eclipse.kapua.client.gateway.mqtt.paho.internal.Listeners;
35: import org.eclipse.kapua.client.gateway.spi.util.Buffers;
36: import org.eclipse.kapua.client.gateway.spi.util.Strings;
37:
38: import org.eclipse.paho.client.mqttv3.IMqttActionListener;
39: import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
40: import org.eclipse.paho.client.mqttv3.IMqttToken;
41: import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
42: import org.eclipse.paho.client.mqttv3.MqttCallback;
43: import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
44: import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
45: import org.eclipse.paho.client.mqttv3.MqttException;
46: import org.eclipse.paho.client.mqttv3.MqttMessage;
47: import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
48: import org.slf4j.Logger;
49: import org.slf4j.LoggerFactory;
50:
51: public class PahoChannel extends AbstractMqttChannel {
52:
53: private static final Logger logger = LoggerFactory.getLogger(PahoChannel.class);
54:
55: public static class Builder extends AbstractMqttChannel.Builder<Builder> {
56:
57: private Supplier<MqttClientPersistence> persistenceProvider = MemoryPersistence::new;
58:
59: @Override
60: protected Builder builder() {
61: return this;
62: }
63:
64: public Builder persistentProvider(final Supplier<MqttClientPersistence> provider) {
65:• if (provider != null) {
66: persistenceProvider = provider;
67: } else {
68: persistenceProvider = MemoryPersistence::new;
69: }
70: return builder();
71: }
72:
73: public Supplier<MqttClientPersistence> persistentProvider() {
74: return persistenceProvider;
75: }
76:
77: @Override
78: public PahoChannel build() throws Exception {
79:
80: final URI broker = Objects.requireNonNull(broker(), "Broker must be set");
81: final String clientId = Strings.nonEmptyText(clientId(), "clientId");
82:
83: final MqttClientPersistence persistence = Objects.requireNonNull(persistenceProvider.get(), "Persistence provider returned 'null' persistence");
84: final MqttNamespace namespace = Objects.requireNonNull(namespace(), "Namespace must be set");
85: final BinaryPayloadCodec codec = Objects.requireNonNull(codec(), "Codec must be set");
86:
87: MqttAsyncClient client = new MqttAsyncClient(broker.toString(), clientId, persistence);
88: try {
89: final PahoChannel result = new PahoChannel(clientId, namespace, codec, client, persistence, createConnectOptions(this));
90: client = null;
91: return result;
92: } finally {
93:• if (client != null) {
94: try {
95: client.disconnectForcibly(0);
96: } finally {
97: client.close();
98: }
99: }
100: }
101: }
102: }
103:
104: private static MqttConnectOptions createConnectOptions(final Builder builder) {
105: final MqttConnectOptions result = new MqttConnectOptions();
106:
107: final Object credentials = builder.credentials();
108: if (credentials instanceof UserAndPassword) {
109: final UserAndPassword userAndPassword = (UserAndPassword) credentials;
110: result.setUserName(userAndPassword.getUsername());
111: result.setPassword(userAndPassword.getPassword());
112: } else if (credentials == null) {
113: // ignore
114: } else {
115: throw new IllegalArgumentException(String.format("Unsupported credentials type: %s", credentials.getClass().getName()));
116: }
117:
118: return result;
119: }
120:
// options passed to every connect attempt
private final MqttConnectOptions connectOptions;
// the underlying Paho client; set to null by handleClose once the channel is closed
private MqttAsyncClient client;

// registered message handlers, keyed by MQTT topic
private final Map<String, MqttMessageHandler> subscriptions = new HashMap<>();
// channel context, assigned in handleInit
private Context context;
126:
/**
 * Create a new channel around an already created Paho client.
 * <p>
 * Installs the MQTT callback on the client: arrived messages are forwarded
 * to {@link #handleMessageArrived(String, MqttMessage)} and a lost
 * connection triggers {@link #handleDisconnected()}.
 *
 * @param clientId
 *            the MQTT client id, passed to the super class
 * @param namespace
 *            the MQTT namespace, passed to the super class
 * @param codec
 *            the payload codec, passed to the super class
 * @param client
 *            the Paho client to use
 * @param persistence
 *            the persistence instance of the client; not used by this constructor
 * @param connectOptions
 *            the options to use for each connect attempt
 */
private PahoChannel(final String clientId, final MqttNamespace namespace, final BinaryPayloadCodec codec,
        final MqttAsyncClient client, final MqttClientPersistence persistence, final MqttConnectOptions connectOptions) {

    super(codec, namespace, clientId);

    this.connectOptions = connectOptions;
    this.client = client;

    this.client.setCallback(new MqttCallback() {

        @Override
        public void messageArrived(final String topic, final MqttMessage message) throws Exception {
            handleMessageArrived(topic, message);
        }

        @Override
        public void deliveryComplete(final IMqttDeliveryToken token) {
            // publish completion is tracked by the per-publish action listener
        }

        @Override
        public void connectionLost(final Throwable cause) {
            // keep the reason visible, it was silently dropped before
            logger.info("Connection lost", cause);
            handleDisconnected();
        }
    });
}
152:
153: @Override
154: public void handleInit(final Context context) {
155: this.context = context;
156: this.context.executor().execute(this::connect);
157: }
158:
159: protected void connect() {
160: try {
161: client.connect(connectOptions, null, new IMqttActionListener() {
162:
163: @Override
164: public void onSuccess(final IMqttToken asyncActionToken) {
165: handleConnected();
166: }
167:
168: @Override
169: public void onFailure(final IMqttToken asyncActionToken, final Throwable exception) {
170: handleDisconnected();
171: }
172: });
173: } catch (final MqttException e) {
174: logger.warn("Failed to call connect", e);
175: }
176: }
177:
178: @Override
179: public void handleClose(final Context context) {
180:
181: final MqttAsyncClient client;
182:
183: synchronized (this) {
184: client = this.client;
185: if (client == null) {
186: return;
187: }
188: this.client = null;
189: }
190:
191: // disconnect first
192:
193: try {
194: client.disconnect().waitForCompletion();
195: } catch (final MqttException e) {
196: }
197:
198: // now try to close (and free the resources)
199:
200: try {
201: client.close();
202: } catch (final MqttException e) {
203: }
204: }
205:
206: protected void handleConnected() {
207: synchronized (this) {
208: context.notifyConnected();
209: handleResubscribe();
210: }
211: }
212:
213: protected void handleDisconnected() {
214: synchronized (this) {
215: try {
216: context.notifyDisconnected();
217: } finally {
218: context.executor().schedule(this::connect, 1, TimeUnit.SECONDS);
219: }
220: }
221: }
222:
223: private void handleResubscribe() {
224: for (final Map.Entry<String, MqttMessageHandler> entry : subscriptions.entrySet()) {
225: internalSubscribe(entry.getKey()).whenComplete((value, ex) -> {
226: logger.warn("Failed to re-subscribe to '{}'", entry.getKey(), ex);
227: });
228: }
229: }
230:
231: @Override
232: public CompletionStage<?> publishMqtt(final String topic, final ByteBuffer payload) {
233: logger.debug("Publishing {} - {}", topic, payload);
234:
235: final CompletableFuture<?> future = new CompletableFuture<>();
236: try {
237: client.publish(topic, Buffers.toByteArray(payload), 1, false, null,
238: Listeners.toListener(
239: () -> future.complete(null),
240: error -> handlePublishError(future, error)));
241: } catch (MqttException e) {
242: future.completeExceptionally(e);
243: }
244: return future;
245: }
246:
247: private void handlePublishError(final CompletableFuture<?> future, final Throwable error) {
248: if (!(error instanceof MqttException)) {
249: // unknown exception type, simply forward
250: future.completeExceptionally(error);
251: return;
252: }
253:
254: // check for error code
255:
256: final MqttException e = (MqttException) error;
257: switch (e.getReasonCode()) {
258: case MqttException.REASON_CODE_CLIENT_EXCEPTION: //$FALL-THROUGH$
259: case MqttException.REASON_CODE_UNEXPECTED_ERROR:
260: // consider this non-temporary
261: future.completeExceptionally(error);
262: return;
263: default:
264: // consider this temporary and recoverable
265: future.completeExceptionally(new TransmissionException(error));
266: return;
267: }
268: }
269:
270: @Override
271: protected CompletionStage<?> subscribeMqtt(String topic, MqttMessageHandler messageHandler) {
272: synchronized (this) {
273: subscriptions.put(topic, messageHandler);
274: return internalSubscribe(topic);
275: }
276: }
277:
278: private CompletionStage<?> internalSubscribe(final String topic) {
279: final CompletableFuture<?> future = new CompletableFuture<>();
280: try {
281: client.subscribe(topic, 1, null, Listeners.toListener(future));
282: } catch (final MqttException e) {
283: future.completeExceptionally(e);
284: }
285: return future;
286: }
287:
288: @Override
289: protected void unsubscribeMqtt(final Set<String> mqttTopics) throws MqttException {
290: logger.info("Unsubscribe from: {}", mqttTopics);
291:
292: final List<String> topics = new ArrayList<>(mqttTopics.size());
293:
294: synchronized (this) {
295: for (String topic : mqttTopics) {
296: if (subscriptions.remove(topic) != null) {
297: topics.add(topic);
298: }
299: }
300: }
301:
302: client.unsubscribe(topics.toArray(new String[topics.size()]));
303: }
304:
305: protected void handleMessageArrived(final String topic, final MqttMessage message) throws Exception {
306: final ByteBuffer buffer = Buffers.wrap(message.getPayload());
307: buffer.flip();
308:
309: logger.debug("Received message - mqtt-topic: {}, payload: {}", topic, buffer);
310:
311: final MqttMessageHandler handler = subscriptions.get(topic);
312: if (handler != null) {
313: handler.handleMessage(topic, buffer);
314: }
315: }
316:
317: }