Skip to content

Package: MetricInfoRegistryServiceImpl

MetricInfoRegistryServiceImpl

nameinstructionbranchcomplexitylinemethod
MetricInfoRegistryServiceImpl()
M: 59 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 12 C: 0
0%
M: 1 C: 0
0%
checkDataAccess(KapuaId, Actions)
M: 12 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 3 C: 0
0%
M: 1 C: 0
0%
count(MetricInfoQuery)
M: 33 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 8 C: 0
0%
M: 1 C: 0
0%
delete(KapuaId, StorableId)
M: 32 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 10 C: 0
0%
M: 1 C: 0
0%
delete(MetricInfoQuery)
M: 34 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 10 C: 0
0%
M: 1 C: 0
0%
find(KapuaId, StorableId)
M: 6 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
find(KapuaId, StorableId, StorableFetchStyle)
M: 38 C: 0
0%
M: 4 C: 0
0%
M: 3 C: 0
0%
M: 11 C: 0
0%
M: 1 C: 0
0%
isServiceEnabled(KapuaId)
M: 9 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
query(MetricInfoQuery)
M: 58 C: 0
0%
M: 8 C: 0
0%
M: 5 C: 0
0%
M: 13 C: 0
0%
M: 1 C: 0
0%
static {...}
M: 11 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 3 C: 0
0%
M: 1 C: 0
0%
updateLastPublishedFields(MetricInfo)
M: 132 C: 0
0%
M: 4 C: 0
0%
M: 3 C: 0
0%
M: 28 C: 0
0%
M: 1 C: 0
0%

Coverage

