Skip to content

Package: MetricInfoRegistryFacade

MetricInfoRegistryFacade

nameinstructionbranchcomplexitylinemethod
MetricInfoRegistryFacade(ConfigurationProvider, MetricInfoRegistryMediator)
M: 7 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 3 C: 0
0%
M: 1 C: 0
0%
count(MetricInfoQuery)
M: 35 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 8 C: 0
0%
M: 1 C: 0
0%
delete(KapuaId, StorableId)
M: 31 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 9 C: 0
0%
M: 1 C: 0
0%
delete(MetricInfoQuery)
M: 34 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 9 C: 0
0%
M: 1 C: 0
0%
find(KapuaId, StorableId)
M: 34 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 9 C: 0
0%
M: 1 C: 0
0%
query(MetricInfoQuery)
M: 51 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 11 C: 0
0%
M: 1 C: 0
0%
static {...}
M: 16 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 4 C: 0
0%
M: 1 C: 0
0%
upstore(MetricInfo)
M: 100 C: 0
0%
M: 4 C: 0
0%
M: 3 C: 0
0%
M: 16 C: 0
0%
M: 1 C: 0
0%
upstore(MetricInfo[])
M: 166 C: 0
0%
M: 18 C: 0
0%
M: 10 C: 0
0%
M: 36 C: 0
0%
M: 1 C: 0
0%

Coverage

