Skip to content

Package: MessageStoreServiceImpl

MessageStoreServiceImpl

name | instruction | branch | complexity | line | method
MessageStoreServiceImpl()
M: 285 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 18 C: 0
0%
M: 1 C: 0
0%
checkDataAccess(KapuaId, Actions)
M: 12 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 3 C: 0
0%
M: 1 C: 0
0%
count(MessageQuery)
M: 22 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 5 C: 0
0%
M: 1 C: 0
0%
delete(KapuaId, StorableId)
M: 23 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 7 C: 0
0%
M: 1 C: 0
0%
delete(MessageQuery)
M: 33 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 8 C: 0
0%
M: 1 C: 0
0%
find(KapuaId, StorableId)
M: 6 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
find(KapuaId, StorableId, StorableFetchStyle)
M: 23 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 5 C: 0
0%
M: 1 C: 0
0%
isServiceEnabled(KapuaId)
M: 9 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
logException(Exception)
M: 8 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 3 C: 0
0%
M: 1 C: 0
0%
query(MessageQuery)
M: 43 C: 0
0%
M: 4 C: 0
0%
M: 3 C: 0
0%
M: 8 C: 0
0%
M: 1 C: 0
0%
static {...}
M: 16 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 4 C: 0
0%
M: 1 C: 0
0%
store(KapuaMessage)
M: 76 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 23 C: 0
0%
M: 1 C: 0
0%
store(KapuaMessage, String)
M: 76 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 23 C: 0
0%
M: 1 C: 0
0%

Coverage

