Skip to content

Package: DeviceMessageListener

DeviceMessageListener

name | instruction | branch | complexity | line | method
DeviceMessageListener()
M: 94 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 7 C: 0
0%
M: 1 C: 0
0%
processAppsMessage(CamelKapuaMessage)
M: 20 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 7 C: 0
0%
M: 1 C: 0
0%
processBirthMessage(CamelKapuaMessage)
M: 36 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 12 C: 0
0%
M: 1 C: 0
0%
processDisconnectMessage(CamelKapuaMessage)
M: 20 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 7 C: 0
0%
M: 1 C: 0
0%
processMissingMessage(CamelKapuaMessage)
M: 20 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 7 C: 0
0%
M: 1 C: 0
0%
static {...}
M: 14 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 3 C: 0
0%
M: 1 C: 0
0%

Coverage

1: /*******************************************************************************
2: * Copyright (c) 2016, 2022 Eurotech and/or its affiliates and others
3: *
4: * This program and the accompanying materials are made
5: * available under the terms of the Eclipse Public License 2.0
6: * which is available at https://www.eclipse.org/legal/epl-2.0/
7: *
8: * SPDX-License-Identifier: EPL-2.0
9: *
10: * Contributors:
11: * Eurotech - initial API and implementation
12: *******************************************************************************/
13: package org.eclipse.kapua.broker.core.listener;
14:
15: import com.codahale.metrics.Counter;
16: import org.apache.camel.spi.UriEndpoint;
17: import org.eclipse.kapua.KapuaException;
18: import org.eclipse.kapua.broker.core.message.CamelKapuaMessage;
19: import org.eclipse.kapua.locator.KapuaLocator;
20: import org.eclipse.kapua.message.device.lifecycle.KapuaAppsMessage;
21: import org.eclipse.kapua.message.device.lifecycle.KapuaBirthMessage;
22: import org.eclipse.kapua.message.device.lifecycle.KapuaDisconnectMessage;
23: import org.eclipse.kapua.message.device.lifecycle.KapuaMissingMessage;
24: import org.eclipse.kapua.service.device.management.job.scheduler.manager.JobDeviceManagementTriggerManagerService;
25: import org.eclipse.kapua.service.device.registry.lifecycle.DeviceLifeCycleService;
26: import org.slf4j.Logger;
27: import org.slf4j.LoggerFactory;
28:
29: /**
30: * Device messages listener (device life cycle).
31: * <p>
32: * Manage:<br>
33: * - BIRTH/DC/LWT/APPS device messages<br>
34: * Republish of the lifecycle messages (once processed by the broker) isn't supported yet (see #136).
35: *
36: * @since 1.0.0
37: */
38: @UriEndpoint(title = "device message processor", syntax = "bean:deviceMessageListener", scheme = "bean")
39: public class DeviceMessageListener extends AbstractListener {
40:
41: private static final Logger LOG = LoggerFactory.getLogger(DeviceMessageListener.class);
42:
43: private static DeviceLifeCycleService deviceLifeCycleService = KapuaLocator.getInstance().getService(DeviceLifeCycleService.class);
44: private static JobDeviceManagementTriggerManagerService jobDeviceManagementTriggerManagerService = KapuaLocator.getInstance().getService(JobDeviceManagementTriggerManagerService.class);
45:
46: // metrics
47: private Counter metricDeviceBirthMessage;
48: private Counter metricDeviceDisconnectMessage;
49: private Counter metricDeviceMissingMessage;
50: private Counter metricDeviceAppsMessage;
51: private Counter metricDeviceErrorMessage;
52:
53: public DeviceMessageListener() {
54: super(DeviceManagementRegistryNotificationMetrics.METRIC_COMPONENT_DEVICE_LIFE_CYCLE);
55: metricDeviceBirthMessage = registerCounter(DeviceManagementRegistryNotificationMetrics.METRIC_MESSAGES, DeviceManagementRegistryNotificationMetrics.METRIC_BIRTH, DeviceManagementRegistryNotificationMetrics.METRIC_COUNT);
56: metricDeviceDisconnectMessage = registerCounter(DeviceManagementRegistryNotificationMetrics.METRIC_MESSAGES, DeviceManagementRegistryNotificationMetrics.METRIC_DC, DeviceManagementRegistryNotificationMetrics.METRIC_COUNT);
57: metricDeviceMissingMessage = registerCounter(DeviceManagementRegistryNotificationMetrics.METRIC_MESSAGES, DeviceManagementRegistryNotificationMetrics.METRIC_MISSING, DeviceManagementRegistryNotificationMetrics.METRIC_COUNT);
58: metricDeviceAppsMessage = registerCounter(DeviceManagementRegistryNotificationMetrics.METRIC_MESSAGES, DeviceManagementRegistryNotificationMetrics.METRIC_APPS, DeviceManagementRegistryNotificationMetrics.METRIC_COUNT);
59: metricDeviceErrorMessage = registerCounter(DeviceManagementRegistryNotificationMetrics.METRIC_MESSAGES, DeviceManagementRegistryNotificationMetrics.METRIC_ERROR, DeviceManagementRegistryNotificationMetrics.METRIC_COUNT);
60: }
61:
62: /**
63: * Process a birth message.
64: *
65: * @param birthMessage The birth message to process.
66: * @since 1.0.0
67: */
68: public void processBirthMessage(CamelKapuaMessage<KapuaBirthMessage> birthMessage) {
69: try {
70: deviceLifeCycleService.birth(birthMessage.getConnectionId(), birthMessage.getMessage());
71: metricDeviceBirthMessage.inc();
72: } catch (KapuaException e) {
73: metricDeviceErrorMessage.inc();
74: LOG.error("Error while processing device birth life-cycle event", e);
75: }
76:
77: try {
78: KapuaBirthMessage kapuaBirthMessage = birthMessage.getMessage();
79:
80: jobDeviceManagementTriggerManagerService.processOnConnect(kapuaBirthMessage.getScopeId(), kapuaBirthMessage.getDeviceId());
81: } catch (Exception e) {
82: LOG.error("Error while processing device birth to trigger jobs", e);
83: }
84: }
85:
86: /**
87: * Process a disconnect message.
88: *
89: * @param disconnectMessage The disconnect message to process.
90: * @since 1.0.0
91: */
92: public void processDisconnectMessage(CamelKapuaMessage<KapuaDisconnectMessage> disconnectMessage) {
93: try {
94: deviceLifeCycleService.death(disconnectMessage.getConnectionId(), disconnectMessage.getMessage());
95: metricDeviceDisconnectMessage.inc();
96: } catch (KapuaException e) {
97: metricDeviceErrorMessage.inc();
98: LOG.error("Error while processing device disconnect life-cycle event", e);
99: }
100: }
101:
102: /**
103: * Process an application message.
104: *
105: * @param appsMessage The apps message to process.
106: * @since 1.0.0
107: */
108: public void processAppsMessage(CamelKapuaMessage<KapuaAppsMessage> appsMessage) {
109: try {
110: deviceLifeCycleService.applications(appsMessage.getConnectionId(), appsMessage.getMessage());
111: metricDeviceAppsMessage.inc();
112: } catch (KapuaException e) {
113: metricDeviceErrorMessage.inc();
114: LOG.error("Error while processing device apps life-cycle event", e);
115: }
116: }
117:
118: /**
119: * Process a missing message.
120: *
121: * @param missingMessage The missing message to process.
122: * @since 1.0.0
123: */
124: public void processMissingMessage(CamelKapuaMessage<KapuaMissingMessage> missingMessage) {
125: try {
126: deviceLifeCycleService.missing(missingMessage.getConnectionId(), missingMessage.getMessage());
127: metricDeviceMissingMessage.inc();
128: } catch (KapuaException e) {
129: metricDeviceErrorMessage.inc();
130: LOG.error("Error while processing device missing life-cycle event", e);
131: }
132: }
133: }