Skip to content

Package: JMSServiceEventBus$EventBusJMSConnectionBridge$PooledSenderFactory

JMSServiceEventBus$EventBusJMSConnectionBridge$PooledSenderFactory

name | instruction | branch | complexity | line | method
JMSServiceEventBus.EventBusJMSConnectionBridge.PooledSenderFactory(JMSServiceEventBus.EventBusJMSConnectionBridge, String)
M: 9 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 3 C: 0
0%
M: 1 C: 0
0%
create()
M: 17 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 3 C: 0
0%
M: 1 C: 0
0%
destroyObject(PooledObject)
M: 12 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 5 C: 0
0%
M: 1 C: 0
0%
wrap(JMSServiceEventBus.EventBusJMSConnectionBridge.Sender)
M: 5 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%

Coverage

1: /*******************************************************************************
2: * Copyright (c) 2017, 2020 Eurotech and/or its affiliates and others
3: *
4: * All rights reserved. This program and the accompanying materials
5: * are made available under the terms of the Eclipse Public License v1.0
6: * which accompanies this distribution, and is available at
7: * http://www.eclipse.org/legal/epl-v10.html
8: *
9: * Contributors:
10: * Eurotech - initial API and implementation
11: *******************************************************************************/
12: package org.eclipse.kapua.commons.event.jms;
13:
14: import org.apache.commons.pool2.BasePooledObjectFactory;
15: import org.apache.commons.pool2.PooledObject;
16: import org.apache.commons.pool2.impl.DefaultPooledObject;
17: import org.apache.commons.pool2.impl.GenericObjectPool;
18: import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
19: import org.apache.qpid.jms.jndi.JmsInitialContextFactory;
20: import org.eclipse.kapua.KapuaException;
21: import org.eclipse.kapua.KapuaRuntimeException;
22: import org.eclipse.kapua.commons.event.ServiceEventBusDriver;
23: import org.eclipse.kapua.commons.event.ServiceEventBusManager;
24: import org.eclipse.kapua.commons.event.ServiceEventMarshaler;
25: import org.eclipse.kapua.commons.event.ServiceEventScope;
26: import org.eclipse.kapua.commons.metric.MetricServiceFactory;
27: import org.eclipse.kapua.commons.security.KapuaSecurityUtils;
28: import org.eclipse.kapua.commons.security.KapuaSession;
29: import org.eclipse.kapua.commons.setting.system.SystemSetting;
30: import org.eclipse.kapua.commons.setting.system.SystemSettingKey;
31: import org.eclipse.kapua.event.ServiceEvent;
32: import org.eclipse.kapua.event.ServiceEventBus;
33: import org.eclipse.kapua.event.ServiceEventBusException;
34: import org.eclipse.kapua.event.ServiceEventBusListener;
35: import org.slf4j.Logger;
36: import org.slf4j.LoggerFactory;
37:
38: import com.codahale.metrics.Counter;
39:
40: import javax.jms.Connection;
41: import javax.jms.ConnectionFactory;
42: import javax.jms.ExceptionListener;
43: import javax.jms.JMSException;
44: import javax.jms.MessageConsumer;
45: import javax.jms.MessageProducer;
46: import javax.jms.Session;
47: import javax.jms.TextMessage;
48: import javax.jms.Topic;
49: import javax.naming.Context;
50: import javax.naming.NamingException;
51: import java.util.ArrayList;
52: import java.util.HashMap;
53: import java.util.Hashtable;
54: import java.util.Iterator;
55: import java.util.List;
56: import java.util.Map;
57:
58: import org.eclipse.kapua.KapuaException;
59: import org.eclipse.kapua.KapuaRuntimeException;
60: import org.eclipse.kapua.commons.event.ServiceEventBusDriver;
61: import org.eclipse.kapua.commons.event.ServiceEventBusManager;
62: import org.eclipse.kapua.commons.event.ServiceEventMarshaler;
63: import org.eclipse.kapua.commons.event.ServiceEventScope;
64: import org.eclipse.kapua.commons.security.KapuaSecurityUtils;
65: import org.eclipse.kapua.commons.security.KapuaSession;
66: import org.eclipse.kapua.commons.setting.system.SystemSetting;
67: import org.eclipse.kapua.commons.setting.system.SystemSettingKey;
68: import org.eclipse.kapua.event.ServiceEvent;
69: import org.eclipse.kapua.event.ServiceEventBus;
70: import org.eclipse.kapua.event.ServiceEventBusException;
71: import org.eclipse.kapua.event.ServiceEventBusListener;
72:
73: import org.apache.commons.pool2.BasePooledObjectFactory;
74: import org.apache.commons.pool2.PooledObject;
75: import org.apache.commons.pool2.impl.DefaultPooledObject;
76: import org.apache.commons.pool2.impl.GenericObjectPool;
77: import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
78: import org.apache.qpid.jms.jndi.JmsInitialContextFactory;
79: import org.slf4j.Logger;
80: import org.slf4j.LoggerFactory;
81:
82: /**
83: * JMS event bus implementation
84: *
85: * @since 1.0
86: */
87: public class JMSServiceEventBus implements ServiceEventBus, ServiceEventBusDriver {
88:
89: private static final Logger LOGGER = LoggerFactory.getLogger(JMSServiceEventBus.class);
90:
91: private static final int PRODUCER_POOL_MIN_SIZE = SystemSetting.getInstance().getInt(SystemSettingKey.EVENT_BUS_PRODUCER_POOL_MIN_SIZE);
92: private static final int PRODUCER_POOL_MAX_SIZE = SystemSetting.getInstance().getInt(SystemSettingKey.EVENT_BUS_PRODUCER_POOL_MAX_SIZE);
93: private static final int PRODUCER_POOL_BORROW_WAIT = SystemSetting.getInstance().getInt(SystemSettingKey.EVENT_BUS_PRODUCER_POOL_BORROW_WAIT_MAX);
94: private static final int PRODUCER_POOL_EVICTION_INTERVAL = SystemSetting.getInstance().getInt(SystemSettingKey.EVENT_BUS_PRODUCER_EVICTION_INTERVAL);
95: private static final int CONSUMER_POOL_SIZE = SystemSetting.getInstance().getInt(SystemSettingKey.EVENT_BUS_CONSUMER_POOL_SIZE);
96: private static final String MESSAGE_SERIALIZER = SystemSetting.getInstance().getString(SystemSettingKey.EVENT_BUS_MESSAGE_SERIALIZER);
97: private static final String TRANSPORT_USE_EPOLL = SystemSetting.getInstance().getString(SystemSettingKey.EVENT_BUS_TRANSPORT_USE_EPOLL);
98:
99: private List<Subscription> subscriptionList = new ArrayList<>();
100: private EventBusJMSConnectionBridge eventBusJMSConnectionBridge;
101: private ServiceEventMarshaler eventBusMarshaler;
102:
103: private Counter reconnectionRetryCount;
104: private Counter connectionErrorCount;
105:
/**
 * Creates the event bus: prepares a connection bridge (not yet started) and looks up
 * the two counters used to track connection errors and reconnection attempts.
 * <p>
 * No broker connection is opened here; see {@link #start()}.
 *
 * @throws JMSException    declared by the signature; not thrown directly by the statements below
 * @throws NamingException declared by the signature; not thrown directly by the statements below
 */
public JMSServiceEventBus() throws JMSException, NamingException {
    this.eventBusJMSConnectionBridge = new EventBusJMSConnectionBridge();
    this.reconnectionRetryCount = MetricServiceFactory.getInstance()
            .getCounter("event_bus", "handler", "reconnection_retry", "count");
    this.connectionErrorCount = MetricServiceFactory.getInstance()
            .getCounter("event_bus", "handler", "connection_error", "count");
}
117:
118: @Override
119: public String getType() {
120: return ServiceEventBusManager.JMS_20_EVENT_BUS;
121: }
122:
123: /**
124: * Start the event bus
125: *
126: * @throws ServiceEventBusException
127: */
128: @Override
129: public void start() throws ServiceEventBusException {
130: try {
131: // initialize event bus marshaler
132: Class<?> messageSerializerClazz = Class.forName(MESSAGE_SERIALIZER);
133: if (ServiceEventMarshaler.class.isAssignableFrom(messageSerializerClazz)) {
134: eventBusMarshaler = (ServiceEventMarshaler) messageSerializerClazz.newInstance();
135: } else {
136: throw new ServiceEventBusException(String.format("Wrong message serializer Object type ('%s')!", messageSerializerClazz));
137: }
138: eventBusJMSConnectionBridge.start();
139: } catch (JMSException | ClassNotFoundException | NamingException | InstantiationException | IllegalAccessException e) {
140: throw new ServiceEventBusException(e);
141: }
142: }
143:
144: @Override
145: public void publish(String address, ServiceEvent kapuaEvent)
146: throws ServiceEventBusException {
147: eventBusJMSConnectionBridge.publish(address, kapuaEvent);
148: }
149:
150: @Override
151: public synchronized void subscribe(String address, String name, final ServiceEventBusListener kapuaEventListener)
152: throws ServiceEventBusException {
153: try {
154: Subscription subscription = new Subscription(address, name, kapuaEventListener);
155: subscriptionList.add(subscription);
156: eventBusJMSConnectionBridge.subscribe(subscription);
157: } catch (ServiceEventBusException e) {
158: throw new ServiceEventBusException(e);
159: }
160: }
161:
162: private void setSession(ServiceEvent kapuaEvent) {
163: KapuaSession.createFrom(kapuaEvent.getScopeId(), kapuaEvent.getUserId());
164: }
165:
166: /**
167: * Stop the event bus
168: *
169: * @throws ServiceEventBusException
170: */
171: @Override
172: public void stop() throws ServiceEventBusException {
173: eventBusJMSConnectionBridge.stop();
174: }
175:
176: @Override
177: public ServiceEventBus getEventBus() {
178: return this;
179: }
180:
181: private void waitBeforeRetry() {
182: // wait a bit
183: try {
184: Thread.sleep(2000);// TODO move to configuration
185: } catch (InterruptedException e) {
186: LOGGER.error("Wait for connect interrupted!", e);
187: }
188: }
189:
190: private synchronized void restart() throws ServiceEventBusException, JMSException {
191: // restart the event bus connection bridge with a new instance
192: // so no synchronization is needed
193: EventBusJMSConnectionBridge instanceToCleanUp = null;
194: EventBusJMSConnectionBridge newInstance = null;
195: try {
196: newInstance = new EventBusJMSConnectionBridge();
197: newInstance.start();
198: // restore subscriptions
199: for (Subscription subscription : subscriptionList) {
200: newInstance.subscribe(subscription);
201: }
202: instanceToCleanUp = eventBusJMSConnectionBridge;
203: eventBusJMSConnectionBridge = newInstance;
204: } catch (Exception e) {
205: LOGGER.warn("Error while creating new Service Event Bus instance: {}", e.getMessage());
206: //try to cleanup the messy instance
207: if (newInstance!=null) {
208: try {
209: LOGGER.warn("Stopping new Service Event Bus instance...");
210: newInstance.stop();
211: LOGGER.warn("Stopping new Service Event Bus instance... DONE");
212: }
213: catch(Exception e1) {
214: //don't throw this exception since the real exception is the first one
215: LOGGER.warn("Stopping new Service Event Bus instance error: {}", e1.getMessage(), e1);
216: }
217: }
218: throw new ServiceEventBusException(e);
219: } finally {
220: try {
221: if (instanceToCleanUp != null) {
222: LOGGER.info("Stopping old Service Event Bus instance...");
223: instanceToCleanUp.stop();
224: } else {
225: LOGGER.warn("Stopping old Service Event Bus instance. Null instance found so nothig to do...");
226: }
227: } catch (ServiceEventBusException e) {
228: LOGGER.error("Stopping old Service Event Bus instance. Cannot destroy instance: {}", e.getMessage(), e);
229: } finally {
230: instanceToCleanUp = null;
231: }
232: }
233: }
234:
235: private class EventBusJMSConnectionBridge {
236:
237: private Connection jmsConnection;
238: private Map<String, SenderPool> senders = new HashMap<>();
239: private ExceptionListenerImpl exceptionListener;
240:
/**
 * Creates a bridge with its own connection exception listener. No JMS connection is opened
 * until {@code start()} is called.
 */
public EventBusJMSConnectionBridge() {
    exceptionListener = new ExceptionListenerImpl();
}
244:
245: void start() throws JMSException, NamingException, ServiceEventBusException {
246: stop();
247: String eventbusUrl = SystemSetting.getInstance().getString(SystemSettingKey.EVENT_BUS_URL);
248: String eventbusUsername = SystemSetting.getInstance().getString(SystemSettingKey.EVENT_BUS_USERNAME);
249: String eventbusPassword = SystemSetting.getInstance().getString(SystemSettingKey.EVENT_BUS_PASSWORD);
250:
251: Hashtable<String, String> environment = new Hashtable<>();
252: environment.put("connectionfactory.eventBusUrl", eventbusUrl);
253: environment.put("transport.useEpoll", TRANSPORT_USE_EPOLL);
254:
255: JmsInitialContextFactory initialContextFactory = new JmsInitialContextFactory();
256: Context context = initialContextFactory.getInitialContext(environment);
257: ConnectionFactory jmsConnectionFactory = (ConnectionFactory) context.lookup("eventBusUrl");
258:
259: jmsConnection = jmsConnectionFactory.createConnection(eventbusUsername, eventbusPassword);
260: jmsConnection.setExceptionListener(exceptionListener);
261: jmsConnection.start();
262: }
263:
264: void stop() throws ServiceEventBusException {
265: try {
266: if (jmsConnection != null) {
267: exceptionListener.stop();
268: jmsConnection.setExceptionListener(null);
269: jmsConnection.close();
270: }
271: } catch (JMSException e) {
272: throw new ServiceEventBusException(e);
273: } finally {
274: jmsConnection = null;
275: }
276:
277: // iterate over all possibles entries
278: Iterator<String> senderIterator = senders.keySet().iterator();
279: while (senderIterator.hasNext()) {
280: SenderPool senderPool = senders.get(senderIterator.next());
281: senderPool.close();
282: senderPool.clear();
283: // borrowed object will be returned to the pool soon (since the connection is gone bad) and then destroyed by the pool (since the pool is stopped)
284: senderIterator.remove();
285: }
286: }
287:
288: void publish(String address, ServiceEvent kapuaEvent)
289: throws ServiceEventBusException {
290: if (address != null && address.trim().length() > 0) {
291: SenderPool senderPool = senders.get(address);
292: Sender sender = null;
293: try {
294: if (senderPool == null) {
295: synchronized (SenderPool.class) {
296: senderPool = senders.get(address);
297: if (senderPool == null) {
298: senderPool = new SenderPool(new PooledSenderFactory(address));
299: senders.put(address, senderPool);
300: }
301: }
302: }
303: sender = senderPool.borrowObject();
304: sender.sendMessage(kapuaEvent);
305: } catch (Exception e) {
306: throw new ServiceEventBusException(e);
307: } finally {
308: if (sender != null) {
309: senderPool.returnObject(sender);
310: }
311: }
312: } else {
313: LOGGER.warn("Discarded event publish since the publish address is empty!");
314: }
315: }
316:
317: synchronized void subscribe(Subscription subscription)
318: throws ServiceEventBusException {
319: try {
320: String subscriptionStr = String.format("events.%s", subscription.getAddress());
321: // create a bunch of sessions to allow parallel event processing
322: LOGGER.info("Subscribing to address {} - name {} ...", subscriptionStr, subscription.getName());
323: for (int i = 0; i < CONSUMER_POOL_SIZE; i++) {
324: final Session jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
325: Topic jmsTopic = jmsSession.createTopic(subscriptionStr);
326: MessageConsumer jmsConsumer = jmsSession.createSharedDurableConsumer(jmsTopic, subscription.getName());
327: jmsConsumer.setMessageListener(message -> {
328: try {
329: if (message instanceof TextMessage) {
330: TextMessage textMessage = (TextMessage) message;
331: final ServiceEvent kapuaEvent = eventBusMarshaler.unmarshal(textMessage.getText());
332: setSession(kapuaEvent);
333: KapuaSecurityUtils.doPrivileged(() -> {
334: try {
335: // restore event context
336: ServiceEventScope.set(kapuaEvent);
337: subscription.getKapuaEventListener().onKapuaEvent(kapuaEvent);
338: } finally {
339: ServiceEventScope.end();
340: }
341: });
342:
343: } else {
344: LOGGER.error("Discarding wrong event message type '{}'", message != null ? message.getClass() : "null");
345: }
346: } catch (Throwable t) {
347: LOGGER.error(t.getMessage(), t);
348: // throwing the exception to prevent the message acknowledging (https://docs.oracle.com/javaee/7/api/javax/jms/Session.html#AUTO_ACKNOWLEDGE)
349: throw KapuaRuntimeException.internalError(t);
350: }
351: });
352: }
353: LOGGER.info("Subscribing to address {} - name {} - pool size {} ...DONE", subscriptionStr, subscription.getName(), CONSUMER_POOL_SIZE);
354: } catch (JMSException e) {
355: throw new ServiceEventBusException(e);
356: }
357: }
358:
359: private class Sender {
360:
361: // TODO manage the session/producer in a stronger way (if the client disconnects due to a network error the connection will not be restored)
362: private Session jmsSession;
363: private MessageProducer jmsProducer;
364:
365: public Sender(Connection jmsConnection, String address) throws JMSException {
366: address = String.format("events.%s", address);
367: jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
368: Topic jmsTopic = jmsSession.createTopic(address);
369: jmsProducer = jmsSession.createProducer(jmsTopic);
370: }
371:
372: public void sendMessage(ServiceEvent kapuaEvent) throws Exception {
373: try {
374: TextMessage message = jmsSession.createTextMessage();
375: // Serialize outgoing kapua event based on platform configuration
376: message.setText(eventBusMarshaler.marshal(kapuaEvent));
377: message.setStringProperty(ServiceEventMarshaler.CONTENT_TYPE_KEY, eventBusMarshaler.getContentType());
378: jmsProducer.send(message);
379: } catch (JMSException | KapuaException e) {
380: LOGGER.error("Message publish interrupted: {}", e.getMessage());
381: throw e;
382: }
383: }
384:
385: public void close() {
386: try {
387: jmsSession.close();
388: } catch (JMSException e) {
389: LOGGER.warn("Cannot close the Sender connection!", e);
390: }
391: }
392:
393: }
394:
395: private class PooledSenderFactory extends BasePooledObjectFactory<Sender> {
396:
397: private String address;
398:
399: public PooledSenderFactory(String address) {
400: this.address = address;
401: }
402:
403: @Override
404: public Sender create()
405: throws Exception {
406: try {
407: return new Sender(jmsConnection, address);
408: } catch (JMSException e) {
409: throw new ServiceEventBusException(e);
410: }
411: }
412:
413: @Override
414: public PooledObject<Sender> wrap(Sender sender) {
415: return new DefaultPooledObject<>(sender);
416: }
417:
418: @Override
419: public void destroyObject(PooledObject<Sender> pooledSender)
420: throws Exception {
421: Sender sender = pooledSender.getObject();
422:• if (sender != null) {
423: sender.close();
424: }
425: super.destroyObject(pooledSender);
426: }
427:
428: }
429:
430: private class SenderPool extends GenericObjectPool<Sender> {
431:
432: public SenderPool(PooledSenderFactory factory) {
433: super(factory);
434:
435: GenericObjectPoolConfig senderPoolConfig = new GenericObjectPoolConfig();
436: senderPoolConfig.setMinIdle(PRODUCER_POOL_MIN_SIZE);
437: senderPoolConfig.setMaxIdle(PRODUCER_POOL_MAX_SIZE);
438: senderPoolConfig.setMaxTotal(PRODUCER_POOL_MAX_SIZE);
439: senderPoolConfig.setMaxWaitMillis(PRODUCER_POOL_BORROW_WAIT);
440: senderPoolConfig.setTestOnReturn(true);
441: senderPoolConfig.setTestOnBorrow(true);
442: senderPoolConfig.setTestWhileIdle(false);
443: senderPoolConfig.setBlockWhenExhausted(true);
444: senderPoolConfig.setTimeBetweenEvictionRunsMillis(PRODUCER_POOL_EVICTION_INTERVAL);
445: setConfig(senderPoolConfig);
446: }
447:
448: }
449:
450: private class ExceptionListenerImpl implements ExceptionListener {
451:
452: private boolean active = true;
453:
454: @Override
455: public void onException(JMSException e) {
456: LOGGER.error("EventBus Listener {} - Connection thrown exception: {}", this, e.getMessage(), e);
457: connectionErrorCount.inc();
458: int i = 1;
459: while (active) {
460: LOGGER.info("EventBus Listener {} - restarting attempt... {}", this, i);
461: try {
462: reconnectionRetryCount.inc();
463: restart();
464: LOGGER.info("EventBus Listener {} - EventBus restarting attempt... {} DONE (Connection restored)", this, i);
465: break;
466: } catch (ServiceEventBusException | JMSException e1) {
467: LOGGER.error("EventBus Listener {} - Cannot start new event bus connection... try again...", this, e1);
468: waitBeforeRetry();
469: }
470: i++;
471: }
472: }
473:
474: public void stop() {
475: active = false;
476: }
477: }
478: }
479:
480: private class Subscription {
481:
482: String name;
483: String address;
484: ServiceEventBusListener kapuaEventListener;
485:
486: public Subscription(String address, String name, ServiceEventBusListener kapuaEventListener) {
487: this.name = name;
488: this.address = address;
489: this.kapuaEventListener = kapuaEventListener;
490: }
491:
492: public String getName() {
493: return name;
494: }
495:
496: public String getAddress() {
497: return address;
498: }
499:
500: public ServiceEventBusListener getKapuaEventListener() {
501: return kapuaEventListener;
502: }
503:
504: }
505:
506: }