Skip to content

Package: MessageStoreFacade

MessageStoreFacade

name | instruction | branch | complexity | line | method
MessageStoreFacade(ConfigurationProvider, MessageStoreMediator)
M: 33 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 5 C: 0
0%
M: 1 C: 0
0%
convertTo(KapuaMessage, String)
M: 63 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 16 C: 0
0%
M: 1 C: 0
0%
count(MessageQuery)
M: 35 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 8 C: 0
0%
M: 1 C: 0
0%
delete(KapuaId, StorableId)
M: 59 C: 0
0%
M: 4 C: 0
0%
M: 3 C: 0
0%
M: 18 C: 0
0%
M: 1 C: 0
0%
delete(MessageQuery)
M: 34 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 9 C: 0
0%
M: 1 C: 0
0%
deleteAllIndexes()
M: 4 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
deleteIndexes(String)
M: 10 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
find(KapuaId, StorableId, StorableFetchStyle)
M: 11 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
find(KapuaId, String, StorableId, StorableFetchStyle)
M: 43 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 10 C: 0
0%
M: 1 C: 0
0%
isAnyClientId(String)
M: 4 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
query(MessageQuery)
M: 51 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 11 C: 0
0%
M: 1 C: 0
0%
refreshAllIndexes()
M: 4 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
resetCache(KapuaId, KapuaId, String, String)
M: 382 C: 0
0%
M: 38 C: 0
0%
M: 20 C: 0
0%
M: 83 C: 0
0%
M: 1 C: 0
0%
static {...}
M: 16 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 4 C: 0
0%
M: 1 C: 0
0%
store(KapuaMessage, String, boolean)
M: 228 C: 0
0%
M: 22 C: 0
0%
M: 12 C: 0
0%
M: 47 C: 0
0%
M: 1 C: 0
0%

Coverage

