Skip to content

Package: MqttClientPool

MqttClientPool

name | instruction | branch | complexity | line | method
MqttClientPool(PooledMqttClientFactory)
M: 59 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 14 C: 0
0%
M: 1 C: 0
0%
getInstance(String)
M: 7 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
lambda$getInstance$0(String, String)
M: 8 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
returnObject(MqttClient)
M: 43 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 11 C: 0
0%
M: 1 C: 0
0%
static {...}
M: 8 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%

Coverage

1: /*******************************************************************************
2: * Copyright (c) 2016, 2022 Eurotech and/or its affiliates and others
3: *
4: * This program and the accompanying materials are made
5: * available under the terms of the Eclipse Public License 2.0
6: * which is available at https://www.eclipse.org/legal/epl-2.0/
7: *
8: * SPDX-License-Identifier: EPL-2.0
9: *
10: * Contributors:
11: * Eurotech - initial API and implementation
12: *******************************************************************************/
13: package org.eclipse.kapua.transport.mqtt.pooling;
14:
15: import org.apache.commons.pool2.impl.GenericObjectPool;
16: import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
17: import org.eclipse.kapua.transport.mqtt.MqttClient;
18: import org.eclipse.kapua.transport.mqtt.exception.MqttClientCleanException;
19: import org.eclipse.kapua.transport.mqtt.exception.MqttClientTerminateException;
20: import org.eclipse.kapua.transport.mqtt.pooling.setting.MqttClientPoolSetting;
21: import org.eclipse.kapua.transport.mqtt.pooling.setting.MqttClientPoolSettingKeys;
22: import org.slf4j.Logger;
23: import org.slf4j.LoggerFactory;
24:
25: import java.util.HashMap;
26: import java.util.Map;
27:
28: /**
29: * Client pool for {@link MqttClient} objects.
30: * <p>
31: * This serves to optimize communication at the transport level of Kapua.
32: * Client borrowed from this pool are already connected and ready to publish and subscribe.
33: * </p>
34: *
35: * @since 1.0.0
36: */
37: public class MqttClientPool extends GenericObjectPool<MqttClient> {
38:
39: private static final Logger LOG = LoggerFactory.getLogger(MqttClientPool.class);
40:
41: /**
42: * Singleton instances of {@link MqttClientPool} by their host.
43: *
44: * @since 1.0.0
45: */
46: private static final Map<String, MqttClientPool> MQTT_CLIENT_POOL_BY_HOST = new HashMap<>();
47:
48: /**
49: * Initialize a {@link MqttClientPool} with the according configuration sourced from {@link MqttClientPoolSetting}.
50: *
51: * @since 1.0.0
52: */
53: private MqttClientPool(PooledMqttClientFactory factory) {
54: super(factory);
55:
56: MqttClientPoolSetting config = MqttClientPoolSetting.getInstance();
57: GenericObjectPoolConfig clientPoolConfig = new GenericObjectPoolConfig();
58: clientPoolConfig.setMinIdle(config.getInt(MqttClientPoolSettingKeys.CLIENT_POOL_SIZE_IDLE_MIN));
59: clientPoolConfig.setMaxIdle(config.getInt(MqttClientPoolSettingKeys.CLIENT_POOL_SIZE_IDLE_MAX));
60: clientPoolConfig.setMaxTotal(config.getInt(MqttClientPoolSettingKeys.CLIENT_POOL_SIZE_TOTAL_MAX));
61:
62: clientPoolConfig.setMaxWaitMillis(config.getInt(MqttClientPoolSettingKeys.CLIENT_POOL_BORROW_WAIT_MAX));
63:
64: clientPoolConfig.setTestOnReturn(config.getBoolean(MqttClientPoolSettingKeys.CLIENT_POOL_ON_RETURN_TEST));
65: clientPoolConfig.setTestOnBorrow(config.getBoolean(MqttClientPoolSettingKeys.CLIENT_POOL_ON_BORROW_TEST));
66:
67: clientPoolConfig.setTestWhileIdle(config.getBoolean(MqttClientPoolSettingKeys.CLIENT_POOL_WHEN_IDLE_TEST));
68: clientPoolConfig.setBlockWhenExhausted(config.getBoolean(MqttClientPoolSettingKeys.CLIENT_POOL_WHEN_EXAUSTED_BLOCK));
69:
70: clientPoolConfig.setTimeBetweenEvictionRunsMillis(config.getLong(MqttClientPoolSettingKeys.CLIENT_POOL_EVICTION_INTERVAL));
71:
72: setConfig(clientPoolConfig);
73: }
74:
75: /**
76: * Gets the singleton instance of {@link MqttClientPool}.
77: *
78: * @param serverURI The {@link java.net.URI} in {@link String} form for which get the {@link MqttClientPool}.
79: * @return The singleton instance of {@link MqttClientPool}.
80: * @since 1.1.0
81: */
82: public static MqttClientPool getInstance(String serverURI) {
83: return MQTT_CLIENT_POOL_BY_HOST.computeIfAbsent(serverURI, k -> new MqttClientPool(new PooledMqttClientFactory(serverURI)));
84: }
85:
86: /**
87: * Returns a borrowed object to the pool.
88: * <p>
89: * Before calling super implementation {@link GenericObjectPool#returnObject(Object)} the {@link MqttClient} is cleaned by invoking the {@link MqttClient#clean()}.
90: * </p>
91: *
92: * @since 1.0.0
93: */
94: @Override
95: public void returnObject(MqttClient kapuaClient) {
96: try {
97: // Clean up callback
98: kapuaClient.clean();
99:
100: // Return object to pool
101: super.returnObject(kapuaClient);
102: } catch (MqttClientCleanException mcce) {
103: LOG.error("Error while returning MqttClient ({}) to the pool. Terminating...", kapuaClient.getClientId());
104: try {
105: kapuaClient.terminateClient();
106: LOG.error("Error while returning MqttClient ({}) to the pool. Terminating... DONE! Error was: {}", kapuaClient.getClientId(), mcce.getMessage());
107: } catch (MqttClientTerminateException mcte) {
108: LOG.error("Error while returning MqttClient ({}) to the pool. Terminating... ERROR! Error was: {}", kapuaClient.getClientId(), mcce.getMessage(), mcte);
109: }
110: }
111: }
112: }