1: /*******************************************************************************
2: * Copyright (c) 2016, 2022 Eurotech and/or its affiliates and others
3: *
4: * This program and the accompanying materials are made
5: * available under the terms of the Eclipse Public License 2.0
6: * which is available at https://www.eclipse.org/legal/epl-2.0/
7: *
8: * SPDX-License-Identifier: EPL-2.0
9: *
10: * Contributors:
11: * Eurotech - initial API and implementation
12: * Red Hat Inc
13: *******************************************************************************/
14: package org.eclipse.kapua.service.datastore.internal;
15:
16: import org.eclipse.kapua.KapuaException;
17: import org.eclipse.kapua.commons.service.internal.AbstractKapuaService;
18: import org.eclipse.kapua.commons.service.internal.KapuaServiceDisabledException;
19: import org.eclipse.kapua.commons.util.ArgumentValidator;
20: import org.eclipse.kapua.locator.KapuaLocator;
21: import org.eclipse.kapua.locator.KapuaProvider;
22: import org.eclipse.kapua.model.domain.Actions;
23: import org.eclipse.kapua.model.id.KapuaId;
24: import org.eclipse.kapua.service.account.AccountService;
25: import org.eclipse.kapua.service.authorization.AuthorizationService;
26: import org.eclipse.kapua.service.authorization.permission.Permission;
27: import org.eclipse.kapua.service.authorization.permission.PermissionFactory;
28: import org.eclipse.kapua.service.datastore.DatastoreDomains;
29: import org.eclipse.kapua.service.datastore.MessageStoreService;
30: import org.eclipse.kapua.service.datastore.MetricInfoRegistryService;
31: import org.eclipse.kapua.service.datastore.internal.mediator.DatastoreMediator;
32: import org.eclipse.kapua.service.datastore.internal.mediator.MessageField;
33: import org.eclipse.kapua.service.datastore.internal.mediator.MetricInfoField;
34: import org.eclipse.kapua.service.datastore.internal.model.query.MessageQueryImpl;
35: import org.eclipse.kapua.service.datastore.internal.schema.MessageSchema;
36: import org.eclipse.kapua.service.datastore.internal.setting.DatastoreSettings;
37: import org.eclipse.kapua.service.datastore.internal.setting.DatastoreSettingsKey;
38: import org.eclipse.kapua.service.datastore.model.MessageListResult;
39: import org.eclipse.kapua.service.datastore.model.MetricInfo;
40: import org.eclipse.kapua.service.datastore.model.MetricInfoListResult;
41: import org.eclipse.kapua.service.datastore.model.query.MessageQuery;
42: import org.eclipse.kapua.service.datastore.model.query.MetricInfoQuery;
43: import org.eclipse.kapua.service.datastore.model.query.predicate.DatastorePredicateFactory;
44: import org.eclipse.kapua.service.elasticsearch.client.exception.ClientInitializationException;
45: import org.eclipse.kapua.service.storable.model.id.StorableId;
46: import org.eclipse.kapua.service.storable.model.query.SortField;
47: import org.eclipse.kapua.service.storable.model.query.StorableFetchStyle;
48: import org.eclipse.kapua.service.storable.model.query.predicate.AndPredicate;
49: import org.eclipse.kapua.service.storable.model.query.predicate.ExistsPredicate;
50: import org.eclipse.kapua.service.storable.model.query.predicate.RangePredicate;
51: import org.eclipse.kapua.service.storable.model.query.predicate.StorablePredicateFactory;
52: import org.eclipse.kapua.service.storable.model.query.predicate.TermPredicate;
53: import org.slf4j.Logger;
54: import org.slf4j.LoggerFactory;
55:
56: import java.util.ArrayList;
57: import java.util.Date;
58: import java.util.List;
59:
60: /**
61: * Metric information registry implementation.
62: *
63: * @since 1.0.0
64: */
65: @KapuaProvider
66: public class MetricInfoRegistryServiceImpl extends AbstractKapuaService implements MetricInfoRegistryService {
67:
68: private static final Logger LOG = LoggerFactory.getLogger(MetricInfoRegistryServiceImpl.class);
69:
70: private static final KapuaLocator LOCATOR = KapuaLocator.getInstance();
71: private static final StorablePredicateFactory STORABLE_PREDICATE_FACTORY = LOCATOR.getFactory(StorablePredicateFactory.class);
72:
73:
74: private final AccountService accountService;
75: private final AuthorizationService authorizationService;
76: private final PermissionFactory permissionFactory;
77: private final MetricInfoRegistryFacade metricInfoRegistryFacade;
78: private final MessageStoreService messageStoreService;
79: private final DatastorePredicateFactory datastorePredicateFactory;
80:
81: private static final String QUERY = "query";
82: private static final String QUERY_SCOPE_ID = "query.scopeId";
83:
84: /**
85: * Default constructor
86: *
87: * @throws ClientInitializationException
88: */
89: public MetricInfoRegistryServiceImpl() throws ClientInitializationException {
90: super(DatastoreEntityManagerFactory.getInstance());
91:
92: KapuaLocator locator = KapuaLocator.getInstance();
93: accountService = locator.getService(AccountService.class);
94: authorizationService = locator.getService(AuthorizationService.class);
95: permissionFactory = locator.getFactory(PermissionFactory.class);
96: messageStoreService = locator.getService(MessageStoreService.class);
97: datastorePredicateFactory = KapuaLocator.getInstance().getFactory(DatastorePredicateFactory.class);
98:
99: MessageStoreService messageStoreService = KapuaLocator.getInstance().getService(MessageStoreService.class);
100: ConfigurationProviderImpl configurationProvider = new ConfigurationProviderImpl(messageStoreService, accountService);
101: metricInfoRegistryFacade = new MetricInfoRegistryFacade(configurationProvider, DatastoreMediator.getInstance());
102: DatastoreMediator.getInstance().setMetricInfoStoreFacade(metricInfoRegistryFacade);
103: }
104:
105: @Override
106: public MetricInfo find(KapuaId scopeId, StorableId id) throws KapuaException {
107: return find(scopeId, id, StorableFetchStyle.SOURCE_FULL);
108: }
109:
110: @Override
111: public MetricInfo find(KapuaId scopeId, StorableId id, StorableFetchStyle fetchStyle) throws KapuaException {
112:• if (!isServiceEnabled(scopeId)) {
113: throw new KapuaServiceDisabledException(this.getClass().getName());
114: }
115:
116: ArgumentValidator.notNull(scopeId, "scopeId");
117: ArgumentValidator.notNull(id, "id");
118:
119: checkDataAccess(scopeId, Actions.read);
120: try {
121: // populate the lastMessageTimestamp
122: MetricInfo metricInfo = metricInfoRegistryFacade.find(scopeId, id);
123:• if (metricInfo != null) {
124:
125: updateLastPublishedFields(metricInfo);
126: }
127: return metricInfo;
128: } catch (Exception e) {
129: throw KapuaException.internalError(e);
130: }
131: }
132:
133: @Override
134: public MetricInfoListResult query(MetricInfoQuery query)
135: throws KapuaException {
136:• if (!isServiceEnabled(query.getScopeId())) {
137: throw new KapuaServiceDisabledException(this.getClass().getName());
138: }
139:
140: ArgumentValidator.notNull(query, QUERY);
141: ArgumentValidator.notNull(query.getScopeId(), QUERY_SCOPE_ID);
142:
143: checkDataAccess(query.getScopeId(), Actions.read);
144: try {
145: MetricInfoListResult result = metricInfoRegistryFacade.query(query);
146:• if (result != null && query.getFetchAttributes().contains(MetricInfoField.TIMESTAMP_FULL.field())) {
147: // populate the lastMessageTimestamp
148:• for (MetricInfo metricInfo : result.getItems()) {
149: updateLastPublishedFields(metricInfo);
150: }
151: }
152: return result;
153: } catch (Exception e) {
154: throw KapuaException.internalError(e);
155: }
156: }
157:
158: @Override
159: public long count(MetricInfoQuery query)
160: throws KapuaException {
161:• if (!isServiceEnabled(query.getScopeId())) {
162: throw new KapuaServiceDisabledException(this.getClass().getName());
163: }
164:
165: ArgumentValidator.notNull(query, QUERY);
166: ArgumentValidator.notNull(query.getScopeId(), QUERY_SCOPE_ID);
167:
168: checkDataAccess(query.getScopeId(), Actions.read);
169: try {
170: return metricInfoRegistryFacade.count(query);
171: } catch (Exception e) {
172: throw KapuaException.internalError(e);
173: }
174: }
175:
176: void delete(MetricInfoQuery query)
177: throws KapuaException {
178:• if (!isServiceEnabled(query.getScopeId())) {
179: throw new KapuaServiceDisabledException(this.getClass().getName());
180: }
181:
182: ArgumentValidator.notNull(query, QUERY);
183: ArgumentValidator.notNull(query.getScopeId(), QUERY_SCOPE_ID);
184:
185: checkDataAccess(query.getScopeId(), Actions.delete);
186: try {
187: metricInfoRegistryFacade.delete(query);
188: } catch (Exception e) {
189: throw KapuaException.internalError(e);
190: }
191: }
192:
193: void delete(KapuaId scopeId, StorableId id)
194: throws KapuaException {
195:• if (!isServiceEnabled(scopeId)) {
196: throw new KapuaServiceDisabledException(this.getClass().getName());
197: }
198:
199: ArgumentValidator.notNull(scopeId, "scopeId");
200: ArgumentValidator.notNull(id, "id");
201:
202: checkDataAccess(scopeId, Actions.delete);
203: try {
204: metricInfoRegistryFacade.delete(scopeId, id);
205: } catch (Exception e) {
206: throw KapuaException.internalError(e);
207: }
208: }
209:
210: private void checkDataAccess(KapuaId scopeId, Actions action)
211: throws KapuaException {
212: Permission permission = permissionFactory.newPermission(DatastoreDomains.DATASTORE_DOMAIN, action, scopeId);
213: authorizationService.checkPermission(permission);
214: }
215:
216: /**
217: * Update the last published date and last published message identifier for the specified metric info, so it gets the timestamp and the message identifier of the last published message for the
218: * account/clientId in the metric info
219: *
220: * @throws KapuaException
221: */
222: private void updateLastPublishedFields(MetricInfo metricInfo) throws KapuaException {
223: List<SortField> sort = new ArrayList<>();
224: sort.add(SortField.descending(MessageSchema.MESSAGE_TIMESTAMP));
225:
226: MessageQuery messageQuery = new MessageQueryImpl(metricInfo.getScopeId());
227: messageQuery.setAskTotalCount(true);
228: messageQuery.setFetchStyle(StorableFetchStyle.FIELDS);
229: messageQuery.setLimit(1);
230: messageQuery.setOffset(0);
231: messageQuery.setSortFields(sort);
232:
233: RangePredicate messageIdPredicate = STORABLE_PREDICATE_FACTORY.newRangePredicate(MetricInfoField.TIMESTAMP, metricInfo.getFirstMessageOn(), null);
234: TermPredicate clientIdPredicate = datastorePredicateFactory.newTermPredicate(MessageField.CLIENT_ID, metricInfo.getClientId());
235: ExistsPredicate metricPredicate = STORABLE_PREDICATE_FACTORY.newExistsPredicate(MessageField.METRICS.field(), metricInfo.getName());
236:
237: AndPredicate andPredicate = STORABLE_PREDICATE_FACTORY.newAndPredicate();
238: andPredicate.getPredicates().add(messageIdPredicate);
239: andPredicate.getPredicates().add(clientIdPredicate);
240: andPredicate.getPredicates().add(metricPredicate);
241: messageQuery.setPredicate(andPredicate);
242:
243: MessageListResult messageList = messageStoreService.query(messageQuery);
244:
245: StorableId lastPublishedMessageId = null;
246: Date lastPublishedMessageTimestamp = null;
247:• if (messageList.getSize() == 1) {
248: lastPublishedMessageId = messageList.getFirstItem().getDatastoreId();
249: lastPublishedMessageTimestamp = messageList.getFirstItem().getTimestamp();
250:• } else if (messageList.isEmpty()) {
251: // this condition could happens due to the ttl of the messages (so if it happens, it does not necessarily mean there has been an error!)
252: LOG.warn("Cannot find last timestamp for the specified client id '{}' - account '{}'", metricInfo.getClientId(), metricInfo.getScopeId());
253: } else {
254: // this condition shouldn't never happens since the query has a limit 1
255: // if happens it means than an elasticsearch internal error happens and/or our driver didn't set it correctly!
256: LOG.error("Cannot find last timestamp for the specified client id '{}' - account '{}'. More than one result returned by the query!", metricInfo.getClientId(), metricInfo.getScopeId());
257: }
258:
259: metricInfo.setLastMessageId(lastPublishedMessageId);
260: metricInfo.setLastMessageOn(lastPublishedMessageTimestamp);
261: }
262:
263: @Override
264: public boolean isServiceEnabled(KapuaId scopeId) {
265:• return !DatastoreSettings.getInstance().getBoolean(DatastoreSettingsKey.DISABLE_DATASTORE, false);
266: }
267:
268: }