1: /*******************************************************************************
2: * Copyright (c) 2016, 2022 Eurotech and/or its affiliates and others
3: *
4: * This program and the accompanying materials are made
5: * available under the terms of the Eclipse Public License 2.0
6: * which is available at https://www.eclipse.org/legal/epl-2.0/
7: *
8: * SPDX-License-Identifier: EPL-2.0
9: *
10: * Contributors:
11: * Eurotech - initial API and implementation
12: *******************************************************************************/
13: package org.eclipse.kapua.service.datastore.internal;
14:
15: import com.codahale.metrics.Counter;
16: import org.eclipse.kapua.KapuaException;
17: import org.eclipse.kapua.KapuaIllegalArgumentException;
18: import org.eclipse.kapua.commons.cache.LocalCache;
19: import org.eclipse.kapua.commons.metric.MetricServiceFactory;
20: import org.eclipse.kapua.commons.metric.MetricsService;
21: import org.eclipse.kapua.commons.util.ArgumentValidator;
22: import org.eclipse.kapua.commons.util.KapuaDateUtils;
23: import org.eclipse.kapua.locator.KapuaLocator;
24: import org.eclipse.kapua.message.KapuaMessage;
25: import org.eclipse.kapua.message.device.data.KapuaDataChannel;
26: import org.eclipse.kapua.message.internal.device.data.KapuaDataChannelImpl;
27: import org.eclipse.kapua.model.id.KapuaId;
28: import org.eclipse.kapua.service.datastore.exception.DatastoreDisabledException;
29: import org.eclipse.kapua.service.datastore.internal.mediator.ConfigurationException;
30: import org.eclipse.kapua.service.datastore.internal.mediator.DatastoreChannel;
31: import org.eclipse.kapua.service.datastore.internal.mediator.DatastoreUtils;
32: import org.eclipse.kapua.service.datastore.internal.mediator.MessageField;
33: import org.eclipse.kapua.service.datastore.internal.mediator.MessageInfo;
34: import org.eclipse.kapua.service.datastore.internal.mediator.MessageStoreConfiguration;
35: import org.eclipse.kapua.service.datastore.internal.mediator.MessageStoreMediator;
36: import org.eclipse.kapua.service.datastore.internal.mediator.Metric;
37: import org.eclipse.kapua.service.datastore.internal.model.DataIndexBy;
38: import org.eclipse.kapua.service.datastore.internal.model.DatastoreMessageImpl;
39: import org.eclipse.kapua.service.datastore.internal.model.MessageListResultImpl;
40: import org.eclipse.kapua.service.datastore.internal.model.MessageUniquenessCheck;
41: import org.eclipse.kapua.service.datastore.internal.model.query.ChannelInfoQueryImpl;
42: import org.eclipse.kapua.service.datastore.internal.model.query.ClientInfoQueryImpl;
43: import org.eclipse.kapua.service.datastore.internal.model.query.MessageQueryImpl;
44: import org.eclipse.kapua.service.datastore.internal.model.query.MetricInfoQueryImpl;
45: import org.eclipse.kapua.service.datastore.internal.model.query.predicate.ChannelMatchPredicateImpl;
46: import org.eclipse.kapua.service.datastore.internal.schema.ChannelInfoSchema;
47: import org.eclipse.kapua.service.datastore.internal.schema.ClientInfoSchema;
48: import org.eclipse.kapua.service.datastore.internal.schema.MessageSchema;
49: import org.eclipse.kapua.service.datastore.internal.schema.Metadata;
50: import org.eclipse.kapua.service.datastore.internal.schema.MetricInfoSchema;
51: import org.eclipse.kapua.service.datastore.internal.schema.SchemaUtil;
52: import org.eclipse.kapua.service.datastore.model.ChannelInfo;
53: import org.eclipse.kapua.service.datastore.model.ClientInfo;
54: import org.eclipse.kapua.service.datastore.model.DatastoreMessage;
55: import org.eclipse.kapua.service.datastore.model.MessageListResult;
56: import org.eclipse.kapua.service.datastore.model.MetricInfo;
57: import org.eclipse.kapua.service.datastore.model.query.MessageQuery;
58: import org.eclipse.kapua.service.elasticsearch.client.exception.ClientException;
59: import org.eclipse.kapua.service.elasticsearch.client.exception.QueryMappingException;
60: import org.eclipse.kapua.service.elasticsearch.client.model.InsertRequest;
61: import org.eclipse.kapua.service.elasticsearch.client.model.InsertResponse;
62: import org.eclipse.kapua.service.elasticsearch.client.model.ResultList;
63: import org.eclipse.kapua.service.elasticsearch.client.model.TypeDescriptor;
64: import org.eclipse.kapua.service.storable.exception.MappingException;
65: import org.eclipse.kapua.service.storable.model.id.StorableId;
66: import org.eclipse.kapua.service.storable.model.id.StorableIdFactory;
67: import org.eclipse.kapua.service.storable.model.query.StorableFetchStyle;
68: import org.eclipse.kapua.service.storable.model.query.predicate.IdsPredicate;
69: import org.eclipse.kapua.service.storable.model.query.predicate.StorablePredicateFactory;
70: import org.slf4j.Logger;
71: import org.slf4j.LoggerFactory;
72:
73: import java.util.Date;
74: import java.util.HashMap;
75: import java.util.Map;
76:
77: /**
78: * Message store facade
79: *
80: * @since 1.0.0
81: */
82: public final class MessageStoreFacade extends AbstractRegistryFacade {
83:
84: private static final Logger LOG = LoggerFactory.getLogger(MessageStoreFacade.class);
85:
86: private static final KapuaLocator LOCATOR = KapuaLocator.getInstance();
87: private static final StorableIdFactory STORABLE_ID_FACTORY = LOCATOR.getFactory(StorableIdFactory.class);
88: private static final StorablePredicateFactory STORABLE_PREDICATE_FACTORY = LOCATOR.getFactory(StorablePredicateFactory.class);
89:
90: private final Counter metricMessagesAlreadyInTheDatastoreCount;
91:
92: private final MessageStoreMediator mediator;
93:
94: private static final String QUERY = "query";
95: private static final String QUERY_SCOPE_ID = "query.scopeId";
96: private static final String SCOPE_ID = "scopeId";
97:
/**
 * Creates the message store facade.
 * <p>
 * Besides keeping a reference to the mediator, the constructor looks up the counter that is
 * incremented every time {@code store} finds the message already present in the datastore.
 *
 * @param confProvider the configuration provider handed over to {@link AbstractRegistryFacade}
 * @param mediator     the mediator used by this facade for metadata, mappings and post-store callbacks
 * @since 1.0.0
 */
public MessageStoreFacade(ConfigurationProvider confProvider, MessageStoreMediator mediator) {
    super(confProvider);
    this.mediator = mediator;
    this.metricMessagesAlreadyInTheDatastoreCount = MetricServiceFactory.getInstance().getCounter(
            DataStoreDriverMetrics.METRIC_MODULE_NAME,
            DataStoreDriverMetrics.METRIC_COMPONENT_NAME,
            DataStoreDriverMetrics.METRIC_STORE,
            DataStoreDriverMetrics.METRIC_MESSAGES,
            DataStoreDriverMetrics.METRIC_ALREADY_IN_THE_DATASTORE,
            DataStoreDriverMetrics.METRIC_COUNT);
}
113:
114: /**
115: * Store a message
116: *
117: * @param message
118: * @return
119: * @throws KapuaIllegalArgumentException
120: * @throws ConfigurationException
121: * @throws ClientException
122: */
123: public StorableId store(KapuaMessage<?, ?> message, String messageId, boolean newInsert)
124: throws KapuaIllegalArgumentException,
125: DatastoreDisabledException,
126: ConfigurationException,
127: ClientException, MappingException {
128: ArgumentValidator.notNull(message, "message");
129: ArgumentValidator.notNull(message.getScopeId(), SCOPE_ID);
130: ArgumentValidator.notNull(message.getReceivedOn(), "receivedOn");
131: ArgumentValidator.notNull(messageId, "messageId");
132:
133: // Define data TTL
134:• if (!isDatastoreServiceEnabled(message.getScopeId())) {
135: throw new DatastoreDisabledException(message.getScopeId());
136: }
137:
138: Date capturedOn = message.getCapturedOn();
139: // Overwrite timestamp if necessary
140: // Use the account service plan to determine whether we will give
141: // precede to the device time
142: MessageStoreConfiguration accountServicePlan = getConfigProvider().getConfiguration(message.getScopeId());
143: long indexedOn = KapuaDateUtils.getKapuaSysDate().toEpochMilli();
144:• if (DataIndexBy.DEVICE_TIMESTAMP.equals(accountServicePlan.getDataIndexBy())) {
145:• if (capturedOn != null) {
146: indexedOn = capturedOn.getTime();
147: } else {
148: LOG.debug("The account is set to use, as date indexing, the device timestamp but the device timestamp is null! Current system date will be used to indexing the message by date!");
149: }
150: }
151: // Extract schema metadata
152: Metadata schemaMetadata = mediator.getMetadata(message.getScopeId(), indexedOn);
153:
154: Date indexedOnDate = new Date(indexedOn);
155: String indexName = schemaMetadata.getDataIndexName();
156: TypeDescriptor typeDescriptor = new TypeDescriptor(indexName, MessageSchema.MESSAGE_TYPE_NAME);
157:
158:• if (!newInsert && !MessageUniquenessCheck.NONE.equals(accountServicePlan.getMessageUniquenessCheck())) {
159:• DatastoreMessage datastoreMessage = MessageUniquenessCheck.FULL.equals(accountServicePlan.getMessageUniquenessCheck()) ?
160: find(message.getScopeId(), STORABLE_ID_FACTORY.newStorableId(messageId), StorableFetchStyle.SOURCE_SELECT) :
161: find(message.getScopeId(), indexName, STORABLE_ID_FACTORY.newStorableId(messageId), StorableFetchStyle.SOURCE_SELECT);
162:• if (datastoreMessage != null) {
163: LOG.debug("Message with datatstore id '{}' already found", messageId);
164: metricMessagesAlreadyInTheDatastoreCount.inc();
165: return STORABLE_ID_FACTORY.newStorableId(messageId);
166: }
167: }
168:
169: // Save message (the big one)
170: DatastoreMessage messageToStore = convertTo(message, messageId);
171: messageToStore.setTimestamp(indexedOnDate);
172: InsertRequest insertRequest = new InsertRequest(messageToStore.getDatastoreId().toString(), typeDescriptor, messageToStore);
173: // Possibly update the schema with new metric mappings
174: Map<String, Metric> metrics = new HashMap<>();
175:• if (message.getPayload() != null && message.getPayload().getMetrics() != null && !message.getPayload().getMetrics().isEmpty()) {
176:
177: Map<String, Object> messageMetrics = message.getPayload().getMetrics();
178:• for (Map.Entry<String, Object> messageMetric : messageMetrics.entrySet()) {
179: String metricName = DatastoreUtils.normalizeMetricName(messageMetric.getKey());
180: String clientMetricType = DatastoreUtils.getClientMetricFromType(messageMetric.getValue().getClass());
181: Metric metric = new Metric(metricName, clientMetricType);
182:
183: // each metric is potentially a dynamic field so report it a new mapping
184: String mappedName = DatastoreUtils.getMetricValueQualifier(metricName, clientMetricType);
185: metrics.put(mappedName, metric);
186: }
187: }
188: try {
189: mediator.onUpdatedMappings(message.getScopeId(), indexedOn, metrics);
190: } catch (KapuaException e) {
191: e.printStackTrace();
192: }
193:
194: InsertResponse insertResponse = getElasticsearchClient().insert(insertRequest);
195: messageToStore.setDatastoreId(STORABLE_ID_FACTORY.newStorableId(insertResponse.getId()));
196:
197: MessageInfo messageInfo = getConfigProvider().getInfo(message.getScopeId());
198: mediator.onAfterMessageStore(messageInfo, messageToStore);
199:
200: return STORABLE_ID_FACTORY.newStorableId(insertResponse.getId());
201: }
202:
203: /**
204: * Delete message by identifier.<br>
205: * <b>Be careful using this function since it doesn't guarantee the datastore consistency.<br>
206: * It just deletes the message by id without checking the consistency of the registries.</b>
207: *
208: * @param scopeId
209: * @param id
210: * @throws KapuaIllegalArgumentException
211: * @throws ConfigurationException
212: * @throws ClientException
213: */
214: public void delete(KapuaId scopeId, StorableId id)
215: throws KapuaIllegalArgumentException,
216: ConfigurationException,
217: ClientException {
218: ArgumentValidator.notNull(scopeId, SCOPE_ID);
219: ArgumentValidator.notNull(id, "id");
220:
221:• if (!isDatastoreServiceEnabled(scopeId)) {
222: LOG.debug("Storage not enabled for account {}, return", scopeId);
223: return;
224: }
225:
226: // get the index by finding the object by id
227: DatastoreMessage messageToBeDeleted = find(scopeId, id, StorableFetchStyle.FIELDS);
228:• if (messageToBeDeleted != null) {
229: Metadata schemaMetadata = null;
230: try {
231: schemaMetadata = mediator.getMetadata(scopeId, messageToBeDeleted.getTimestamp().getTime());
232: } catch (KapuaException e) {
233: e.printStackTrace();
234: }
235: String indexName = schemaMetadata.getDataIndexName();
236: TypeDescriptor typeDescriptor = new TypeDescriptor(indexName, MessageSchema.MESSAGE_TYPE_NAME);
237: getElasticsearchClient().delete(typeDescriptor, id.toString());
238: } else {
239: LOG.warn("Cannot find the message to be deleted. scopeId: '{}' - id: '{}'", scopeId, id);
240: }
241: // otherwise no message to be deleted found
242: }
243:
244: /**
245: * Find message by identifier
246: *
247: * @param scopeId
248: * @param id
249: * @param fetchStyle
250: * @return
251: * @throws KapuaIllegalArgumentException
252: * @throws QueryMappingException
253: * @throws ClientException
254: */
255: public DatastoreMessage find(KapuaId scopeId, StorableId id, StorableFetchStyle fetchStyle) throws KapuaIllegalArgumentException, ClientException {
256: ArgumentValidator.notNull(scopeId, SCOPE_ID);
257: return find(scopeId, SchemaUtil.getDataIndexName(scopeId), id, fetchStyle);
258: }
259:
260: /**
261: * Find message by identifier
262: *
263: * @param scopeId
264: * @param id
265: * @param fetchStyle
266: * @return
267: * @throws KapuaIllegalArgumentException
268: * @throws QueryMappingException
269: * @throws ClientException
270: */
271: public DatastoreMessage find(KapuaId scopeId, String indexName, StorableId id, StorableFetchStyle fetchStyle)
272: throws KapuaIllegalArgumentException, ClientException {
273:
274: ArgumentValidator.notNull(scopeId, SCOPE_ID);
275: ArgumentValidator.notNull(id, "id");
276: ArgumentValidator.notNull(fetchStyle, "fetchStyle");
277:
278: MessageQueryImpl idsQuery = new MessageQueryImpl(scopeId);
279: idsQuery.setLimit(1);
280:
281: IdsPredicate idsPredicate = STORABLE_PREDICATE_FACTORY.newIdsPredicate(MessageSchema.MESSAGE_TYPE_NAME);
282: idsPredicate.addId(id);
283: idsQuery.setPredicate(idsPredicate);
284:
285: // String indexName = SchemaUtil.getDataIndexName(scopeId);
286: TypeDescriptor typeDescriptor = new TypeDescriptor(indexName, MessageSchema.MESSAGE_TYPE_NAME);
287: return getElasticsearchClient().find(typeDescriptor, idsQuery, DatastoreMessage.class);
288: }
289:
290: /**
291: * Find messages matching the given query
292: *
293: * @param query
294: * @return
295: * @throws KapuaIllegalArgumentException
296: * @throws ConfigurationException
297: * @throws QueryMappingException
298: * @throws ClientException
299: */
300: public MessageListResult query(MessageQuery query)
301: throws KapuaIllegalArgumentException,
302: ConfigurationException,
303: ClientException {
304: ArgumentValidator.notNull(query, QUERY);
305: ArgumentValidator.notNull(query.getScopeId(), QUERY_SCOPE_ID);
306:
307:• if (!isDatastoreServiceEnabled(query.getScopeId())) {
308: LOG.debug("Storage not enabled for account {}, returning empty result", query.getScopeId());
309: return new MessageListResultImpl();
310: }
311:
312: String dataIndexName = SchemaUtil.getDataIndexName(query.getScopeId());
313: TypeDescriptor typeDescriptor = new TypeDescriptor(dataIndexName, MessageSchema.MESSAGE_TYPE_NAME);
314: ResultList<DatastoreMessage> rl = getElasticsearchClient().query(typeDescriptor, query, DatastoreMessage.class);
315: MessageListResult result = new MessageListResultImpl(rl);
316: setLimitExceed(query, rl.getTotalHitsExceedsCount(), result);
317: return result;
318: }
319:
320: /**
321: * Get messages count matching the given query
322: *
323: * @param query
324: * @return
325: * @throws KapuaIllegalArgumentException
326: * @throws ConfigurationException
327: * @throws ClientException
328: */
329: public long count(MessageQuery query)
330: throws KapuaIllegalArgumentException,
331: ConfigurationException,
332: ClientException {
333: ArgumentValidator.notNull(query, QUERY);
334: ArgumentValidator.notNull(query.getScopeId(), QUERY_SCOPE_ID);
335:
336:• if (!isDatastoreServiceEnabled(query.getScopeId())) {
337: LOG.debug("Storage not enabled for account {}, returning empty result", query.getScopeId());
338: return 0;
339: }
340:
341: String indexName = SchemaUtil.getDataIndexName(query.getScopeId());
342: TypeDescriptor typeDescriptor = new TypeDescriptor(indexName, MessageSchema.MESSAGE_TYPE_NAME);
343: return getElasticsearchClient().count(typeDescriptor, query);
344: }
345:
346: /**
347: * Delete messages count matching the given query.<br>
348: * <b>Be careful using this function since it doesn't guarantee the datastore consistency.<br>
349: * It just deletes the messages that matching the query without checking the consistency of the registries.</b>
350: *
351: * @param query
352: * @throws KapuaIllegalArgumentException
353: * @throws ConfigurationException
354: * @throws ClientException
355: */
356: public void delete(MessageQuery query)
357: throws KapuaIllegalArgumentException,
358: ConfigurationException,
359: ClientException {
360: ArgumentValidator.notNull(query, QUERY);
361: ArgumentValidator.notNull(query.getScopeId(), QUERY_SCOPE_ID);
362:
363:• if (!isDatastoreServiceEnabled(query.getScopeId())) {
364: LOG.debug("Storage not enabled for account {}, skipping delete", query.getScopeId());
365: return;
366: }
367:
368: String indexName = SchemaUtil.getDataIndexName(query.getScopeId());
369: TypeDescriptor typeDescriptor = new TypeDescriptor(indexName, MessageSchema.MESSAGE_TYPE_NAME);
370: getElasticsearchClient().deleteByQuery(typeDescriptor, query);
371: }
372:
373: // TODO cache will not be reset from the client code it should be automatically reset
374: // after some time.
375: private void resetCache(KapuaId scopeId, KapuaId deviceId, String channel, String clientId) throws Exception {
376:
377: boolean isAnyClientId;
378: boolean isClientToDelete = false;
379: String semTopic;
380:
381:• if (channel != null) {
382:
383: // determine if we should delete an client if topic = account/clientId/#
384: isAnyClientId = isAnyClientId(channel);
385: semTopic = channel;
386:
387:• if (semTopic.isEmpty() && !isAnyClientId) {
388: isClientToDelete = true;
389: }
390: } else {
391: isClientToDelete = true;
392: }
393:
394: // Find all topics
395: String dataIndexName = SchemaUtil.getDataIndexName(scopeId);
396:
397: int pageSize = 1000;
398: int offset = 0;
399: long totalHits = 1;
400:
401: MetricInfoQueryImpl metricQuery = new MetricInfoQueryImpl(scopeId);
402: metricQuery.setLimit(pageSize + 1);
403: metricQuery.setOffset(offset);
404:
405: ChannelMatchPredicateImpl channelPredicate = new ChannelMatchPredicateImpl(MessageField.CHANNEL, channel);
406: metricQuery.setPredicate(channelPredicate);
407:
408: // Remove metrics
409:• while (totalHits > 0) {
410: TypeDescriptor typeDescriptor = new TypeDescriptor(dataIndexName, MetricInfoSchema.METRIC_TYPE_NAME);
411: ResultList<MetricInfo> metrics = getElasticsearchClient().query(typeDescriptor, metricQuery, MetricInfo.class);
412:
413: totalHits = metrics.getTotalCount();
414: LocalCache<String, Boolean> metricsCache = DatastoreCacheManager.getInstance().getMetricsCache();
415:• long toBeProcessed = totalHits > pageSize ? pageSize : totalHits;
416:
417:• for (int i = 0; i < toBeProcessed; i++) {
418: String id = metrics.getResult().get(i).getId().toString();
419:• if (metricsCache.get(id)) {
420: metricsCache.remove(id);
421: }
422: }
423:
424:• if (totalHits > pageSize) {
425: offset += pageSize + 1;
426: }
427: }
428: LOG.debug("Removed cached channel metrics for: {}", channel);
429: TypeDescriptor typeMetricDescriptor = new TypeDescriptor(dataIndexName, MetricInfoSchema.METRIC_TYPE_NAME);
430: getElasticsearchClient().deleteByQuery(typeMetricDescriptor, metricQuery);
431: LOG.debug("Removed channel metrics for: {}", channel);
432: ChannelInfoQueryImpl channelQuery = new ChannelInfoQueryImpl(scopeId);
433: channelQuery.setLimit(pageSize + 1);
434: channelQuery.setOffset(offset);
435:
436: channelPredicate = new ChannelMatchPredicateImpl(MessageField.CHANNEL, channel);
437: channelQuery.setPredicate(channelPredicate);
438:
439: // Remove channel
440: offset = 0;
441: totalHits = 1;
442:• while (totalHits > 0) {
443: TypeDescriptor typeDescriptor = new TypeDescriptor(dataIndexName, ChannelInfoSchema.CHANNEL_TYPE_NAME);
444: ResultList<ChannelInfo> channels = getElasticsearchClient().query(typeDescriptor, channelQuery, ChannelInfo.class);
445:
446: totalHits = channels.getTotalCount();
447: LocalCache<String, Boolean> channelsCache = DatastoreCacheManager.getInstance().getChannelsCache();
448:• long toBeProcessed = totalHits > pageSize ? pageSize : totalHits;
449:
450:• for (int i = 0; i < toBeProcessed; i++) {
451: String id = channels.getResult().get(0).getId().toString();
452:• if (channelsCache.get(id)) {
453: channelsCache.remove(id);
454: }
455: }
456:• if (totalHits > pageSize) {
457: offset += pageSize + 1;
458: }
459: }
460:
461: LOG.debug("Removed cached channels for: {}", channel);
462: TypeDescriptor typeChannelDescriptor = new TypeDescriptor(dataIndexName, ChannelInfoSchema.CHANNEL_TYPE_NAME);
463: getElasticsearchClient().deleteByQuery(typeChannelDescriptor, channelQuery);
464:
465: LOG.debug("Removed channels for: {}", channel);
466: // Remove client
467:• if (isClientToDelete) {
468: ClientInfoQueryImpl clientInfoQuery = new ClientInfoQueryImpl(scopeId);
469: clientInfoQuery.setLimit(pageSize + 1);
470: clientInfoQuery.setOffset(offset);
471:
472: channelPredicate = new ChannelMatchPredicateImpl(MessageField.CHANNEL, channel);
473: clientInfoQuery.setPredicate(channelPredicate);
474: offset = 0;
475: totalHits = 1;
476:• while (totalHits > 0) {
477: TypeDescriptor typeDescriptor = new TypeDescriptor(dataIndexName, ClientInfoSchema.CLIENT_TYPE_NAME);
478: ResultList<ClientInfo> clients = getElasticsearchClient().query(typeDescriptor, clientInfoQuery, ClientInfo.class);
479:
480: totalHits = clients.getTotalCount();
481: LocalCache<String, Boolean> clientsCache = DatastoreCacheManager.getInstance().getClientsCache();
482:• long toBeProcessed = totalHits > pageSize ? pageSize : totalHits;
483:
484:• for (int i = 0; i < toBeProcessed; i++) {
485: String id = clients.getResult().get(i).getId().toString();
486:• if (clientsCache.get(id)) {
487: clientsCache.remove(id);
488: }
489: }
490:• if (totalHits > pageSize) {
491: offset += pageSize + 1;
492: }
493: }
494:
495: LOG.debug("Removed cached clients for: {}", channel);
496: TypeDescriptor typeClientDescriptor = new TypeDescriptor(dataIndexName, ClientInfoSchema.CLIENT_TYPE_NAME);
497: getElasticsearchClient().deleteByQuery(typeClientDescriptor, clientInfoQuery);
498:
499: LOG.debug("Removed clients for: {}", channel);
500: }
501: }
502:
503: // Utility methods
504:
505: /**
506: * Check if the channel admit any client identifier (so if the channel has a specific wildcard in the second topic level).<br>
507: * In the MQTT word this method return true if the topic starts with 'account/+/'.
508: *
509: * @param clientId
510: * @return
511: * @since 1.0.0
512: */
513: private boolean isAnyClientId(String clientId) {
514: return DatastoreChannel.SINGLE_LEVEL_WCARD.equals(clientId);
515: }
516:
517: /**
518: * This constructor should be used for wrapping Kapua message into datastore message for insert purpose
519: *
520: * @param message
521: */
522: private DatastoreMessage convertTo(KapuaMessage<?, ?> message, String messageId) {
523: KapuaDataChannel datastoreChannel = new KapuaDataChannelImpl();
524: datastoreChannel.setSemanticParts(message.getChannel().getSemanticParts());
525:
526: DatastoreMessage datastoreMessage = new DatastoreMessageImpl();
527: datastoreMessage.setCapturedOn(message.getCapturedOn());
528: datastoreMessage.setChannel(datastoreChannel);
529: datastoreMessage.setClientId(message.getClientId());
530: datastoreMessage.setDeviceId(message.getDeviceId());
531: datastoreMessage.setId(message.getId());
532: datastoreMessage.setPayload(message.getPayload());
533: datastoreMessage.setPosition(message.getPosition());
534: datastoreMessage.setReceivedOn(message.getReceivedOn());
535: datastoreMessage.setScopeId(message.getScopeId());
536: datastoreMessage.setSentOn(message.getSentOn());
537:
538: // generate uuid
539: datastoreMessage.setId(message.getId());
540: datastoreMessage.setDatastoreId(STORABLE_ID_FACTORY.newStorableId(messageId));
541: return datastoreMessage;
542: }
543:
544: public void refreshAllIndexes() throws ClientException {
545: getElasticsearchClient().refreshAllIndexes();
546: }
547:
548: public void deleteAllIndexes() throws ClientException {
549: getElasticsearchClient().deleteAllIndexes();
550: }
551:
552: public void deleteIndexes(String indexExp) throws ClientException {
553: getElasticsearchClient().deleteIndexes(indexExp);
554: }
555: }