Skip to content

Package: DataStorageMessageProcessor

DataStorageMessageProcessor

name | instruction | branch | complexity | line | method
DataStorageMessageProcessor()
M: 102 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 8 C: 0
0%
M: 1 C: 0
0%
processCommunicationErrorMessage(Exchange, CamelKapuaMessage)
M: 15 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 4 C: 0
0%
M: 1 C: 0
0%
processConfigurationErrorMessage(Exchange, CamelKapuaMessage)
M: 12 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 3 C: 0
0%
M: 1 C: 0
0%
processGenericErrorMessage(Exchange, CamelKapuaMessage)
M: 12 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 3 C: 0
0%
M: 1 C: 0
0%
processMessage(CamelKapuaMessage)
M: 65 C: 0
0%
M: 6 C: 0
0%
M: 4 C: 0
0%
M: 14 C: 0
0%
M: 1 C: 0
0%
static {...}
M: 4 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%

Coverage

1: /*******************************************************************************
2: * Copyright (c) 2016, 2022 Eurotech and/or its affiliates and others
3: *
4: * This program and the accompanying materials are made
5: * available under the terms of the Eclipse Public License 2.0
6: * which is available at https://www.eclipse.org/legal/epl-2.0/
7: *
8: * SPDX-License-Identifier: EPL-2.0
9: *
10: * Contributors:
11: * Eurotech - initial API and implementation
12: * Red Hat Inc
13: *******************************************************************************/
14: package org.eclipse.kapua.broker.core.listener;
15:
16: import com.codahale.metrics.Counter;
17: import org.apache.camel.Exchange;
18: import org.apache.camel.spi.UriEndpoint;
19: import org.eclipse.kapua.KapuaException;
20: import org.eclipse.kapua.broker.core.message.CamelKapuaMessage;
21: import org.eclipse.kapua.commons.metric.MetricServiceFactory;
22: import org.eclipse.kapua.commons.metric.MetricsService;
23: import org.eclipse.kapua.locator.KapuaLocator;
24: import org.eclipse.kapua.message.device.data.KapuaDataMessage;
25: import org.eclipse.kapua.model.id.KapuaId;
26: import org.eclipse.kapua.service.datastore.MessageStoreService;
27: import org.eclipse.kapua.service.datastore.internal.mediator.DatastoreCommunicationException;
28: import org.eclipse.kapua.service.device.management.asset.store.DeviceAssetStoreService;
29: import org.slf4j.Logger;
30: import org.slf4j.LoggerFactory;
31:
32: /**
33: * Data storage message listener
34: *
35: * @since 1.0
36: */
37: @UriEndpoint(title = "Data storage message processor", syntax = "bean:dataMessageProcessor", scheme = "bean")
38: public class DataStorageMessageProcessor extends AbstractProcessor<CamelKapuaMessage<?>> {
39:
40: private static final Logger LOG = LoggerFactory.getLogger(DataStorageMessageProcessor.class);
41:
42: private final MessageStoreService messageStoreService = KapuaLocator.getInstance().getService(MessageStoreService.class);
43:
44: private final DeviceAssetStoreService deviceAssetStoreService = KapuaLocator.getInstance().getService(DeviceAssetStoreService.class);
45:
46: // queues counters
47: private final Counter metricQueueCommunicationErrorCount;
48: private final Counter metricQueueConfigurationErrorCount;
49: private final Counter metricQueueGenericErrorCount;
50:
51: public DataStorageMessageProcessor() {
52: super("DataStorage");
53: MetricsService metricService = MetricServiceFactory.getInstance();
54:
55: metricQueueCommunicationErrorCount = metricService.getCounter(DataStoreMetrics.METRIC_MODULE_NAME, DataStoreMetrics.METRIC_COMPONENT_NAME, DataStoreMetrics.METRIC_STORE, DataStoreMetrics.METRIC_QUEUE, DataStoreMetrics.METRIC_COMMUNICATION, DataStoreMetrics.METRIC_ERROR, DataStoreMetrics.METRIC_COUNT);
56: metricQueueConfigurationErrorCount = metricService.getCounter(DataStoreMetrics.METRIC_MODULE_NAME, DataStoreMetrics.METRIC_COMPONENT_NAME, DataStoreMetrics.METRIC_STORE, DataStoreMetrics.METRIC_QUEUE, DataStoreMetrics.METRIC_CONFIGURATION, DataStoreMetrics.METRIC_ERROR, DataStoreMetrics.METRIC_COUNT);
57: metricQueueGenericErrorCount = metricService.getCounter(DataStoreMetrics.METRIC_MODULE_NAME, DataStoreMetrics.METRIC_COMPONENT_NAME, DataStoreMetrics.METRIC_STORE, DataStoreMetrics.METRIC_QUEUE, DataStoreMetrics.METRIC_GENERIC, DataStoreMetrics.METRIC_ERROR, DataStoreMetrics.METRIC_COUNT);
58: }
59:
60: /**
61: * Process a data message.
62: *
63: * @throws KapuaException
64: */
65: @Override
66: public void processMessage(CamelKapuaMessage<?> message) throws KapuaException {
67: // data messages
68: LOG.debug("Received data message from device channel: client id '{}' - {}", message.getMessage().getClientId(), message.getMessage().getChannel());
69: try {
70: messageStoreService.store(message.getMessage(), message.getDatastoreId());
71: } catch (DatastoreCommunicationException e) {
72: message.setDatastoreId(e.getUuid());
73: throw e;
74: }
75:
76: // Update asset values in AssetStoreService
77:• if (message.getMessage().getChannel().toString().startsWith("W1/A1")) {
78: KapuaId scopeId = message.getMessage().getScopeId();
79: KapuaId deviceId = message.getMessage().getDeviceId();
80:• if (deviceAssetStoreService.isServiceEnabled(scopeId) && deviceAssetStoreService.isApplicationEnabled(scopeId,deviceId)) {
81: deviceAssetStoreService.storeAssetValues((KapuaDataMessage) message.getMessage());
82: }
83: } else {
84: LOG.debug("This message did not matched W1/A1. Channel was: {}", message.getMessage().getChannel());
85: }
86: }
87:
88: public void processCommunicationErrorMessage(Exchange exchange, CamelKapuaMessage<?> message) throws KapuaException {
89: LOG.info("Message datastoreId: '{}' - Message Id: '{}'", message.getDatastoreId(), message.getMessage().getId());
90: processMessage(message);
91: metricQueueCommunicationErrorCount.dec();
92: }
93:
94: public void processConfigurationErrorMessage(Exchange exchange, CamelKapuaMessage<?> message) throws KapuaException {
95: LOG.info("Message datastoreId: '{}' - Message Id '{}'", message.getDatastoreId(), message.getMessage().getId());
96: metricQueueConfigurationErrorCount.dec();
97: }
98:
99: public void processGenericErrorMessage(Exchange exchange, CamelKapuaMessage<?> message) throws KapuaException {
100: LOG.info("Message datastoreId: '{}' - Message Id '{}'", message.getDatastoreId(), message.getMessage().getId());
101: metricQueueGenericErrorCount.dec();
102: }
103:
104: }