Package: DataServiceImpl$1

DataServiceImpl$1

name | instruction | branch | complexity | line | method
addingService(ServiceReference)
M: 0 C: 17
100%
M: 0 C: 0
100%
M: 0 C: 1
100%
M: 0 C: 5
100%
M: 0 C: 1
100%
modifiedService(ServiceReference, H2DbService)
M: 0 C: 28
100%
M: 0 C: 0
100%
M: 0 C: 1
100%
M: 0 C: 7
100%
M: 0 C: 1
100%
removedService(ServiceReference, H2DbService)
M: 0 C: 17
100%
M: 0 C: 0
100%
M: 0 C: 1
100%
M: 0 C: 4
100%
M: 0 C: 1
100%
{...}
M: 0 C: 6
100%
M: 0 C: 0
100%
M: 0 C: 1
100%
M: 0 C: 2
100%
M: 0 C: 1
100%

Coverage

1: /*******************************************************************************
2: * Copyright (c) 2011, 2018 Eurotech and/or its affiliates
3: *
4: * All rights reserved. This program and the accompanying materials
5: * are made available under the terms of the Eclipse Public License v1.0
6: * which accompanies this distribution, and is available at
7: * http://www.eclipse.org/legal/epl-v10.html
8: *
9: * Contributors:
10: * Eurotech
11: *******************************************************************************/
12: package org.eclipse.kura.core.data;
13:
14: import java.util.ArrayList;
15: import java.util.HashMap;
16: import java.util.List;
17: import java.util.Map;
18: import java.util.Random;
19: import java.util.concurrent.ConcurrentHashMap;
20: import java.util.concurrent.ExecutorService;
21: import java.util.concurrent.Executors;
22: import java.util.concurrent.ScheduledExecutorService;
23: import java.util.concurrent.ScheduledFuture;
24: import java.util.concurrent.TimeUnit;
25: import java.util.concurrent.atomic.AtomicBoolean;
26: import java.util.concurrent.atomic.AtomicInteger;
27: import java.util.concurrent.locks.Condition;
28: import java.util.concurrent.locks.Lock;
29: import java.util.concurrent.locks.ReentrantLock;
30: import java.util.regex.Pattern;
31:
32: import org.eclipse.kura.KuraConnectException;
33: import org.eclipse.kura.KuraException;
34: import org.eclipse.kura.KuraNotConnectedException;
35: import org.eclipse.kura.KuraStoreException;
36: import org.eclipse.kura.KuraTooManyInflightMessagesException;
37: import org.eclipse.kura.configuration.ConfigurableComponent;
38: import org.eclipse.kura.configuration.ConfigurationService;
39: import org.eclipse.kura.core.data.store.DbDataStore;
40: import org.eclipse.kura.core.internal.data.TokenBucket;
41: import org.eclipse.kura.data.DataService;
42: import org.eclipse.kura.data.DataTransportService;
43: import org.eclipse.kura.data.DataTransportToken;
44: import org.eclipse.kura.data.listener.DataServiceListener;
45: import org.eclipse.kura.data.transport.listener.DataTransportListener;
46: import org.eclipse.kura.db.H2DbService;
47: import org.eclipse.kura.status.CloudConnectionStatusComponent;
48: import org.eclipse.kura.status.CloudConnectionStatusEnum;
49: import org.eclipse.kura.status.CloudConnectionStatusService;
50: import org.eclipse.kura.watchdog.CriticalComponent;
51: import org.eclipse.kura.watchdog.WatchdogService;
52: import org.eclipse.paho.client.mqttv3.MqttException;
53: import org.osgi.framework.Filter;
54: import org.osgi.framework.FrameworkUtil;
55: import org.osgi.framework.InvalidSyntaxException;
56: import org.osgi.framework.ServiceReference;
57: import org.osgi.service.component.ComponentContext;
58: import org.osgi.service.component.ComponentException;
59: import org.osgi.util.tracker.ServiceTracker;
60: import org.osgi.util.tracker.ServiceTrackerCustomizer;
61: import org.slf4j.Logger;
62: import org.slf4j.LoggerFactory;
63:
64: public class DataServiceImpl implements DataService, DataTransportListener, ConfigurableComponent,
65: CloudConnectionStatusComponent, CriticalComponent {
66:
67: private static final Logger logger = LoggerFactory.getLogger(DataServiceImpl.class);
68:
69: private static final int TRANSPORT_TASK_TIMEOUT = 1; // In seconds; deactivate() sleeps this long before stopping the publisher
70:
71: private DataServiceOptions dataServiceOptions; // replaced on every activate()/updated()
72:
73: private DataTransportService dataTransportService;
74: private H2DbService dbService; // null while no H2DbService instance is attached
75: private DataServiceListenerS dataServiceListeners;
76:
77: protected ScheduledExecutorService connectionMonitorExecutor; // runs the periodic reconnect task
78: private ScheduledFuture<?> connectionMonitorFuture; // handle of the currently scheduled reconnect task, if any
79:
80: // A dedicated executor for the publishing task
81: private ExecutorService publisherExecutor;
82:
83: private DataStore store;
84:
85: private Map<DataTransportToken, Integer> inFlightMsgIds; // transport token -> store message ID; (re)created in startDbStore()
86:
87: private ScheduledExecutorService congestionExecutor; // runs the in-flight congestion timeout task
88: private ScheduledFuture<?> congestionFuture;
89:
90: private CloudConnectionStatusService cloudConnectionStatusService;
91: private CloudConnectionStatusEnum notificationStatus = CloudConnectionStatusEnum.OFF;
92:
93: private TokenBucket throttle; // only (re)built by createThrottle() when rate limiting is enabled
94:
95: private final Lock lock = new ReentrantLock(); // guards notifyPending and backs lockCondition
96: private boolean notifyPending; // set by signalPublisher(), cleared by the publisher thread
97: private final Condition lockCondition = this.lock.newCondition();
98:
99: private final AtomicBoolean publisherEnabled = new AtomicBoolean(); // the publisher loop runs while this is true
100:
101: private ServiceTracker<H2DbService, H2DbService> dbServiceTracker;
102: private ComponentContext componentContext;
103:
104: private WatchdogService watchdogService;
105:
106: private AtomicInteger connectionAttempts; // reset in startConnectionMonitorTask() when connection recovery is enabled
107:
108: // ----------------------------------------------------------------
109: //
110: // Activation APIs
111: //
112: // ----------------------------------------------------------------
113:
114: protected void activate(ComponentContext componentContext, Map<String, Object> properties) {
115: String pid = (String) properties.get(ConfigurationService.KURA_SERVICE_PID);
116: logger.info("Activating {}...", pid);
117:
118: this.componentContext = componentContext;
119:
120: this.dataServiceOptions = new DataServiceOptions(properties);
121:
122: this.connectionMonitorExecutor = Executors.newSingleThreadScheduledExecutor();
123: this.publisherExecutor = Executors.newSingleThreadExecutor();
124: this.congestionExecutor = Executors.newSingleThreadScheduledExecutor();
125:
126: createThrottle();
127: submitPublishingWork();
128:
129: this.store = new DbDataStore(pid);
130:
131: restartDbServiceTracker(this.dataServiceOptions.getDbServiceInstancePid());
132:
133: this.dataServiceListeners = new DataServiceListenerS(componentContext);
134:
135: // Register the component in the CloudConnectionStatus Service
136: this.cloudConnectionStatusService.register(this);
137:
138: this.dataTransportService.addDataTransportListener(this);
139:
140: startConnectionMonitorTask();
141: }
142:
143: private void restartDbServiceTracker(String kuraServicePid) {
144: stopDbServiceTracker();
145: try {
146: final Filter filter = FrameworkUtil
147: .createFilter("(" + ConfigurationService.KURA_SERVICE_PID + "=" + kuraServicePid + ")");
148: this.dbServiceTracker = new ServiceTracker<>(this.componentContext.getBundleContext(), filter,
149: new ServiceTrackerCustomizer<H2DbService, H2DbService>() {
150:
151: @Override
152: public H2DbService addingService(ServiceReference<H2DbService> reference) {
153: logger.info("H2DbService instance found");
154: H2DbService dbService = DataServiceImpl.this.componentContext.getBundleContext()
155: .getService(reference);
156: setH2DbService(dbService);
157: return dbService;
158: }
159:
160: @Override
161: public void modifiedService(ServiceReference<H2DbService> reference, H2DbService service) {
162: logger.info("H2DbService instance updated, recreating table if needed...");
163: synchronized (DataServiceImpl.this) {
164: DataServiceImpl.this.store.update(
165: DataServiceImpl.this.dataServiceOptions.getStoreHousekeeperInterval(),
166: DataServiceImpl.this.dataServiceOptions.getStorePurgeAge(),
167: DataServiceImpl.this.dataServiceOptions.getStoreCapacity());
168: }
169: }
170:
171: @Override
172: public void removedService(ServiceReference<H2DbService> reference, H2DbService service) {
173: logger.info("H2DbService instance removed");
174: unsetH2DbService(DataServiceImpl.this.dbService);
175: DataServiceImpl.this.componentContext.getBundleContext().ungetService(reference);
176: }
177: });
178: this.dbServiceTracker.open();
179: } catch (InvalidSyntaxException e) {
180: throw new ComponentException(e);
181: }
182: }
183:
184: private void stopDbServiceTracker() {
185: if (this.dbServiceTracker != null) {
186: this.dbServiceTracker.close();
187: this.dbServiceTracker = null;
188: }
189: }
190:
191: private synchronized void startDbStore() {
192: try {
193: this.store.start(this.dbService, this.dataServiceOptions.getStoreHousekeeperInterval(),
194: this.dataServiceOptions.getStorePurgeAge(), this.dataServiceOptions.getStoreCapacity());
195:
196: // The initial list of in-flight messages
197: List<DataMessage> inFlightMsgs = this.store.allInFlightMessagesNoPayload();
198:
199: // The map associating a DataTransportToken with a message ID
200: this.inFlightMsgIds = new ConcurrentHashMap<>();
201:
202: if (inFlightMsgs != null) {
203: for (DataMessage message : inFlightMsgs) {
204:
205: DataTransportToken token = new DataTransportToken(message.getPublishedMessageId(),
206: message.getSessionId());
207: this.inFlightMsgIds.put(token, message.getId());
208:
209: logger.debug("Restored in-fligh messages from store. Topic: {}, ID: {}, MQTT message ID: {}",
210: new Object[] { message.getTopic(), message.getId(), message.getPublishedMessageId() });
211: }
212: }
213: } catch (KuraStoreException e) {
214: logger.error("Failed to start store", e);
215: }
216: }
217:
218: public synchronized void updated(Map<String, Object> properties) {
219: logger.info("Updating {}...", properties.get(ConfigurationService.KURA_SERVICE_PID));
220:
221: stopConnectionMonitorTask();
222:
223: final String oldDbServicePid = this.dataServiceOptions.getDbServiceInstancePid();
224:
225: this.dataServiceOptions = new DataServiceOptions(properties);
226:
227: createThrottle();
228:
229: final String currentDbServicePid = this.dataServiceOptions.getDbServiceInstancePid();
230:
231: if (oldDbServicePid.equals(currentDbServicePid)) {
232: if (this.dbService != null) {
233: this.store.update(this.dataServiceOptions.getStoreHousekeeperInterval(),
234: this.dataServiceOptions.getStorePurgeAge(), this.dataServiceOptions.getStoreCapacity());
235: }
236: } else {
237: restartDbServiceTracker(currentDbServicePid);
238: }
239:
240: if (!this.dataTransportService.isConnected()) {
241: startConnectionMonitorTask();
242: }
243: }
244:
245: protected void deactivate(ComponentContext componentContext) {
246: logger.info("Deactivating {}...", this.dataServiceOptions.getKuraServicePid());
247:
248: stopConnectionMonitorTask();
249: this.connectionMonitorExecutor.shutdownNow();
250:
251: this.congestionExecutor.shutdownNow();
252:
253: disconnect();
254:
255: // Await termination of the publisher executor tasks
256: try {
257: // Waits to publish latest messages e.g. disconnect message
258: Thread.sleep(TRANSPORT_TASK_TIMEOUT * 1000L);
259:
260: // Clean publisher thread shutdown
261: this.publisherEnabled.set(false);
262: signalPublisher();
263: } catch (InterruptedException e) {
264: Thread.currentThread().interrupt();
265: logger.info("Interrupted", e);
266: }
267: this.publisherExecutor.shutdownNow();
268:
269: this.dataTransportService.removeDataTransportListener(this);
270:
271: this.store.stop();
272:
273: stopDbServiceTracker();
274: }
275:
276: // ----------------------------------------------------------------
277: //
278: // Dependencies
279: //
280: // ----------------------------------------------------------------
281:
282: public void setDataTransportService(DataTransportService dataTransportService) {
283: this.dataTransportService = dataTransportService;
284: }
285:
286: public void unsetDataTransportService(DataTransportService dataTransportService) {
287: this.dataTransportService = null;
288: }
289:
290: public synchronized void setH2DbService(H2DbService dbService) {
291: this.dbService = dbService;
292: startDbStore();
293: signalPublisher();
294: }
295:
296: public synchronized void unsetH2DbService(H2DbService dbService) {
297: this.dbService = null;
298: disconnect();
299: this.store.stop();
300: }
301:
302: public void setCloudConnectionStatusService(CloudConnectionStatusService cloudConnectionStatusService) {
303: this.cloudConnectionStatusService = cloudConnectionStatusService;
304: }
305:
306: public void unsetCloudConnectionStatusService(CloudConnectionStatusService cloudConnectionStatusService) {
307: this.cloudConnectionStatusService = null;
308: }
309:
310: public void setWatchdogService(WatchdogService watchdogService) {
311: this.watchdogService = watchdogService;
312: }
313:
314: public void unsetWatchdogService(WatchdogService watchdogService) {
315: this.watchdogService = null;
316: }
317:
318: @Override
319: public void addDataServiceListener(DataServiceListener listener) {
320: this.dataServiceListeners.add(listener);
321: }
322:
323: @Override
324: public void removeDataServiceListener(DataServiceListener listener) {
325: this.dataServiceListeners.remove(listener);
326: }
327:
328: @Override
329: public void onConnectionEstablished(boolean newSession) {
330:
331: logger.info("Notified connected");
332: this.cloudConnectionStatusService.updateStatus(this, CloudConnectionStatusEnum.ON);
333:
334: // On a new session all messages the were in-flight in the previous session
335: // would be lost and never confirmed by the DataPublisherService.
336: //
337: // If the DataPublisherService is configured with Clean Start flag set to true,
338: // then the session and connection boundaries are the same.
339: // Otherwise, a session spans multiple connections as far as the client connects
340: // to the same broker instance with the same client ID.
341: //
342: // We have two options here:
343: // Forget them.
344: // Unpublish them so they will be republished on the new session.
345: //
346: // The latter has the potential drawback that duplicates can be generated with any QoS.
347: // This can occur for example if the DataPublisherService is connecting with a different client ID
348: // or to a different broker URL resolved to the same broker instance.
349: //
350: // Also note that unpublished messages will be republished accordingly to their
351: // original priority. Thus a message reordering may occur too.
352: // Even if we artificially upgraded the priority of unpublished messages to -1 so to
353: // republish them first, their relative order would not necessarily match the order
354: // in the DataPublisherService persistence.
355:
356: if (newSession) {
357: if (this.dataServiceOptions.isPublishInFlightMessages()) {
358: logger.info(
359: "New session established. Unpublishing all in-flight messages. Disregarding the QoS level, this may cause duplicate messages.");
360: try {
361: this.store.unpublishAllInFlighMessages();
362: this.inFlightMsgIds.clear();
363: } catch (KuraStoreException e) {
364: logger.error("Failed to unpublish in-flight messages", e);
365: }
366: } else {
367: logger.info("New session established. Dropping all in-flight messages.");
368: try {
369: this.store.dropAllInFlightMessages();
370: this.inFlightMsgIds.clear();
371: } catch (KuraStoreException e) {
372: logger.error("Failed to drop in-flight messages", e);
373: }
374: }
375: }
376:
377: // Notify the listeners
378: this.dataServiceListeners.onConnectionEstablished();
379:
380: signalPublisher();
381: }
382:
383: @Override
384: public void onDisconnecting() {
385: logger.info("Notified disconnecting");
386:
387: // Notify the listeners
388: this.dataServiceListeners.onDisconnecting();
389: }
390:
391: @Override
392: public void onDisconnected() {
393: logger.info("Notified disconnected");
394: this.cloudConnectionStatusService.updateStatus(this, CloudConnectionStatusEnum.OFF);
395:
396: // Notify the listeners
397: this.dataServiceListeners.onDisconnected();
398: }
399:
400: @Override
401: public void onConfigurationUpdating(boolean wasConnected) {
402: logger.info("Notified DataTransportService configuration updating...");
403: stopConnectionMonitorTask();
404: disconnect(0);
405: }
406:
407: @Override
408: public void onConfigurationUpdated(boolean wasConnected) {
409: logger.info("Notified DataTransportService configuration updated.");
410: boolean autoConnect = startConnectionMonitorTask();
411: if (!autoConnect && wasConnected) {
412: try {
413: connect();
414: } catch (KuraConnectException e) {
415: logger.error("Error during re-connect after configuration update.", e);
416: }
417: }
418: }
419:
420: @Override
421: public void onConnectionLost(Throwable cause) {
422: logger.info("connectionLost");
423:
424: stopConnectionMonitorTask(); // Just in case...
425: startConnectionMonitorTask();
426:
427: // Notify the listeners
428: this.dataServiceListeners.onConnectionLost(cause);
429: }
430:
431: @Override
432: public void onMessageArrived(String topic, byte[] payload, int qos, boolean retained) {
433:
434: logger.debug("Message arrived on topic: {}", topic);
435:
436: // Notify the listeners
437: this.dataServiceListeners.onMessageArrived(topic, payload, qos, retained);
438:
439: signalPublisher();
440: }
441:
442: @Override
443: // It's very important that the publishInternal and messageConfirmed methods are synchronized
444: public synchronized void onMessageConfirmed(DataTransportToken token) {
445:
446: logger.debug("Confirmed message with MQTT message ID: {} on session ID: {}", token.getMessageId(),
447: token.getSessionId());
448:
449: Integer messageId = this.inFlightMsgIds.remove(token);
450: if (messageId == null) {
451: logger.info(
452: "Confirmed message published with MQTT message ID: {} not tracked in the map of in-flight messages",
453: token.getMessageId());
454: } else {
455:
456: DataMessage confirmedMessage = null;
457: try {
458: logger.info("Confirmed message ID: {} to store", messageId);
459: this.store.confirmed(messageId);
460: confirmedMessage = this.store.get(messageId);
461: } catch (KuraStoreException e) {
462: logger.error("Cannot confirm message to store", e);
463: }
464:
465: // Notify the listeners
466: if (confirmedMessage != null) {
467: String topic = confirmedMessage.getTopic();
468: this.dataServiceListeners.onMessageConfirmed(messageId, topic);
469: } else {
470: logger.error("Confirmed Message with ID {} could not be loaded from the DataStore.", messageId);
471: }
472: }
473:
474: if (this.inFlightMsgIds.size() < this.dataServiceOptions.getMaxInFlightMessages()) {
475: handleInFlightDecongestion();
476: }
477:
478: signalPublisher();
479: }
480:
481: @Override
482: public void connect() throws KuraConnectException {
483: stopConnectionMonitorTask();
484: if (this.dbService == null) {
485: throw new KuraConnectException("H2DbService instance not attached, not connecting");
486: }
487:
488: if (!this.dataTransportService.isConnected()) {
489: this.dataTransportService.connect();
490: }
491: }
492:
493: @Override
494: public boolean isConnected() {
495: return this.dataTransportService.isConnected();
496: }
497:
498: @Override
499: public boolean isAutoConnectEnabled() {
500: return this.dataServiceOptions.isAutoConnect();
501: }
502:
503: @Override
504: public int getRetryInterval() {
505: return this.dataServiceOptions.getConnectDelay();
506: }
507:
508: @Override
509: public void disconnect(long quiesceTimeout) {
510: stopConnectionMonitorTask();
511: this.dataTransportService.disconnect(quiesceTimeout);
512: }
513:
514: @Override
515: public void subscribe(String topic, int qos) throws KuraException {
516: this.dataTransportService.subscribe(topic, qos);
517: }
518:
519: @Override
520: public void unsubscribe(String topic) throws KuraException {
521: this.dataTransportService.unsubscribe(topic);
522: }
523:
524: @Override
525: public int publish(String topic, byte[] payload, int qos, boolean retain, int priority) throws KuraStoreException {
526:
527: logger.info("Storing message on topic: {}, priority: {}", topic, priority);
528:
529: DataMessage dataMsg = this.store.store(topic, payload, qos, retain, priority);
530: logger.info("Stored message on topic: {}, priority: {}", topic, priority);
531:
532: signalPublisher();
533:
534: return dataMsg.getId();
535: }
536:
537: @Override
538: public List<Integer> getUnpublishedMessageIds(String topicRegex) throws KuraStoreException {
539: List<DataMessage> messages = this.store.allUnpublishedMessagesNoPayload();
540: return buildMessageIds(messages, topicRegex);
541: }
542:
543: @Override
544: public List<Integer> getInFlightMessageIds(String topicRegex) throws KuraStoreException {
545: List<DataMessage> messages = this.store.allInFlightMessagesNoPayload();
546: return buildMessageIds(messages, topicRegex);
547: }
548:
549: @Override
550: public List<Integer> getDroppedInFlightMessageIds(String topicRegex) throws KuraStoreException {
551: List<DataMessage> messages = this.store.allDroppedInFlightMessagesNoPayload();
552: return buildMessageIds(messages, topicRegex);
553: }
554:
555: private void signalPublisher() {
556: this.lock.lock();
557: this.notifyPending = true;
558: this.lockCondition.signal();
559: this.lock.unlock();
560: }
561:
562: private boolean startConnectionMonitorTask() {
563: if (this.connectionMonitorFuture != null && !this.connectionMonitorFuture.isDone()) {
564: logger.error("Reconnect task already running");
565: throw new IllegalStateException("Reconnect task already running");
566: }
567:
568: //
569: // Establish a reconnect Thread based on the reconnect interval
570: boolean autoConnect = this.dataServiceOptions.isAutoConnect();
571: int reconnectInterval = this.dataServiceOptions.getConnectDelay();
572: if (autoConnect) {
573:
574: if (this.dataServiceOptions.isConnectionRecoveryEnabled()) {
575: this.watchdogService.registerCriticalComponent(this);
576: this.watchdogService.checkin(this);
577: this.connectionAttempts = new AtomicInteger(0);
578: }
579:
580: // Change notification status to slow blinking when connection is expected to happen in the future
581: this.cloudConnectionStatusService.updateStatus(this, CloudConnectionStatusEnum.SLOW_BLINKING);
582: // add a delay on the reconnect
583: int maxDelay = reconnectInterval / 5;
584: maxDelay = maxDelay > 0 ? maxDelay : 1;
585: int initialDelay = new Random().nextInt(maxDelay);
586:
587: logger.info("Starting reconnect task with initial delay {}", initialDelay);
588: this.connectionMonitorFuture = this.connectionMonitorExecutor.scheduleAtFixedRate(new Runnable() {
589:
590: @Override
591: public void run() {
592: String originalName = Thread.currentThread().getName();
593: Thread.currentThread().setName("DataServiceImpl:ReconnectTask");
594: boolean connected = false;
595: try {
596: if (DataServiceImpl.this.dbService == null) {
597: logger.warn("H2DbService instance not attached, not connecting");
598: return;
599: }
600: logger.info("Connecting...");
601: if (DataServiceImpl.this.dataTransportService.isConnected()) {
602: logger.info("Already connected. Reconnect task will be terminated.");
603:
604: } else {
605: DataServiceImpl.this.dataTransportService.connect();
606: logger.info("Connected. Reconnect task will be terminated.");
607: }
608: connected = true;
609: } catch (KuraConnectException e) {
610: logger.warn("Connect failed", e);
611:
612: if (DataServiceImpl.this.dataServiceOptions.isConnectionRecoveryEnabled()) {
613: if (isAuthenticationException(e) || DataServiceImpl.this.connectionAttempts
614: .getAndIncrement() < DataServiceImpl.this.dataServiceOptions
615: .getRecoveryMaximumAllowedFailures()) {
616: logger.info("Checkin done.");
617: DataServiceImpl.this.watchdogService.checkin(DataServiceImpl.this);
618: } else {
619: logger.info("Maximum number of connection attempts reached. Requested reboot...");
620: }
621: }
622: } catch (Error e) {
623: // There's nothing we can do here but log an exception.
624: logger.error("Unexpected Error. Task will be terminated", e);
625: throw e;
626: } finally {
627: Thread.currentThread().setName(originalName);
628: if (connected) {
629: unregisterAsCriticalComponent();
630: // Throwing an exception will suppress subsequent executions of this periodic task.
631: throw new RuntimeException("Connected. Reconnect task will be terminated.");
632: }
633: }
634: }
635:
636: private boolean isAuthenticationException(KuraConnectException e) {
637: boolean authenticationException = false;
638: if (e.getCause() instanceof MqttException) {
639: MqttException mqttException = (MqttException) e.getCause();
640: if (mqttException.getReasonCode() == MqttException.REASON_CODE_FAILED_AUTHENTICATION
641: || mqttException.getReasonCode() == MqttException.REASON_CODE_INVALID_CLIENT_ID
642: || mqttException.getReasonCode() == MqttException.REASON_CODE_NOT_AUTHORIZED) {
643: logger.info("Authentication exception encountered.");
644: authenticationException = true;
645: }
646: }
647: return authenticationException;
648: }
649: }, initialDelay, reconnectInterval, TimeUnit.SECONDS);
650: } else {
651: // Change notification status to off. Connection is not expected to happen in the future
652: this.cloudConnectionStatusService.updateStatus(this, CloudConnectionStatusEnum.OFF);
653: unregisterAsCriticalComponent();
654: }
655: return autoConnect;
656: }
657:
658: private void createThrottle() {
659: if (this.dataServiceOptions.isRateLimitEnabled()) {
660: int publishRate = this.dataServiceOptions.getRateLimitAverageRate();
661: int burstLength = this.dataServiceOptions.getRateLimitBurstSize();
662:
663: long publishPeriod = this.dataServiceOptions.getRateLimitTimeUnit() / publishRate;
664:
665: logger.info("Get Throttle with burst length {} and send a message every {} millis", burstLength,
666: publishPeriod);
667: this.throttle = new TokenBucket(burstLength, publishPeriod);
668: }
669: }
670:
671: private void stopConnectionMonitorTask() {
672: if (this.connectionMonitorFuture != null && !this.connectionMonitorFuture.isDone()) {
673:
674: logger.info("Reconnect task running. Stopping it");
675:
676: this.connectionMonitorFuture.cancel(true);
677: }
678: unregisterAsCriticalComponent();
679: }
680:
681: private void unregisterAsCriticalComponent() {
682: this.watchdogService.unregisterCriticalComponent(this);
683: }
684:
685: private void disconnect() {
686: long millis = this.dataServiceOptions.getDisconnectDelay() * 1000L;
687: this.dataTransportService.disconnect(millis);
688: }
689:
690: private void submitPublishingWork() {
691: this.publisherEnabled.set(true);
692:
693: this.publisherExecutor.execute(new PublishManager());
694: }
695:
696: // It's very important that the publishInternal and messageConfirmed methods are synchronized
697: private synchronized void publishInternal(DataMessage message) throws KuraException {
698:
699: String topic = message.getTopic();
700: byte[] payload = message.getPayload();
701: int qos = message.getQos();
702: boolean retain = message.isRetain();
703: int msgId = message.getId();
704:
705: logger.debug("Publishing message with ID: {} on topic: {}, priority: {}", msgId, topic, message.getPriority());
706:
707: DataTransportToken token = DataServiceImpl.this.dataTransportService.publish(topic, payload, qos, retain);
708:
709: if (token == null) {
710: DataServiceImpl.this.store.published(msgId);
711: logger.debug("Published message with ID: {}", msgId);
712: } else {
713:
714: // Check if the token is already tracked in the map (in which case we are in trouble)
715: Integer trackedMsgId = DataServiceImpl.this.inFlightMsgIds.get(token);
716: if (trackedMsgId != null) {
717: logger.error("Token already tracked: {} - {}", token.getSessionId(), token.getMessageId());
718: }
719:
720: DataServiceImpl.this.inFlightMsgIds.put(token, msgId);
721: DataServiceImpl.this.store.published(msgId, token.getMessageId(), token.getSessionId());
722: logger.debug("Published message with ID: {} and MQTT message ID: {}", msgId, token.getMessageId());
723: }
724: }
725:
726: private List<Integer> buildMessageIds(List<DataMessage> messages, String topicRegex) {
727: Pattern topicPattern = Pattern.compile(topicRegex);
728: List<Integer> ids = new ArrayList<>();
729:
730: if (messages != null) {
731: for (DataMessage message : messages) {
732: String topic = message.getTopic();
733: if (topicPattern.matcher(topic).matches()) {
734: ids.add(message.getId());
735: }
736: }
737: }
738:
739: return ids;
740: }
741:
742: private void handleInFlightDecongestion() {
743: if (this.congestionFuture != null && !this.congestionFuture.isDone()) {
744: this.congestionFuture.cancel(true);
745: }
746: }
747:
748: @Override
749: public int getNotificationPriority() {
750: return CloudConnectionStatusService.PRIORITY_LOW;
751: }
752:
753: @Override
754: public CloudConnectionStatusEnum getNotificationStatus() {
755: return this.notificationStatus;
756: }
757:
758: @Override
759: public void setNotificationStatus(CloudConnectionStatusEnum status) {
760: this.notificationStatus = status;
761: }
762:
763: private final class PublishManager implements Runnable {
764:
765: @Override
766: public void run() {
767: Thread.currentThread().setName("DataServiceImpl:Submit");
768: while (DataServiceImpl.this.publisherEnabled.get()) {
769: long sleepingTime = -1;
770: boolean messagePublished = false;
771:
772: if (DataServiceImpl.this.dataTransportService.isConnected()) {
773: try {
774: DataMessage message = DataServiceImpl.this.store.getNextMessage();
775:
776: if (message != null) {
777: checkInFlightMessages(message);
778:
779: if (DataServiceImpl.this.dataServiceOptions.isRateLimitEnabled()
780: && message.getPriority() >= 5) {
781: messagePublished = publishMessageTokenBucket(message);
782: sleepingTime = DataServiceImpl.this.throttle.getTokenWaitTime();
783: } else {
784: publishMessageUnbound(message);
785: messagePublished = true;
786: }
787: }
788: } catch (KuraNotConnectedException e) {
789: logger.info("DataPublisherService is not connected");
790: } catch (KuraTooManyInflightMessagesException e) {
791: logger.info("Too many in-flight messages");
792: handleInFlightCongestion();
793: } catch (Exception e) {
794: logger.error("Probably an unrecoverable exception", e);
795: }
796: } else {
797: logger.info("DataPublisherService not connected");
798: }
799:
800: if (!messagePublished) {
801: suspendPublisher(sleepingTime, TimeUnit.MILLISECONDS);
802: }
803: }
804: logger.debug("Exited publisher loop.");
805: }
806:
807: private void checkInFlightMessages(DataMessage message) throws KuraTooManyInflightMessagesException {
808: if (message.getQos() > 0 && DataServiceImpl.this.inFlightMsgIds
809: .size() >= DataServiceImpl.this.dataServiceOptions.getMaxInFlightMessages()) {
810: logger.warn("The configured maximum number of in-flight messages has been reached");
811: throw new KuraTooManyInflightMessagesException("Too many in-flight messages");
812: }
813: }
814:
815: private void suspendPublisher(long timeout, TimeUnit timeUnit) {
816: if (!DataServiceImpl.this.publisherEnabled.get()) {
817: return;
818: }
819: try {
820: DataServiceImpl.this.lock.lock();
821: if (!DataServiceImpl.this.notifyPending) {
822: if (timeout == -1) {
823: logger.debug("Suspending publishing thread indefinitely");
824: DataServiceImpl.this.lockCondition.await();
825: } else {
826: logger.debug("Suspending publishing thread for {} milliseconds", timeout);
827: DataServiceImpl.this.lockCondition.await(timeout, timeUnit);
828: }
829: }
830: DataServiceImpl.this.notifyPending = false;
831: } catch (InterruptedException e) {
832: Thread.currentThread().interrupt();
833: } finally {
834: DataServiceImpl.this.lock.unlock();
835: }
836: }
837:
838: private void publishMessageUnbound(DataMessage message) throws KuraException {
839: publishInternal(message);
840: // Notify the listeners
841: DataServiceImpl.this.dataServiceListeners.onMessagePublished(message.getId(), message.getTopic());
842: }
843:
844: private boolean publishMessageTokenBucket(DataMessage message) throws KuraException {
845: boolean tokenAvailable = DataServiceImpl.this.throttle.getToken();
846:
847: if (tokenAvailable) {
848: publishMessageUnbound(message);
849: return true;
850: }
851: return false;
852: }
853:
854: private void handleInFlightCongestion() {
855: int timeout = DataServiceImpl.this.dataServiceOptions.getInFlightMessagesCongestionTimeout();
856:
857: // Do not schedule more that one task at a time
858: if (timeout != 0 && (DataServiceImpl.this.congestionFuture == null
859: || DataServiceImpl.this.congestionFuture.isDone())) {
860: logger.warn("In-flight message congestion timeout started");
861: DataServiceImpl.this.congestionFuture = DataServiceImpl.this.congestionExecutor.schedule(() -> {
862: Thread.currentThread().setName("DataServiceImpl:InFlightCongestion");
863: logger.warn("In-flight message congestion timeout elapsed. Disconnecting and reconnecting again");
864: disconnect();
865: startConnectionMonitorTask();
866: }, timeout, TimeUnit.SECONDS);
867: }
868: }
869: }
870:
871: @Override
872: public String getCriticalComponentName() {
873: return "DataServiceImpl";
874: }
875:
876: @Override
877: public int getCriticalComponentTimeout() {
878: return this.dataServiceOptions.getCriticalComponentTimeout();
879: }
880:
881: public Map<String, String> getConnectionInfo() {
882: Map<String, String> result = new HashMap<>();
883: result.put("Broker URL", this.dataTransportService.getBrokerUrl());
884: result.put("Account", this.dataTransportService.getAccountName());
885: result.put("Username", this.dataTransportService.getUsername());
886: result.put("Client ID", this.dataTransportService.getClientId());
887: return result;
888: }
889: }