Skip to content

Package: FuseChannel$1

FuseChannel$1

name | instruction | branch | complexity | line | method
onConnected()
M: 4 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
onDisconnected()
M: 4 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
onFailure(Throwable)
M: 1 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
onPublish(UTF8Buffer, Buffer, Callback)
M: 8 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
onPublish(UTF8Buffer, Buffer, Runnable)
M: 10 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
{...}
M: 6 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%

Coverage

1: /*******************************************************************************
2: * Copyright (c) 2017, 2022 Red Hat Inc and others.
3: *
4: * This program and the accompanying materials are made
5: * available under the terms of the Eclipse Public License 2.0
6: * which is available at https://www.eclipse.org/legal/epl-2.0/
7: *
8: * SPDX-License-Identifier: EPL-2.0
9: *
10: * Contributors:
11: * Red Hat Inc - initial API and implementation
12: *******************************************************************************/
13: package org.eclipse.kapua.client.gateway.mqtt.fuse;
14:
15: import java.net.URI;
16: import java.nio.ByteBuffer;
17: import java.util.ArrayList;
18: import java.util.HashMap;
19: import java.util.List;
20: import java.util.Map;
21: import java.util.Objects;
22: import java.util.Set;
23: import java.util.concurrent.CompletableFuture;
24: import java.util.concurrent.CompletionStage;
25:
26: import org.eclipse.kapua.client.gateway.BinaryPayloadCodec;
27: import org.eclipse.kapua.client.gateway.Credentials.UserAndPassword;
28: import org.eclipse.kapua.client.gateway.mqtt.AbstractMqttChannel;
29: import org.eclipse.kapua.client.gateway.mqtt.MqttMessageHandler;
30: import org.eclipse.kapua.client.gateway.mqtt.MqttNamespace;
31: import org.eclipse.kapua.client.gateway.mqtt.fuse.internal.Callbacks;
32: import org.eclipse.kapua.client.gateway.spi.util.Strings;
33:
34: import org.fusesource.hawtbuf.Buffer;
35: import org.fusesource.hawtbuf.UTF8Buffer;
36: import org.fusesource.mqtt.client.Callback;
37: import org.fusesource.mqtt.client.CallbackConnection;
38: import org.fusesource.mqtt.client.ExtendedListener;
39: import org.fusesource.mqtt.client.MQTT;
40: import org.fusesource.mqtt.client.Promise;
41: import org.fusesource.mqtt.client.QoS;
42: import org.slf4j.Logger;
43: import org.slf4j.LoggerFactory;
44:
45: public class FuseChannel extends AbstractMqttChannel {
46:
47: private static final Logger logger = LoggerFactory.getLogger(FuseChannel.class);
48:
49: public static class Builder extends AbstractMqttChannel.Builder<Builder> {
50:
51: @Override
52: protected Builder builder() {
53: return this;
54: }
55:
56: @Override
57: public FuseChannel build() throws Exception {
58:
59: final URI broker = Objects.requireNonNull(broker(), "Broker must be set");
60: final String clientId = Strings.nonEmptyText(clientId(), "clientId");
61:
62: final MqttNamespace namespace = Objects.requireNonNull(namespace(), "Namespace must be set");
63: final BinaryPayloadCodec codec = Objects.requireNonNull(codec(), "Codec must be set");
64:
65: final MQTT mqtt = new MQTT();
66: mqtt.setCleanSession(false);
67: mqtt.setHost(broker);
68: mqtt.setClientId(clientId);
69:
70: final Object credentials = credentials();
71: if (credentials == null) {
72: // none
73: } else if (credentials instanceof UserAndPassword) {
74: final UserAndPassword userAndPassword = (UserAndPassword) credentials;
75: mqtt.setUserName(userAndPassword.getUsername());
76: mqtt.setPassword(userAndPassword.getPasswordAsString());
77: } else {
78: throw new IllegalStateException(
79: String.format("Unknown credentials type: %s", credentials.getClass().getName()));
80: }
81:
82: final CallbackConnection connection = mqtt.callbackConnection();
83: final FuseChannel result = new FuseChannel(clientId, namespace, codec, connection);
84: return result;
85: }
86: }
87:
88: private final ExtendedListener listener = new ExtendedListener() {
89:
90: @Override
91: public void onPublish(final UTF8Buffer topic, final Buffer body, final Runnable ack) {
92: onPublish(topic, body, new Callback<Callback<Void>>() {
93:
94: @Override
95: public void onSuccess(Callback<Void> value) {
96: ack.run();
97: }
98:
99: @Override
100: public void onFailure(Throwable value) {
101: }
102:
103: });
104: }
105:
106: @Override
107: public void onFailure(Throwable value) {
108: }
109:
110: @Override
111: public void onDisconnected() {
112: handleDisconnected();
113: }
114:
115: @Override
116: public void onConnected() {
117: handleConnected();
118: }
119:
120: @Override
121: public void onPublish(final UTF8Buffer topic, final Buffer body, final Callback<Callback<Void>> ack) {
122: handleMessageArrived(topic.toString(), body, ack);
123: }
124: };
125:
126: private final CallbackConnection connection;
127:
128: private final Map<String, MqttMessageHandler> subscriptions = new HashMap<>();
129:
130: private Context context;
131:
132: private FuseChannel(final String clientId, final MqttNamespace namespace, final BinaryPayloadCodec codec, final CallbackConnection connection) {
133:
134: super(codec, namespace, clientId);
135:
136: this.connection = connection;
137: this.connection.listener(listener);
138: }
139:
140: protected void handleConnected() {
141: context.notifyConnected();
142: }
143:
144: protected void handleDisconnected() {
145: context.notifyDisconnected();
146: }
147:
148: @Override
149: public void handleInit(final Context context) {
150: this.context = context;
151: connection.connect(new Promise<>());
152: }
153:
154: @Override
155: public void handleClose(final Context context) {
156: connection.disconnect(null);
157: }
158:
159: @Override
160: public CompletionStage<?> publishMqtt(final String topic, final ByteBuffer payload) {
161: final CompletableFuture<Void> future = new CompletableFuture<>();
162: connection.publish(Buffer.utf8(topic), new Buffer(payload), QoS.AT_LEAST_ONCE, false, Callbacks.asCallback(future));
163: return future;
164: }
165:
166: @Override
167: protected CompletionStage<?> subscribeMqtt(final String topic, final MqttMessageHandler messageHandler) {
168: synchronized (this) {
169: subscriptions.put(topic, messageHandler);
170:
171: final CompletableFuture<byte[]> future = new CompletableFuture<>();
172: connection.subscribe(
173: new org.fusesource.mqtt.client.Topic[] {
174: new org.fusesource.mqtt.client.Topic(topic, QoS.AT_LEAST_ONCE) },
175: Callbacks.asCallback(future));
176:
177: return future;
178: }
179: }
180:
181: @Override
182: protected void unsubscribeMqtt(final Set<String> mqttTopics) {
183:
184: logger.info("Unsubscribe from: {}", mqttTopics);
185:
186: final List<UTF8Buffer> topics = new ArrayList<>(mqttTopics.size());
187:
188: synchronized (this) {
189: for (final String topic : mqttTopics) {
190: if (subscriptions.remove(topic) != null) {
191: topics.add(new UTF8Buffer(topic));
192: }
193: }
194: }
195:
196: connection.unsubscribe(topics.toArray(new UTF8Buffer[topics.size()]), new Promise<>());
197: }
198:
199: protected void handleMessageArrived(final String topic, final Buffer payload, final Callback<Callback<Void>> ack) {
200: final MqttMessageHandler handler;
201:
202: synchronized (this) {
203: handler = subscriptions.get(topic);
204: }
205:
206: if (handler != null) {
207: try {
208: handler.handleMessage(topic, payload.toByteBuffer());
209: ack.onSuccess(null);
210: } catch (Exception e) {
211: ack.onFailure(e);
212: }
213: }
214: }
215:
216: }