1: /*******************************************************************************
2: * Copyright (c) 2016, 2022 Eurotech and/or its affiliates and others
3: *
4: * This program and the accompanying materials are made
5: * available under the terms of the Eclipse Public License 2.0
6: * which is available at https://www.eclipse.org/legal/epl-2.0/
7: *
8: * SPDX-License-Identifier: EPL-2.0
9: *
10: * Contributors:
11: * Eurotech - initial API and implementation
12: *******************************************************************************/
13: package org.eclipse.kapua.service.datastore.internal;
14:
15: import com.codahale.metrics.Counter;
16: import com.codahale.metrics.Timer;
17: import com.codahale.metrics.Timer.Context;
18: import org.eclipse.kapua.KapuaErrorCodes;
19: import org.eclipse.kapua.KapuaException;
20: import org.eclipse.kapua.KapuaIllegalArgumentException;
21: import org.eclipse.kapua.commons.configuration.AbstractKapuaConfigurableService;
22: import org.eclipse.kapua.commons.metric.MetricServiceFactory;
23: import org.eclipse.kapua.commons.metric.MetricsService;
24: import org.eclipse.kapua.commons.util.ArgumentValidator;
25: import org.eclipse.kapua.locator.KapuaLocator;
26: import org.eclipse.kapua.locator.KapuaProvider;
27: import org.eclipse.kapua.message.KapuaMessage;
28: import org.eclipse.kapua.model.domain.Actions;
29: import org.eclipse.kapua.model.id.KapuaId;
30: import org.eclipse.kapua.service.account.AccountService;
31: import org.eclipse.kapua.service.authorization.AuthorizationService;
32: import org.eclipse.kapua.service.authorization.permission.Permission;
33: import org.eclipse.kapua.service.authorization.permission.PermissionFactory;
34: import org.eclipse.kapua.service.datastore.DatastoreDomains;
35: import org.eclipse.kapua.service.datastore.MessageStoreService;
36: import org.eclipse.kapua.service.datastore.internal.mediator.ConfigurationException;
37: import org.eclipse.kapua.service.datastore.internal.mediator.DatastoreCommunicationException;
38: import org.eclipse.kapua.service.datastore.internal.mediator.DatastoreException;
39: import org.eclipse.kapua.service.datastore.internal.mediator.DatastoreMediator;
40: import org.eclipse.kapua.service.datastore.internal.setting.DatastoreSettings;
41: import org.eclipse.kapua.service.datastore.internal.setting.DatastoreSettingsKey;
42: import org.eclipse.kapua.service.datastore.model.DatastoreMessage;
43: import org.eclipse.kapua.service.datastore.model.MessageListResult;
44: import org.eclipse.kapua.service.datastore.model.query.MessageQuery;
45: import org.eclipse.kapua.service.elasticsearch.client.exception.ClientCommunicationException;
46: import org.eclipse.kapua.service.storable.model.id.StorableId;
47: import org.eclipse.kapua.service.storable.model.query.StorableFetchStyle;
48: import org.slf4j.Logger;
49: import org.slf4j.LoggerFactory;
50:
51: import java.util.UUID;
52:
53: /**
54: * Message store service implementation.
55: *
56: * @since 1.0.0
57: */
58: @KapuaProvider
59: public class MessageStoreServiceImpl extends AbstractKapuaConfigurableService implements MessageStoreService {
60:
61: private static final Logger logger = LoggerFactory.getLogger(MessageStoreServiceImpl.class);
62:
63: private static final KapuaLocator LOCATOR = KapuaLocator.getInstance();
64:
65: // metrics
66: private final Counter metricMessageCount;
67: private final Counter metricCommunicationErrorCount;
68: private final Counter metricConfigurationErrorCount;
69: private final Counter metricGenericErrorCount;
70: private final Counter metricValidationErrorCount;
71: // store timers
72: private final Timer metricDataSaveTime;
73: // queues counters
74: private final Counter metricQueueCommunicationErrorCount;
75: private final Counter metricQueueConfigurationErrorCount;
76: private final Counter metricQueueGenericErrorCount;
77:
78: protected final AccountService accountService = LOCATOR.getService(AccountService.class);
79: protected final AuthorizationService authorizationService = LOCATOR.getService(AuthorizationService.class);
80: protected final PermissionFactory permissionFactory = LOCATOR.getFactory(PermissionFactory.class);
81: protected static final Integer MAX_ENTRIES_ON_DELETE = DatastoreSettings.getInstance().getInt(DatastoreSettingsKey.CONFIG_MAX_ENTRIES_ON_DELETE);
82: protected static final Integer MAX_LIMIT_VALUE = DatastoreSettings.getInstance().getInt(DatastoreSettingsKey.MAX_LIMIT_VALUE);
83:
84: protected final MessageStoreFacade messageStoreFacade;
85:
86: /**
87: * Constructor.
88: * <p>
89: * Initializes {@link ConfigurationProvider} and {@link MetricsService}
90: *
91: * @since 1.0.0
92: */
93: public MessageStoreServiceImpl() {
94: super(MessageStoreService.class.getName(), DatastoreDomains.DATASTORE_DOMAIN, DatastoreEntityManagerFactory.getInstance());
95: ConfigurationProviderImpl configurationProvider = new ConfigurationProviderImpl(this, accountService);
96: messageStoreFacade = new MessageStoreFacade(configurationProvider, DatastoreMediator.getInstance());
97: DatastoreMediator.getInstance().setMessageStoreFacade(messageStoreFacade);
98:
99: // data message
100: MetricsService metricService = MetricServiceFactory.getInstance();
101: metricMessageCount = metricService.getCounter(DataStoreDriverMetrics.METRIC_MODULE_NAME, DataStoreDriverMetrics.METRIC_COMPONENT_NAME, DataStoreDriverMetrics.METRIC_STORE, DataStoreDriverMetrics.METRIC_MESSAGES, DataStoreDriverMetrics.METRIC_COUNT);
102: metricCommunicationErrorCount = metricService.getCounter(DataStoreDriverMetrics.METRIC_MODULE_NAME, DataStoreDriverMetrics.METRIC_COMPONENT_NAME, DataStoreDriverMetrics.METRIC_STORE, DataStoreDriverMetrics.METRIC_MESSAGES, DataStoreDriverMetrics.METRIC_COMMUNICATION, DataStoreDriverMetrics.METRIC_ERROR, DataStoreDriverMetrics.METRIC_COUNT);
103: metricConfigurationErrorCount = metricService.getCounter(DataStoreDriverMetrics.METRIC_MODULE_NAME, DataStoreDriverMetrics.METRIC_COMPONENT_NAME, DataStoreDriverMetrics.METRIC_STORE, DataStoreDriverMetrics.METRIC_MESSAGES, DataStoreDriverMetrics.METRIC_CONFIGURATION, DataStoreDriverMetrics.METRIC_ERROR, DataStoreDriverMetrics.METRIC_COUNT);
104: metricGenericErrorCount = metricService.getCounter(DataStoreDriverMetrics.METRIC_MODULE_NAME, DataStoreDriverMetrics.METRIC_COMPONENT_NAME, DataStoreDriverMetrics.METRIC_STORE, DataStoreDriverMetrics.METRIC_MESSAGES, DataStoreDriverMetrics.METRIC_GENERIC, DataStoreDriverMetrics.METRIC_ERROR, DataStoreDriverMetrics.METRIC_COUNT);
105: metricValidationErrorCount = metricService.getCounter(DataStoreDriverMetrics.METRIC_MODULE_NAME, DataStoreDriverMetrics.METRIC_COMPONENT_NAME, DataStoreDriverMetrics.METRIC_STORE, DataStoreDriverMetrics.METRIC_MESSAGES, DataStoreDriverMetrics.METRIC_VALIDATION, DataStoreDriverMetrics.METRIC_ERROR, DataStoreDriverMetrics.METRIC_COUNT);
106:
107: // error messages queues size
108: metricQueueCommunicationErrorCount = metricService.getCounter(DataStoreDriverMetrics.METRIC_MODULE_NAME, DataStoreDriverMetrics.METRIC_COMPONENT_NAME, DataStoreDriverMetrics.METRIC_STORE, DataStoreDriverMetrics.METRIC_QUEUE, DataStoreDriverMetrics.METRIC_COMMUNICATION, DataStoreDriverMetrics.METRIC_ERROR, DataStoreDriverMetrics.METRIC_COUNT);
109: metricQueueConfigurationErrorCount = metricService.getCounter(DataStoreDriverMetrics.METRIC_MODULE_NAME, DataStoreDriverMetrics.METRIC_COMPONENT_NAME, DataStoreDriverMetrics.METRIC_STORE, DataStoreDriverMetrics.METRIC_QUEUE, DataStoreDriverMetrics.METRIC_CONFIGURATION, DataStoreDriverMetrics.METRIC_ERROR, DataStoreDriverMetrics.METRIC_COUNT);
110: metricQueueGenericErrorCount = metricService.getCounter(DataStoreDriverMetrics.METRIC_MODULE_NAME, DataStoreDriverMetrics.METRIC_COMPONENT_NAME, DataStoreDriverMetrics.METRIC_STORE, DataStoreDriverMetrics.METRIC_QUEUE, DataStoreDriverMetrics.METRIC_GENERIC, DataStoreDriverMetrics.METRIC_ERROR, DataStoreDriverMetrics.METRIC_COUNT);
111:
112: // store timers
113: metricDataSaveTime = metricService.getTimer(DataStoreDriverMetrics.METRIC_MODULE_NAME, DataStoreDriverMetrics.METRIC_COMPONENT_NAME, DataStoreDriverMetrics.METRIC_STORE, DataStoreDriverMetrics.METRIC_MESSAGES, DataStoreDriverMetrics.METRIC_TIME, DataStoreDriverMetrics.METRIC_S);
114: }
115:
116: @Override
117: public StorableId store(KapuaMessage<?, ?> message)
118: throws KapuaException {
119: String datastoreId = UUID.randomUUID().toString();
120: Context metricDataSaveTimeContext = metricDataSaveTime.time();
121: try {
122: checkDataAccess(message.getScopeId(), Actions.write);
123: metricMessageCount.inc();
124: return messageStoreFacade.store(message, datastoreId, true);
125: } catch (ConfigurationException e) {
126: metricConfigurationErrorCount.inc();
127: metricQueueConfigurationErrorCount.inc();
128: throw e;
129: } catch (KapuaIllegalArgumentException e) {
130: metricValidationErrorCount.inc();
131: metricQueueGenericErrorCount.inc();
132: throw e;
133: } catch (ClientCommunicationException e) {
134: metricCommunicationErrorCount.inc();
135: metricQueueCommunicationErrorCount.inc();
136: throw new DatastoreCommunicationException(datastoreId, e);
137: } catch (Exception e) {
138: metricGenericErrorCount.inc();
139: metricQueueGenericErrorCount.inc();
140: logException(e);
141: throw new DatastoreException(KapuaErrorCodes.INTERNAL_ERROR, e, e.getMessage());
142: } finally {
143: metricDataSaveTimeContext.stop();
144: }
145: }
146:
147: @Override
148: public StorableId store(KapuaMessage<?, ?> message, String datastoreId)
149: throws KapuaException {
150: ArgumentValidator.notEmptyOrNull(datastoreId, "datastoreId");
151: Context metricDataSaveTimeContext = metricDataSaveTime.time();
152: try {
153: checkDataAccess(message.getScopeId(), Actions.write);
154: metricMessageCount.inc();
155: return messageStoreFacade.store(message, datastoreId, false);
156: } catch (ConfigurationException e) {
157: metricConfigurationErrorCount.inc();
158: metricQueueConfigurationErrorCount.inc();
159: throw e;
160: } catch (KapuaIllegalArgumentException e) {
161: metricValidationErrorCount.inc();
162: metricQueueGenericErrorCount.inc();
163: throw e;
164: } catch (ClientCommunicationException e) {
165: metricCommunicationErrorCount.inc();
166: metricQueueCommunicationErrorCount.inc();
167: throw new DatastoreCommunicationException(datastoreId, e);
168: } catch (Exception e) {
169: metricGenericErrorCount.inc();
170: metricQueueGenericErrorCount.inc();
171: logException(e);
172: throw new DatastoreException(KapuaErrorCodes.INTERNAL_ERROR, e, e.getMessage());
173: } finally {
174: metricDataSaveTimeContext.stop();
175: }
176: }
177:
178: @Override
179: public DatastoreMessage find(KapuaId scopeId, StorableId id) throws KapuaException {
180: return find(scopeId, id, StorableFetchStyle.SOURCE_FULL);
181: }
182:
183: @Override
184: public DatastoreMessage find(KapuaId scopeId, StorableId id, StorableFetchStyle fetchStyle) throws KapuaException {
185: checkDataAccess(scopeId, Actions.read);
186: try {
187: return messageStoreFacade.find(scopeId, id, fetchStyle);
188: } catch (Exception e) {
189: logException(e);
190: throw new DatastoreException(KapuaErrorCodes.INTERNAL_ERROR, e, e.getMessage());
191: }
192: }
193:
194: @Override
195: public MessageListResult query(MessageQuery query)
196: throws KapuaException {
197: checkDataAccess(query.getScopeId(), Actions.read);
198:• if (query.getLimit() != null) {
199: ArgumentValidator.numRange(query.getLimit(), 0, MAX_LIMIT_VALUE, "limit");
200: }
201: try {
202: return messageStoreFacade.query(query);
203: } catch (Exception e) {
204: logException(e);
205: throw new DatastoreException(
206: KapuaErrorCodes.INTERNAL_ERROR,
207: e,
208:• e.getCause().getMessage() != null ? e.getCause().getMessage() : e.getMessage()
209: );
210: }
211: }
212:
213: @Override
214: public long count(MessageQuery query)
215: throws KapuaException {
216: checkDataAccess(query.getScopeId(), Actions.read);
217: try {
218: return messageStoreFacade.count(query);
219: } catch (Exception e) {
220: logException(e);
221: throw new DatastoreException(KapuaErrorCodes.INTERNAL_ERROR, e, e.getMessage());
222: }
223: }
224:
225: @Override
226: public void delete(KapuaId scopeId, StorableId id)
227: throws KapuaException {
228: checkDataAccess(scopeId, Actions.delete);
229: try {
230: messageStoreFacade.delete(scopeId, id);
231: } catch (Exception e) {
232: logException(e);
233: throw new DatastoreException(KapuaErrorCodes.INTERNAL_ERROR, e, e.getMessage());
234: }
235: }
236:
237: @Override
238: public void delete(MessageQuery query)
239: throws KapuaException {
240: ArgumentValidator.numRange(query.getLimit(), 0, MAX_ENTRIES_ON_DELETE, "limit");
241:
242: checkDataAccess(query.getScopeId(), Actions.delete);
243: try {
244: messageStoreFacade.delete(query);
245: } catch (Exception e) {
246: logException(e);
247: throw new DatastoreException(KapuaErrorCodes.INTERNAL_ERROR, e, e.getMessage());
248: }
249: }
250:
251: @Override
252: public boolean isServiceEnabled(KapuaId scopeId) {
253:• return !DatastoreSettings.getInstance().getBoolean(DatastoreSettingsKey.DISABLE_DATASTORE, false);
254: }
255:
256: protected void checkDataAccess(KapuaId scopeId, Actions action)
257: throws KapuaException {
258: Permission permission = permissionFactory.newPermission(DatastoreDomains.DATASTORE_DOMAIN, action, scopeId);
259: authorizationService.checkPermission(permission);
260: }
261:
262: private void logException(Exception e) {
263:• if (e instanceof RuntimeException) {
264: logger.debug("", e);
265: }
266: }
267: }