1: /*******************************************************************************
2: * Copyright (c) 2016, 2022 Eurotech and/or its affiliates and others
3: *
4: * This program and the accompanying materials are made
5: * available under the terms of the Eclipse Public License 2.0
6: * which is available at https://www.eclipse.org/legal/epl-2.0/
7: *
8: * SPDX-License-Identifier: EPL-2.0
9: *
10: * Contributors:
11: * Eurotech - initial API and implementation
12: *******************************************************************************/
13: package org.eclipse.kapua.service.datastore.internal;
14:
15: import org.eclipse.kapua.KapuaIllegalArgumentException;
16: import org.eclipse.kapua.commons.util.ArgumentValidator;
17: import org.eclipse.kapua.locator.KapuaLocator;
18: import org.eclipse.kapua.model.id.KapuaId;
19: import org.eclipse.kapua.service.datastore.internal.mediator.ConfigurationException;
20: import org.eclipse.kapua.service.datastore.internal.mediator.MetricInfoField;
21: import org.eclipse.kapua.service.datastore.internal.mediator.MetricInfoRegistryMediator;
22: import org.eclipse.kapua.service.datastore.internal.model.MetricInfoListResultImpl;
23: import org.eclipse.kapua.service.datastore.internal.model.query.MetricInfoQueryImpl;
24: import org.eclipse.kapua.service.datastore.internal.schema.Metadata;
25: import org.eclipse.kapua.service.datastore.internal.schema.MetricInfoSchema;
26: import org.eclipse.kapua.service.datastore.internal.schema.SchemaUtil;
27: import org.eclipse.kapua.service.datastore.model.MetricInfo;
28: import org.eclipse.kapua.service.datastore.model.MetricInfoListResult;
29: import org.eclipse.kapua.service.datastore.model.query.MetricInfoQuery;
30: import org.eclipse.kapua.service.elasticsearch.client.exception.ClientException;
31: import org.eclipse.kapua.service.elasticsearch.client.model.BulkUpdateRequest;
32: import org.eclipse.kapua.service.elasticsearch.client.model.BulkUpdateResponse;
33: import org.eclipse.kapua.service.elasticsearch.client.model.ResultList;
34: import org.eclipse.kapua.service.elasticsearch.client.model.TypeDescriptor;
35: import org.eclipse.kapua.service.elasticsearch.client.model.UpdateRequest;
36: import org.eclipse.kapua.service.elasticsearch.client.model.UpdateResponse;
37: import org.eclipse.kapua.service.storable.exception.MappingException;
38: import org.eclipse.kapua.service.storable.model.id.StorableId;
39: import org.eclipse.kapua.service.storable.model.id.StorableIdFactory;
40: import org.eclipse.kapua.service.storable.model.query.predicate.IdsPredicate;
41: import org.eclipse.kapua.service.storable.model.query.predicate.StorablePredicateFactory;
42: import org.slf4j.Logger;
43: import org.slf4j.LoggerFactory;
44:
45: /**
46: * Metric information registry facade
47: *
48: * @since 1.0.0
49: */
50: public class MetricInfoRegistryFacade extends AbstractRegistryFacade {
51:
52: private static final Logger LOG = LoggerFactory.getLogger(MetricInfoRegistryFacade.class);
53:
54: private static final KapuaLocator LOCATOR = KapuaLocator.getInstance();
55: private static final StorableIdFactory STORABLE_ID_FACTORY = LOCATOR.getFactory(StorableIdFactory.class);
56: private static final StorablePredicateFactory STORABLE_PREDICATE_FACTORY = LOCATOR.getFactory(StorablePredicateFactory.class);
57:
58: private final MetricInfoRegistryMediator mediator;
59:
60: private static final String QUERY = "query";
61: private static final String QUERY_SCOPE_ID = "query.scopeId";
62: private static final String STORAGE_NOT_ENABLED = "Storage not enabled for account {}, returning empty result";
63:
64: /**
65: * Constructs the metric info registry facade
66: *
67: * @param configProvider
68: * @param mediator
69: * @since 1.0.0
70: */
71: public MetricInfoRegistryFacade(ConfigurationProvider configProvider, MetricInfoRegistryMediator mediator) {
72: super(configProvider);
73:
74: this.mediator = mediator;
75: }
76:
77: /**
78: * Update the metric information after a message store operation (for a single metric)
79: *
80: * @param metricInfo
81: * @return
82: * @throws KapuaIllegalArgumentException
83: * @throws ConfigurationException
84: * @throws ClientException
85: */
86: public StorableId upstore(MetricInfo metricInfo) throws KapuaIllegalArgumentException, ConfigurationException, ClientException, MappingException {
87: ArgumentValidator.notNull(metricInfo, "metricInfo");
88: ArgumentValidator.notNull(metricInfo.getScopeId(), "metricInfo.scopeId");
89: ArgumentValidator.notNull(metricInfo.getFirstMessageId(), "metricInfoCreator.firstPublishedMessageId");
90: ArgumentValidator.notNull(metricInfo.getFirstMessageOn(), "metricInfoCreator.firstPublishedMessageTimestamp");
91:
92: String metricInfoId = MetricInfoField.getOrDeriveId(metricInfo.getId(), metricInfo);
93: StorableId storableId = STORABLE_ID_FACTORY.newStorableId(metricInfoId);
94:
95: UpdateResponse response;
96: // Store channel. Look up channel in the cache, and cache it if it doesn't exist
97:• if (!DatastoreCacheManager.getInstance().getMetricsCache().get(metricInfoId)) {
98: // fix #REPLACE_ISSUE_NUMBER
99: MetricInfo storedField = find(metricInfo.getScopeId(), storableId);
100:• if (storedField == null) {
101: Metadata metadata = mediator.getMetadata(metricInfo.getScopeId(), metricInfo.getFirstMessageOn().getTime());
102:
103: String kapuaIndexName = metadata.getMetricRegistryIndexName();
104:
105: UpdateRequest request = new UpdateRequest(metricInfo.getId().toString(), new TypeDescriptor(metadata.getMetricRegistryIndexName(), MetricInfoSchema.METRIC_TYPE_NAME), metricInfo);
106: response = getElasticsearchClient().upsert(request);
107:
108: LOG.debug("Upsert on metric successfully executed [{}.{}, {} - {}]", kapuaIndexName, MetricInfoSchema.METRIC_TYPE_NAME, metricInfoId, response.getId());
109: }
110: // Update cache if metric update is completed successfully
111: DatastoreCacheManager.getInstance().getMetricsCache().put(metricInfoId, true);
112: }
113: return storableId;
114: }
115:
116: /**
117: * Update the metrics informations after a message store operation (for few metrics)
118: *
119: * @param metricInfos
120: * @return
121: * @throws KapuaIllegalArgumentException
122: * @throws ConfigurationException
123: * @throws ClientException
124: */
125: public BulkUpdateResponse upstore(MetricInfo[] metricInfos)
126: throws KapuaIllegalArgumentException,
127: ConfigurationException,
128: ClientException,
129: MappingException {
130: ArgumentValidator.notNull(metricInfos, "metricInfos");
131:
132: BulkUpdateRequest bulkRequest = new BulkUpdateRequest();
133: boolean performUpdate = false;
134: // Create a bulk request
135:• for (MetricInfo metricInfo : metricInfos) {
136: String metricInfoId = MetricInfoField.getOrDeriveId(metricInfo.getId(), metricInfo);
137: // fix #REPLACE_ISSUE_NUMBER
138:• if (!DatastoreCacheManager.getInstance().getMetricsCache().get(metricInfoId)) {
139: StorableId storableId = STORABLE_ID_FACTORY.newStorableId(metricInfoId);
140: MetricInfo storedField = find(metricInfo.getScopeId(), storableId);
141:• if (storedField != null) {
142: DatastoreCacheManager.getInstance().getMetricsCache().put(metricInfoId, true);
143: continue;
144: }
145: performUpdate = true;
146: Metadata metadata = mediator.getMetadata(metricInfo.getScopeId(), metricInfo.getFirstMessageOn().getTime());
147:
148: bulkRequest.add(
149: new UpdateRequest(
150: metricInfo.getId().toString(),
151: new TypeDescriptor(metadata.getMetricRegistryIndexName(),
152: MetricInfoSchema.METRIC_TYPE_NAME),
153: metricInfo)
154: );
155: }
156: }
157:
158: BulkUpdateResponse upsertResponse = null;
159:• if (performUpdate) {
160: // execute the upstore
161: try {
162: upsertResponse = getElasticsearchClient().upsert(bulkRequest);
163: } catch (ClientException e) {
164: LOG.trace("Upsert failed {}", e.getMessage());
165: throw e;
166: }
167:
168:• if (upsertResponse != null) {
169:• if (upsertResponse.getResponse().isEmpty()) {
170: return upsertResponse;
171: }
172:
173:• for (UpdateResponse response : upsertResponse.getResponse()) {
174: String index = response.getTypeDescriptor().getIndex();
175: String type = response.getTypeDescriptor().getType();
176: String id = response.getId();
177: LOG.debug("Upsert on channel metric successfully executed [{}.{}, {}]", index, type, id);
178:
179:• if (id == null || DatastoreCacheManager.getInstance().getMetricsCache().get(id)) {
180: continue;
181: }
182:
183: // Update cache if channel metric update is completed successfully
184: DatastoreCacheManager.getInstance().getMetricsCache().put(id, true);
185: }
186: }
187: }
188: return upsertResponse;
189: }
190:
191: /**
192: * Delete metric information by identifier.<br>
193: * <b>Be careful using this function since it doesn't guarantee the datastore consistency.<br>
194: * It just deletes the metric info registry entry by id without checking the consistency of the others registries or the message store.</b>
195: *
196: * @param scopeId
197: * @param id
198: * @throws KapuaIllegalArgumentException
199: * @throws ConfigurationException
200: * @throws ClientException
201: */
202: public void delete(KapuaId scopeId, StorableId id) throws KapuaIllegalArgumentException, ConfigurationException, ClientException {
203: ArgumentValidator.notNull(scopeId, "scopeId");
204: ArgumentValidator.notNull(id, "id");
205:
206:• if (!isDatastoreServiceEnabled(scopeId)) {
207: LOG.debug("Storage not enabled for account {}, return", scopeId);
208: return;
209: }
210:
211: String indexName = SchemaUtil.getMetricIndexName(scopeId);
212: TypeDescriptor typeDescriptor = new TypeDescriptor(indexName, MetricInfoSchema.METRIC_TYPE_NAME);
213: getElasticsearchClient().delete(typeDescriptor, id.toString());
214: }
215:
216: /**
217: * Find metric information by identifier
218: *
219: * @param scopeId
220: * @param id
221: * @return
222: * @throws KapuaIllegalArgumentException
223: * @throws ConfigurationException
224: * @throws ClientException
225: */
226: public MetricInfo find(KapuaId scopeId, StorableId id) throws KapuaIllegalArgumentException, ConfigurationException, ClientException {
227: ArgumentValidator.notNull(scopeId, "scopeId");
228: ArgumentValidator.notNull(id, "id");
229:
230: MetricInfoQueryImpl idsQuery = new MetricInfoQueryImpl(scopeId);
231: idsQuery.setLimit(1);
232:
233: IdsPredicate idsPredicate = STORABLE_PREDICATE_FACTORY.newIdsPredicate(MetricInfoSchema.METRIC_TYPE_NAME);
234: idsPredicate.addId(id);
235: idsQuery.setPredicate(idsPredicate);
236:
237: MetricInfoListResult result = query(idsQuery);
238: return result.getFirstItem();
239: }
240:
241: /**
242: * Find metrics informations matching the given query
243: *
244: * @param query
245: * @return
246: * @throws KapuaIllegalArgumentException
247: * @throws ConfigurationException
248: * @throws ClientException
249: */
250: public MetricInfoListResult query(MetricInfoQuery query) throws KapuaIllegalArgumentException, ConfigurationException, ClientException {
251: ArgumentValidator.notNull(query, QUERY);
252: ArgumentValidator.notNull(query.getScopeId(), QUERY_SCOPE_ID);
253:
254:• if (!isDatastoreServiceEnabled(query.getScopeId())) {
255: LOG.debug(STORAGE_NOT_ENABLED, query.getScopeId());
256: return new MetricInfoListResultImpl();
257: }
258:
259: String indexNme = SchemaUtil.getMetricIndexName(query.getScopeId());
260: TypeDescriptor typeDescriptor = new TypeDescriptor(indexNme, MetricInfoSchema.METRIC_TYPE_NAME);
261: ResultList<MetricInfo> rl = getElasticsearchClient().query(typeDescriptor, query, MetricInfo.class);
262: MetricInfoListResult result = new MetricInfoListResultImpl(rl);
263: setLimitExceed(query, rl.getTotalHitsExceedsCount(), result);
264: return result;
265: }
266:
267: /**
268: * Get metrics informations count matching the given query
269: *
270: * @param query
271: * @return
272: * @throws KapuaIllegalArgumentException
273: * @throws ConfigurationException
274: * @throws ClientException
275: */
276: public long count(MetricInfoQuery query) throws KapuaIllegalArgumentException, ConfigurationException, ClientException {
277: ArgumentValidator.notNull(query, QUERY);
278: ArgumentValidator.notNull(query.getScopeId(), QUERY_SCOPE_ID);
279:
280:• if (!isDatastoreServiceEnabled(query.getScopeId())) {
281: LOG.debug(STORAGE_NOT_ENABLED, query.getScopeId());
282: return 0;
283: }
284:
285: String indexName = SchemaUtil.getMetricIndexName(query.getScopeId());
286: TypeDescriptor typeDescriptor = new TypeDescriptor(indexName, MetricInfoSchema.METRIC_TYPE_NAME);
287: return getElasticsearchClient().count(typeDescriptor, query);
288: }
289:
290: /**
291: * Delete metrics informations count matching the given query.<br>
292: * <b>Be careful using this function since it doesn't guarantee the datastore consistency.<br>
293: * It just deletes the metric info registry entries that matching the query without checking the consistency of the others registries or the message store.</b>
294: *
295: * @param query
296: * @throws KapuaIllegalArgumentException
297: * @throws ConfigurationException
298: * @throws ClientException
299: */
300: public void delete(MetricInfoQuery query) throws KapuaIllegalArgumentException, ConfigurationException, ClientException {
301: ArgumentValidator.notNull(query, QUERY);
302: ArgumentValidator.notNull(query.getScopeId(), QUERY_SCOPE_ID);
303:
304:• if (!isDatastoreServiceEnabled(query.getScopeId())) {
305: LOG.debug(STORAGE_NOT_ENABLED, query.getScopeId());
306: return;
307: }
308:
309: String indexName = SchemaUtil.getMetricIndexName(query.getScopeId());
310: TypeDescriptor typeDescriptor = new TypeDescriptor(indexName, MetricInfoSchema.METRIC_TYPE_NAME);
311: getElasticsearchClient().deleteByQuery(typeDescriptor, query);
312: }
313: }