Skip to content

Package: JMSServiceEventBus$EventBusJMSConnectionBridge$PooledSenderFactory

JMSServiceEventBus$EventBusJMSConnectionBridge$PooledSenderFactory

nameinstructionbranchcomplexitylinemethod
JMSServiceEventBus.EventBusJMSConnectionBridge.PooledSenderFactory(JMSServiceEventBus.EventBusJMSConnectionBridge, String)
M: 9 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 3 C: 0
0%
M: 1 C: 0
0%
create()
M: 17 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 3 C: 0
0%
M: 1 C: 0
0%
destroyObject(PooledObject)
M: 12 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 5 C: 0
0%
M: 1 C: 0
0%
wrap(JMSServiceEventBus.EventBusJMSConnectionBridge.Sender)
M: 5 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%

Coverage

1: /*******************************************************************************
2: * Copyright (c) 2017, 2022 Eurotech and/or its affiliates and others
3: *
4: * This program and the accompanying materials are made
5: * available under the terms of the Eclipse Public License 2.0
6: * which is available at https://www.eclipse.org/legal/epl-2.0/
7: *
8: * SPDX-License-Identifier: EPL-2.0
9: *
10: * Contributors:
11: * Eurotech - initial API and implementation
12: *******************************************************************************/
13: package org.eclipse.kapua.commons.event.jms;
14:
15: import org.apache.commons.pool2.BasePooledObjectFactory;
16: import org.apache.commons.pool2.PooledObject;
17: import org.apache.commons.pool2.impl.DefaultPooledObject;
18: import org.apache.commons.pool2.impl.GenericObjectPool;
19: import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
20: import org.apache.qpid.jms.jndi.JmsInitialContextFactory;
21: import org.eclipse.kapua.KapuaException;
22: import org.eclipse.kapua.KapuaRuntimeException;
23: import org.eclipse.kapua.commons.event.ServiceEventBusDriver;
24: import org.eclipse.kapua.commons.event.ServiceEventBusManager;
25: import org.eclipse.kapua.commons.event.ServiceEventMarshaler;
26: import org.eclipse.kapua.commons.event.ServiceEventScope;
27: import org.eclipse.kapua.commons.metric.MetricServiceFactory;
28: import org.eclipse.kapua.commons.security.KapuaSecurityUtils;
29: import org.eclipse.kapua.commons.security.KapuaSession;
30: import org.eclipse.kapua.commons.setting.system.SystemSetting;
31: import org.eclipse.kapua.commons.setting.system.SystemSettingKey;
32: import org.eclipse.kapua.event.ServiceEvent;
33: import org.eclipse.kapua.event.ServiceEventBus;
34: import org.eclipse.kapua.event.ServiceEventBusException;
35: import org.eclipse.kapua.event.ServiceEventBusListener;
36: import org.slf4j.Logger;
37: import org.slf4j.LoggerFactory;
38:
39: import com.codahale.metrics.Counter;
40:
41: import javax.jms.Connection;
42: import javax.jms.ConnectionFactory;
43: import javax.jms.ExceptionListener;
44: import javax.jms.JMSException;
45: import javax.jms.MessageConsumer;
46: import javax.jms.MessageProducer;
47: import javax.jms.Session;
48: import javax.jms.TextMessage;
49: import javax.jms.Topic;
50: import javax.naming.Context;
51: import javax.naming.NamingException;
52: import java.util.ArrayList;
53: import java.util.HashMap;
54: import java.util.Hashtable;
55: import java.util.Iterator;
56: import java.util.List;
57: import java.util.Map;
58:
59: import org.eclipse.kapua.KapuaException;
60: import org.eclipse.kapua.KapuaRuntimeException;
61: import org.eclipse.kapua.commons.event.ServiceEventBusDriver;
62: import org.eclipse.kapua.commons.event.ServiceEventBusManager;
63: import org.eclipse.kapua.commons.event.ServiceEventMarshaler;
64: import org.eclipse.kapua.commons.event.ServiceEventScope;
65: import org.eclipse.kapua.commons.security.KapuaSecurityUtils;
66: import org.eclipse.kapua.commons.security.KapuaSession;
67: import org.eclipse.kapua.commons.setting.system.SystemSetting;
68: import org.eclipse.kapua.commons.setting.system.SystemSettingKey;
69: import org.eclipse.kapua.event.ServiceEvent;
70: import org.eclipse.kapua.event.ServiceEventBus;
71: import org.eclipse.kapua.event.ServiceEventBusException;
72: import org.eclipse.kapua.event.ServiceEventBusListener;
73:
74: import org.apache.commons.pool2.BasePooledObjectFactory;
75: import org.apache.commons.pool2.PooledObject;
76: import org.apache.commons.pool2.impl.DefaultPooledObject;
77: import org.apache.commons.pool2.impl.GenericObjectPool;
78: import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
79: import org.apache.qpid.jms.jndi.JmsInitialContextFactory;
80: import org.slf4j.Logger;
81: import org.slf4j.LoggerFactory;
82:
83: /**
84: * JMS event bus implementation
85: *
86: * @since 1.0
87: */
88: public class JMSServiceEventBus implements ServiceEventBus, ServiceEventBusDriver {
89:
90: private static final Logger LOGGER = LoggerFactory.getLogger(JMSServiceEventBus.class);
91:
92: private static final int PRODUCER_POOL_MIN_SIZE = SystemSetting.getInstance().getInt(SystemSettingKey.EVENT_BUS_PRODUCER_POOL_MIN_SIZE);
93: private static final int PRODUCER_POOL_MAX_SIZE = SystemSetting.getInstance().getInt(SystemSettingKey.EVENT_BUS_PRODUCER_POOL_MAX_SIZE);
94: private static final int PRODUCER_POOL_BORROW_WAIT = SystemSetting.getInstance().getInt(SystemSettingKey.EVENT_BUS_PRODUCER_POOL_BORROW_WAIT_MAX);
95: private static final int PRODUCER_POOL_EVICTION_INTERVAL = SystemSetting.getInstance().getInt(SystemSettingKey.EVENT_BUS_PRODUCER_EVICTION_INTERVAL);
96: private static final int CONSUMER_POOL_SIZE = SystemSetting.getInstance().getInt(SystemSettingKey.EVENT_BUS_CONSUMER_POOL_SIZE);
97: private static final String MESSAGE_SERIALIZER = SystemSetting.getInstance().getString(SystemSettingKey.EVENT_BUS_MESSAGE_SERIALIZER);
98: private static final String TRANSPORT_USE_EPOLL = SystemSetting.getInstance().getString(SystemSettingKey.EVENT_BUS_TRANSPORT_USE_EPOLL);
99:
100: private List<Subscription> subscriptionList = new ArrayList<>();
101: private EventBusJMSConnectionBridge eventBusJMSConnectionBridge;
102: private ServiceEventMarshaler eventBusMarshaler;
103:
104: private Counter reconnectionRetryCount;
105: private Counter connectionErrorCount;
106:
107: /**
108: * Default constructor
109: *
110: * @throws JMSException
111: * @throws NamingException
112: */
113: public JMSServiceEventBus() throws JMSException, NamingException {
114: reconnectionRetryCount = MetricServiceFactory.getInstance().getCounter("event_bus", "handler", "reconnection_retry", "count");
115: connectionErrorCount = MetricServiceFactory.getInstance().getCounter("event_bus", "handler", "connection_error", "count");
116: eventBusJMSConnectionBridge = new EventBusJMSConnectionBridge();
117: }
118:
119: @Override
120: public String getType() {
121: return ServiceEventBusManager.JMS_20_EVENT_BUS;
122: }
123:
124: /**
125: * Start the event bus
126: *
127: * @throws ServiceEventBusException
128: */
129: @Override
130: public void start() throws ServiceEventBusException {
131: try {
132: // initialize event bus marshaler
133: Class<?> messageSerializerClazz = Class.forName(MESSAGE_SERIALIZER);
134: if (ServiceEventMarshaler.class.isAssignableFrom(messageSerializerClazz)) {
135: eventBusMarshaler = (ServiceEventMarshaler) messageSerializerClazz.newInstance();
136: } else {
137: throw new ServiceEventBusException(String.format("Wrong message serializer Object type ('%s')!", messageSerializerClazz));
138: }
139: eventBusJMSConnectionBridge.start();
140: } catch (JMSException | ClassNotFoundException | NamingException | InstantiationException | IllegalAccessException e) {
141: throw new ServiceEventBusException(e);
142: }
143: }
144:
145: @Override
146: public void publish(String address, ServiceEvent kapuaEvent)
147: throws ServiceEventBusException {
148: eventBusJMSConnectionBridge.publish(address, kapuaEvent);
149: }
150:
151: @Override
152: public synchronized void subscribe(String address, String name, final ServiceEventBusListener kapuaEventListener)
153: throws ServiceEventBusException {
154: try {
155: Subscription subscription = new Subscription(address, name, kapuaEventListener);
156: subscriptionList.add(subscription);
157: eventBusJMSConnectionBridge.subscribe(subscription);
158: } catch (ServiceEventBusException e) {
159: throw new ServiceEventBusException(e);
160: }
161: }
162:
163: private void setSession(ServiceEvent kapuaEvent) {
164: KapuaSession.createFrom(kapuaEvent.getScopeId(), kapuaEvent.getUserId());
165: }
166:
167: /**
168: * Stop the event bus
169: *
170: * @throws ServiceEventBusException
171: */
172: @Override
173: public void stop() throws ServiceEventBusException {
174: eventBusJMSConnectionBridge.stop();
175: }
176:
177: @Override
178: public ServiceEventBus getEventBus() {
179: return this;
180: }
181:
182: private void waitBeforeRetry() {
183: // wait a bit
184: try {
185: Thread.sleep(2000);// TODO move to configuration
186: } catch (InterruptedException e) {
187: LOGGER.error("Wait for connect interrupted!", e);
188: }
189: }
190:
191: private synchronized void restart() throws ServiceEventBusException, JMSException {
192: // restart the event bus connection bridge with a new instance
193: // so no synchronization is needed
194: EventBusJMSConnectionBridge instanceToCleanUp = null;
195: EventBusJMSConnectionBridge newInstance = null;
196: try {
197: newInstance = new EventBusJMSConnectionBridge();
198: newInstance.start();
199: // restore subscriptions
200: for (Subscription subscription : subscriptionList) {
201: newInstance.subscribe(subscription);
202: }
203: instanceToCleanUp = eventBusJMSConnectionBridge;
204: eventBusJMSConnectionBridge = newInstance;
205: } catch (Exception e) {
206: LOGGER.warn("Error while creating new Service Event Bus instance: {}", e.getMessage());
207: //try to cleanup the messy instance
208: if (newInstance!=null) {
209: try {
210: LOGGER.warn("Stopping new Service Event Bus instance...");
211: newInstance.stop();
212: LOGGER.warn("Stopping new Service Event Bus instance... DONE");
213: }
214: catch(Exception e1) {
215: //don't throw this exception since the real exception is the first one
216: LOGGER.warn("Stopping new Service Event Bus instance error: {}", e1.getMessage(), e1);
217: }
218: }
219: throw new ServiceEventBusException(e);
220: } finally {
221: try {
222: if (instanceToCleanUp != null) {
223: LOGGER.info("Stopping old Service Event Bus instance...");
224: instanceToCleanUp.stop();
225: } else {
226: LOGGER.warn("Stopping old Service Event Bus instance. Null instance found so nothig to do...");
227: }
228: } catch (ServiceEventBusException e) {
229: LOGGER.error("Stopping old Service Event Bus instance. Cannot destroy instance: {}", e.getMessage(), e);
230: } finally {
231: instanceToCleanUp = null;
232: }
233: }
234: }
235:
236: private class EventBusJMSConnectionBridge {
237:
238: private Connection jmsConnection;
239: private Map<String, SenderPool> senders = new HashMap<>();
240: private ExceptionListenerImpl exceptionListener;
241:
242: public EventBusJMSConnectionBridge() {
243: this.exceptionListener = new ExceptionListenerImpl();
244: }
245:
246: void start() throws JMSException, NamingException, ServiceEventBusException {
247: stop();
248: String eventbusUrl = SystemSetting.getInstance().getString(SystemSettingKey.EVENT_BUS_URL);
249: String eventbusUsername = SystemSetting.getInstance().getString(SystemSettingKey.EVENT_BUS_USERNAME);
250: String eventbusPassword = SystemSetting.getInstance().getString(SystemSettingKey.EVENT_BUS_PASSWORD);
251:
252: Hashtable<String, String> environment = new Hashtable<>();
253: environment.put("connectionfactory.eventBusUrl", eventbusUrl);
254: environment.put("transport.useEpoll", TRANSPORT_USE_EPOLL);
255:
256: JmsInitialContextFactory initialContextFactory = new JmsInitialContextFactory();
257: Context context = initialContextFactory.getInitialContext(environment);
258: ConnectionFactory jmsConnectionFactory = (ConnectionFactory) context.lookup("eventBusUrl");
259:
260: jmsConnection = jmsConnectionFactory.createConnection(eventbusUsername, eventbusPassword);
261: jmsConnection.setExceptionListener(exceptionListener);
262: jmsConnection.start();
263: }
264:
265: void stop() throws ServiceEventBusException {
266: try {
267: if (jmsConnection != null) {
268: exceptionListener.stop();
269: jmsConnection.setExceptionListener(null);
270: jmsConnection.close();
271: }
272: } catch (JMSException e) {
273: throw new ServiceEventBusException(e);
274: } finally {
275: jmsConnection = null;
276: }
277:
278: // iterate over all possibles entries
279: Iterator<String> senderIterator = senders.keySet().iterator();
280: while (senderIterator.hasNext()) {
281: SenderPool senderPool = senders.get(senderIterator.next());
282: senderPool.close();
283: senderPool.clear();
284: // borrowed object will be returned to the pool soon (since the connection is gone bad) and then destroyed by the pool (since the pool is stopped)
285: senderIterator.remove();
286: }
287: }
288:
289: void publish(String address, ServiceEvent kapuaEvent)
290: throws ServiceEventBusException {
291: if (address != null && address.trim().length() > 0) {
292: SenderPool senderPool = senders.get(address);
293: Sender sender = null;
294: try {
295: if (senderPool == null) {
296: synchronized (SenderPool.class) {
297: senderPool = senders.get(address);
298: if (senderPool == null) {
299: senderPool = new SenderPool(new PooledSenderFactory(address));
300: senders.put(address, senderPool);
301: }
302: }
303: }
304: sender = senderPool.borrowObject();
305: sender.sendMessage(kapuaEvent);
306: } catch (Exception e) {
307: throw new ServiceEventBusException(e);
308: } finally {
309: if (sender != null) {
310: senderPool.returnObject(sender);
311: }
312: }
313: } else {
314: LOGGER.warn("Discarded event publish since the publish address is empty!");
315: }
316: }
317:
318: synchronized void subscribe(Subscription subscription)
319: throws ServiceEventBusException {
320: try {
321: String subscriptionStr = String.format("events.%s", subscription.getAddress());
322: // create a bunch of sessions to allow parallel event processing
323: LOGGER.info("Subscribing to address {} - name {} ...", subscriptionStr, subscription.getName());
324: for (int i = 0; i < CONSUMER_POOL_SIZE; i++) {
325: final Session jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
326: Topic jmsTopic = jmsSession.createTopic(subscriptionStr);
327: MessageConsumer jmsConsumer = jmsSession.createSharedDurableConsumer(jmsTopic, subscription.getName());
328: jmsConsumer.setMessageListener(message -> {
329: try {
330: if (message instanceof TextMessage) {
331: TextMessage textMessage = (TextMessage) message;
332: final ServiceEvent kapuaEvent = eventBusMarshaler.unmarshal(textMessage.getText());
333: setSession(kapuaEvent);
334: KapuaSecurityUtils.doPrivileged(() -> {
335: try {
336: // restore event context
337: ServiceEventScope.set(kapuaEvent);
338: subscription.getKapuaEventListener().onKapuaEvent(kapuaEvent);
339: } finally {
340: ServiceEventScope.end();
341: }
342: });
343:
344: } else {
345: LOGGER.error("Discarding wrong event message type '{}'", message != null ? message.getClass() : "null");
346: }
347: } catch (Throwable t) {
348: LOGGER.error(t.getMessage(), t);
349: // throwing the exception to prevent the message acknowledging (https://docs.oracle.com/javaee/7/api/javax/jms/Session.html#AUTO_ACKNOWLEDGE)
350: throw KapuaRuntimeException.internalError(t);
351: }
352: });
353: }
354: LOGGER.info("Subscribing to address {} - name {} - pool size {} ...DONE", subscriptionStr, subscription.getName(), CONSUMER_POOL_SIZE);
355: } catch (JMSException e) {
356: throw new ServiceEventBusException(e);
357: }
358: }
359:
360: private class Sender {
361:
362: // TODO manage the session/producer in a stronger way (if the client disconnects due to a network error the connection will not be restored)
363: private Session jmsSession;
364: private MessageProducer jmsProducer;
365:
366: public Sender(Connection jmsConnection, String address) throws JMSException {
367: address = String.format("events.%s", address);
368: jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
369: Topic jmsTopic = jmsSession.createTopic(address);
370: jmsProducer = jmsSession.createProducer(jmsTopic);
371: }
372:
373: public void sendMessage(ServiceEvent kapuaEvent) throws Exception {
374: try {
375: TextMessage message = jmsSession.createTextMessage();
376: // Serialize outgoing kapua event based on platform configuration
377: message.setText(eventBusMarshaler.marshal(kapuaEvent));
378: message.setStringProperty(ServiceEventMarshaler.CONTENT_TYPE_KEY, eventBusMarshaler.getContentType());
379: jmsProducer.send(message);
380: } catch (JMSException | KapuaException e) {
381: LOGGER.error("Message publish interrupted: {}", e.getMessage());
382: throw e;
383: }
384: }
385:
386: public void close() {
387: try {
388: jmsSession.close();
389: } catch (JMSException e) {
390: LOGGER.warn("Cannot close the Sender connection!", e);
391: }
392: }
393:
394: }
395:
396: private class PooledSenderFactory extends BasePooledObjectFactory<Sender> {
397:
398: private String address;
399:
400: public PooledSenderFactory(String address) {
401: this.address = address;
402: }
403:
404: @Override
405: public Sender create()
406: throws Exception {
407: try {
408: return new Sender(jmsConnection, address);
409: } catch (JMSException e) {
410: throw new ServiceEventBusException(e);
411: }
412: }
413:
414: @Override
415: public PooledObject<Sender> wrap(Sender sender) {
416: return new DefaultPooledObject<>(sender);
417: }
418:
419: @Override
420: public void destroyObject(PooledObject<Sender> pooledSender)
421: throws Exception {
422: Sender sender = pooledSender.getObject();
423:• if (sender != null) {
424: sender.close();
425: }
426: super.destroyObject(pooledSender);
427: }
428:
429: }
430:
431: private class SenderPool extends GenericObjectPool<Sender> {
432:
433: public SenderPool(PooledSenderFactory factory) {
434: super(factory);
435:
436: GenericObjectPoolConfig senderPoolConfig = new GenericObjectPoolConfig();
437: senderPoolConfig.setMinIdle(PRODUCER_POOL_MIN_SIZE);
438: senderPoolConfig.setMaxIdle(PRODUCER_POOL_MAX_SIZE);
439: senderPoolConfig.setMaxTotal(PRODUCER_POOL_MAX_SIZE);
440: senderPoolConfig.setMaxWaitMillis(PRODUCER_POOL_BORROW_WAIT);
441: senderPoolConfig.setTestOnReturn(true);
442: senderPoolConfig.setTestOnBorrow(true);
443: senderPoolConfig.setTestWhileIdle(false);
444: senderPoolConfig.setBlockWhenExhausted(true);
445: senderPoolConfig.setTimeBetweenEvictionRunsMillis(PRODUCER_POOL_EVICTION_INTERVAL);
446: setConfig(senderPoolConfig);
447: }
448:
449: }
450:
451: private class ExceptionListenerImpl implements ExceptionListener {
452:
453: private boolean active = true;
454:
455: @Override
456: public void onException(JMSException e) {
457: LOGGER.error("EventBus Listener {} - Connection thrown exception: {}", this, e.getMessage(), e);
458: connectionErrorCount.inc();
459: int i = 1;
460: while (active) {
461: LOGGER.info("EventBus Listener {} - restarting attempt... {}", this, i);
462: try {
463: reconnectionRetryCount.inc();
464: restart();
465: LOGGER.info("EventBus Listener {} - EventBus restarting attempt... {} DONE (Connection restored)", this, i);
466: break;
467: } catch (ServiceEventBusException | JMSException e1) {
468: LOGGER.error("EventBus Listener {} - Cannot start new event bus connection... try again...", this, e1);
469: waitBeforeRetry();
470: }
471: i++;
472: }
473: }
474:
475: public void stop() {
476: active = false;
477: }
478: }
479: }
480:
481: private class Subscription {
482:
483: String name;
484: String address;
485: ServiceEventBusListener kapuaEventListener;
486:
487: public Subscription(String address, String name, ServiceEventBusListener kapuaEventListener) {
488: this.name = name;
489: this.address = address;
490: this.kapuaEventListener = kapuaEventListener;
491: }
492:
493: public String getName() {
494: return name;
495: }
496:
497: public String getAddress() {
498: return address;
499: }
500:
501: public ServiceEventBusListener getKapuaEventListener() {
502: return kapuaEventListener;
503: }
504:
505: }
506:
507: }