Skip to content

Package: DeviceManagementNotificationMessageProcessor

DeviceManagementNotificationMessageProcessor

nameinstructionbranchcomplexitylinemethod
DeviceManagementNotificationMessageProcessor()
M: 78 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 6 C: 0
0%
M: 1 C: 0
0%
processCommunicationErrorMessage(Exchange, CamelKapuaMessage)
M: 15 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 4 C: 0
0%
M: 1 C: 0
0%
processConfigurationErrorMessage(Exchange, CamelKapuaMessage)
M: 12 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 3 C: 0
0%
M: 1 C: 0
0%
processGenericErrorMessage(Exchange, CamelKapuaMessage)
M: 12 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 3 C: 0
0%
M: 1 C: 0
0%
processMessage(CamelKapuaMessage)
M: 82 C: 0
0%
M: 4 C: 0
0%
M: 3 C: 0
0%
M: 23 C: 0
0%
M: 1 C: 0
0%
static {...}
M: 14 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 3 C: 0
0%
M: 1 C: 0
0%

Coverage

1: /*******************************************************************************
2: * Copyright (c) 2018, 2022 Eurotech and/or its affiliates and others
3: *
4: * This program and the accompanying materials are made
5: * available under the terms of the Eclipse Public License 2.0
6: * which is available at https://www.eclipse.org/legal/epl-2.0/
7: *
8: * SPDX-License-Identifier: EPL-2.0
9: *
10: * Contributors:
11: * Eurotech - initial API and implementation
12: *******************************************************************************/
13: package org.eclipse.kapua.broker.core.listener;
14:
15: import com.codahale.metrics.Counter;
16: import com.google.common.base.MoreObjects;
17: import org.apache.camel.Exchange;
18: import org.apache.camel.spi.UriEndpoint;
19: import org.eclipse.kapua.KapuaException;
20: import org.eclipse.kapua.broker.core.message.CamelKapuaMessage;
21: import org.eclipse.kapua.commons.metric.MetricServiceFactory;
22: import org.eclipse.kapua.commons.metric.MetricsService;
23: import org.eclipse.kapua.locator.KapuaLocator;
24: import org.eclipse.kapua.service.device.management.job.manager.JobDeviceManagementOperationManagerService;
25: import org.eclipse.kapua.service.device.management.message.notification.KapuaNotifyChannel;
26: import org.eclipse.kapua.service.device.management.message.notification.KapuaNotifyMessage;
27: import org.eclipse.kapua.service.device.management.message.notification.KapuaNotifyPayload;
28: import org.eclipse.kapua.service.device.management.registry.manager.DeviceManagementRegistryManagerService;
29: import org.slf4j.Logger;
30: import org.slf4j.LoggerFactory;
31:
32: /**
33: * {@link DeviceManagementNotificationMessageProcessor}
34: *
35: * @since 1.0.0
36: */
37: @UriEndpoint(title = "Device management notification storage message processor", syntax = "bean:deviceManagementNotificationMessageProcessor", scheme = "bean")
38: public class DeviceManagementNotificationMessageProcessor extends AbstractProcessor<CamelKapuaMessage<?>> {
39:
40: private static final Logger LOG = LoggerFactory.getLogger(DeviceManagementNotificationMessageProcessor.class);
41:
42: private static final DeviceManagementRegistryManagerService DEVICE_MANAGEMENT_REGISTRY_MANAGER_SERVICE = KapuaLocator.getInstance().getService(DeviceManagementRegistryManagerService.class);
43: private static final JobDeviceManagementOperationManagerService JOB_DEVICE_MANAGEMENT_OPERATION_MANAGER_SERVICE = KapuaLocator.getInstance().getService(JobDeviceManagementOperationManagerService.class);
44:
45: // queues counters
46: private final Counter metricQueueCommunicationErrorCount;
47: private final Counter metricQueueConfigurationErrorCount;
48: private final Counter metricQueueGenericErrorCount;
49:
50: public DeviceManagementNotificationMessageProcessor() {
51: super("Device Management Notify Processor");
52: MetricsService metricService = MetricServiceFactory.getInstance();
53:
54: metricQueueCommunicationErrorCount = metricService.getCounter(DeviceManagementRegistryNotificationMetrics.METRIC_MODULE_NAME, DeviceManagementRegistryNotificationMetrics.METRIC_COMPONENT_NOTIFICATION, DeviceManagementRegistryNotificationMetrics.METRIC_PROCESS_QUEUE, DeviceManagementRegistryNotificationMetrics.METRIC_COMMUNICATION, DeviceManagementRegistryNotificationMetrics.METRIC_ERROR, DeviceManagementRegistryNotificationMetrics.METRIC_COUNT);
55: metricQueueConfigurationErrorCount = metricService.getCounter(DeviceManagementRegistryNotificationMetrics.METRIC_MODULE_NAME, DeviceManagementRegistryNotificationMetrics.METRIC_COMPONENT_NOTIFICATION, DeviceManagementRegistryNotificationMetrics.METRIC_PROCESS_QUEUE, DeviceManagementRegistryNotificationMetrics.METRIC_CONFIGURATION, DeviceManagementRegistryNotificationMetrics.METRIC_ERROR, DeviceManagementRegistryNotificationMetrics.METRIC_COUNT);
56: metricQueueGenericErrorCount = metricService.getCounter(DeviceManagementRegistryNotificationMetrics.METRIC_MODULE_NAME, DeviceManagementRegistryNotificationMetrics.METRIC_COMPONENT_NOTIFICATION, DeviceManagementRegistryNotificationMetrics.METRIC_PROCESS_QUEUE, DeviceManagementRegistryNotificationMetrics.METRIC_GENERIC, DeviceManagementRegistryNotificationMetrics.METRIC_ERROR, DeviceManagementRegistryNotificationMetrics.METRIC_COUNT);
57: }
58:
59: /**
60: * Process a device management {@link KapuaNotifyMessage}.
61: *
62: * @throws KapuaException
63: */
64: @Override
65: public void processMessage(CamelKapuaMessage<?> message) throws KapuaException {
66: LOG.debug("Received notification message from device channel: client id '{}' - {}", message.getMessage().getClientId(), message.getMessage().getChannel());
67:
68: KapuaNotifyMessage notifyMessage = (KapuaNotifyMessage) message.getMessage();
69: KapuaNotifyPayload notifyPayload = notifyMessage.getPayload();
70: KapuaNotifyChannel notifyChannel = notifyMessage.getChannel();
71:
72: try {
73: DEVICE_MANAGEMENT_REGISTRY_MANAGER_SERVICE.processOperationNotification(
74: notifyMessage.getScopeId(),
75: notifyPayload.getOperationId(),
76: MoreObjects.firstNonNull(notifyMessage.getSentOn(), notifyMessage.getReceivedOn()),
77:• notifyPayload.getResource() != null ? notifyPayload.getResource() : notifyChannel.getResources()[0],
78: notifyPayload.getStatus(),
79: notifyPayload.getProgress(),
80: notifyPayload.getMessage());
81:
82: JOB_DEVICE_MANAGEMENT_OPERATION_MANAGER_SERVICE.processJobTargetOnNotification(
83: notifyMessage.getScopeId(),
84: notifyPayload.getOperationId(),
85: MoreObjects.firstNonNull(notifyMessage.getSentOn(), notifyMessage.getReceivedOn()),
86:• notifyPayload.getResource() != null ? notifyPayload.getResource() : notifyChannel.getResources()[0],
87: notifyPayload.getStatus());
88: } catch (Exception e) {
89: LOG.error("Error while processing Device Management Operation Notification message!", e);
90: throw e;
91: }
92: }
93:
94: public void processCommunicationErrorMessage(Exchange exchange, CamelKapuaMessage<?> message) throws KapuaException {
95: LOG.info("Message datastoreId: '{}' - Message Id: '{}'", message.getDatastoreId(), message.getMessage().getId());
96: processMessage(message);
97: metricQueueCommunicationErrorCount.dec();
98: }
99:
100: public void processConfigurationErrorMessage(Exchange exchange, CamelKapuaMessage<?> message) throws KapuaException {
101: LOG.info("Message datastoreId: '{}' - Message Id '{}'", message.getDatastoreId(), message.getMessage().getId());
102: metricQueueConfigurationErrorCount.dec();
103: }
104:
105: public void processGenericErrorMessage(Exchange exchange, CamelKapuaMessage<?> message) throws KapuaException {
106: LOG.info("Message datastoreId: '{}' - Message Id '{}'", message.getDatastoreId(), message.getMessage().getId());
107: metricQueueGenericErrorCount.dec();
108: }
109:
110: }