Skip to content

Package: PooledMqttClientFactory

PooledMqttClientFactory

nameinstructionbranchcomplexitylinemethod
PooledMqttClientFactory(String)
M: 6 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 3 C: 0
0%
M: 1 C: 0
0%
create()
M: 59 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 20 C: 0
0%
M: 1 C: 0
0%
destroyObject(PooledObject)
M: 28 C: 0
0%
M: 6 C: 0
0%
M: 4 C: 0
0%
M: 12 C: 0
0%
M: 1 C: 0
0%
static {...}
M: 6 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
validateObject(PooledObject)
M: 17 C: 0
0%
M: 6 C: 0
0%
M: 4 C: 0
0%
M: 4 C: 0
0%
M: 1 C: 0
0%
wrap(MqttClient)
M: 5 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%

Coverage

1: /*******************************************************************************
2: * Copyright (c) 2016, 2022 Eurotech and/or its affiliates and others
3: *
4: * This program and the accompanying materials are made
5: * available under the terms of the Eclipse Public License 2.0
6: * which is available at https://www.eclipse.org/legal/epl-2.0/
7: *
8: * SPDX-License-Identifier: EPL-2.0
9: *
10: * Contributors:
11: * Eurotech - initial API and implementation
12: *******************************************************************************/
13: package org.eclipse.kapua.transport.mqtt.pooling;
14:
15: import org.apache.commons.pool2.BasePooledObjectFactory;
16: import org.apache.commons.pool2.PooledObject;
17: import org.apache.commons.pool2.impl.DefaultPooledObject;
18: import org.eclipse.kapua.transport.mqtt.MqttClient;
19: import org.eclipse.kapua.transport.mqtt.MqttClientConnectionOptions;
20: import org.eclipse.kapua.transport.mqtt.exception.MqttClientException;
21: import org.eclipse.kapua.transport.mqtt.exception.MqttClientTerminateException;
22: import org.eclipse.kapua.transport.mqtt.pooling.setting.MqttClientPoolSetting;
23: import org.eclipse.kapua.transport.mqtt.pooling.setting.MqttClientPoolSettingKeys;
24: import org.eclipse.kapua.transport.mqtt.setting.MqttClientSetting;
25: import org.eclipse.kapua.transport.mqtt.setting.MqttClientSettingKeys;
26: import org.eclipse.kapua.transport.utils.ClientIdGenerator;
27: import org.slf4j.Logger;
28: import org.slf4j.LoggerFactory;
29:
30: import java.net.URI;
31:
32: /**
33: * Pooled object factory for {@link MqttClientPool}.
34: *
35: * @since 1.0.0
36: */
37: public class PooledMqttClientFactory extends BasePooledObjectFactory<MqttClient> {
38:
39: private static final Logger LOG = LoggerFactory.getLogger(PooledMqttClientFactory.class);
40: private static final ClientIdGenerator CLIENT_ID_GENERATOR = ClientIdGenerator.getInstance();
41:
42: private final String serverURI;
43:
44: public PooledMqttClientFactory(String serverURI) {
45: this.serverURI = serverURI;
46: }
47:
48: /**
49: * Creates the {@link MqttClient} for the {@link MqttClientPool}.
50: *
51: * <p>
52: * The client is initialized and connected. In case of any failure on connect operation, an exception is thrown and the the created client is destroyed.
53: * </p>
54: *
55: * @throws Exception FIXME [javadoc] document exception.
56: * @since 1.0.0
57: */
58: @Override
59: public MqttClient create() throws Exception {
60: //
61: // User pwd generation
62: MqttClientSetting mqttClientSettings = MqttClientSetting.getInstance();
63: MqttClientPoolSetting mqttClientPoolSettings = MqttClientPoolSetting.getInstance();
64:
65: String username = mqttClientSettings.getString(MqttClientSettingKeys.TRANSPORT_CREDENTIAL_USERNAME);
66: char[] password = mqttClientSettings.getString(MqttClientSettingKeys.TRANSPORT_CREDENTIAL_PASSWORD).toCharArray();
67: String clientId = CLIENT_ID_GENERATOR.next(mqttClientPoolSettings.getString(MqttClientPoolSettingKeys.CLIENT_POOL_CLIENT_ID_PREFIX));
68:
69: //
70: // Get new client and connection options
71: MqttClientConnectionOptions connectionOptions = new MqttClientConnectionOptions();
72: connectionOptions.setClientId(clientId);
73: connectionOptions.setUsername(username);
74: connectionOptions.setPassword(password);
75: connectionOptions.setEndpointURI(URI.create(serverURI));
76:
77: //
78: // Connect client
79: MqttClient kapuaClient = new MqttClient();
80: try {
81: kapuaClient.connectClient(connectionOptions);
82: } catch (MqttClientException mce) {
83: try {
84: kapuaClient.terminateClient();
85: } catch (MqttClientTerminateException mcte) {
86: LOG.error("Unable to properly terminate MQTT client after failed connect attempt: {}", clientId, mcte);
87: }
88:
89: throw mce;
90: }
91:
92: return kapuaClient;
93: }
94:
95: /**
96: * Wraps the given {@link MqttClient} into a {@link DefaultPooledObject}.
97: *
98: * @param mqttClient The object to wrap for {@link BasePooledObjectFactory}.
99: * @since 1.0.0
100: */
101: @Override
102: public PooledObject<MqttClient> wrap(MqttClient mqttClient) {
103: return new DefaultPooledObject<>(mqttClient);
104: }
105:
106: /**
107: * Validates status of the given {@link MqttClient} pooled object.
108: *
109: * <p>
110: * Check performed for the client to be marked as valid are:
111: * </p>
112: * <ul>
113: * <li>{@link MqttClient} {@code != null}</li>
114: * <li>{@link MqttClient#isConnected()} {@code == true}</li>
115: * </ul>
116: *
117: * @param pooledMqttClient The object to validate.
118: * @since 1.0.0
119: */
120: @Override
121: public boolean validateObject(PooledObject<MqttClient> pooledMqttClient) {
122:• if (pooledMqttClient == null) {
123: return false;
124: }
125:
126: MqttClient mqttClient = pooledMqttClient.getObject();
127:• return (mqttClient != null && mqttClient.isConnected());
128: }
129:
130: /**
131: * Destroys the given {@link MqttClient} pooled object.
132: * <p>
133: * Before calling super implementation {@link BasePooledObjectFactory#destroyObject(PooledObject)} it tries to clean up the {@link MqttClient}.
134: * </p>
135: *
136: * @param pooledMqttClient The pooled object to destroy.
137: * @since 1.0.0.
138: */
139: @Override
140: public void destroyObject(PooledObject<MqttClient> pooledMqttClient) throws Exception {
141:• if (pooledMqttClient == null) {
142: return;
143: }
144:
145: MqttClient mqttClient = pooledMqttClient.getObject();
146: try {
147:• if (mqttClient != null) {
148:• if (mqttClient.isConnected()) {
149: mqttClient.disconnectClient();
150: }
151: mqttClient.terminateClient();
152: }
153: } catch (MqttClientException mce) {
154: LOG.warn("Error while cleaning MqttClient {} before destroying it... Removing it from the pool anyway!", mqttClient.getClientId(), mce);
155: }
156:
157: super.destroyObject(pooledMqttClient);
158: }
159:
160: }