Skip to content

Package: ChannelInfoRegistryFacade

ChannelInfoRegistryFacade

name | instruction | branch | complexity | line | method
ChannelInfoRegistryFacade(ConfigurationProvider, ChannelInfoRegistryMediator)
M: 12 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 4 C: 0
0%
M: 1 C: 0
0%
count(ChannelInfoQuery)
M: 35 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 8 C: 0
0%
M: 1 C: 0
0%
delete(ChannelInfoQuery)
M: 54 C: 0
0%
M: 4 C: 0
0%
M: 3 C: 0
0%
M: 13 C: 0
0%
M: 1 C: 0
0%
delete(KapuaId, StorableId)
M: 42 C: 0
0%
M: 4 C: 0
0%
M: 3 C: 0
0%
M: 12 C: 0
0%
M: 1 C: 0
0%
find(KapuaId, StorableId)
M: 34 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 9 C: 0
0%
M: 1 C: 0
0%
query(ChannelInfoQuery)
M: 51 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 11 C: 0
0%
M: 1 C: 0
0%
static {...}
M: 16 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 4 C: 0
0%
M: 1 C: 0
0%
upstore(ChannelInfo)
M: 119 C: 0
0%
M: 6 C: 0
0%
M: 4 C: 0
0%
M: 20 C: 0
0%
M: 1 C: 0
0%

Coverage

1: /*******************************************************************************
2: * Copyright (c) 2016, 2022 Eurotech and/or its affiliates and others
3: *
4: * This program and the accompanying materials are made
5: * available under the terms of the Eclipse Public License 2.0
6: * which is available at https://www.eclipse.org/legal/epl-2.0/
7: *
8: * SPDX-License-Identifier: EPL-2.0
9: *
10: * Contributors:
11: * Eurotech - initial API and implementation
12: *******************************************************************************/
13: package org.eclipse.kapua.service.datastore.internal;
14:
15: import org.eclipse.kapua.KapuaIllegalArgumentException;
16: import org.eclipse.kapua.commons.util.ArgumentValidator;
17: import org.eclipse.kapua.locator.KapuaLocator;
18: import org.eclipse.kapua.model.id.KapuaId;
19: import org.eclipse.kapua.service.datastore.internal.mediator.ChannelInfoField;
20: import org.eclipse.kapua.service.datastore.internal.mediator.ChannelInfoRegistryMediator;
21: import org.eclipse.kapua.service.datastore.internal.mediator.ConfigurationException;
22: import org.eclipse.kapua.service.datastore.internal.model.ChannelInfoListResultImpl;
23: import org.eclipse.kapua.service.datastore.internal.model.query.ChannelInfoQueryImpl;
24: import org.eclipse.kapua.service.datastore.internal.schema.ChannelInfoSchema;
25: import org.eclipse.kapua.service.datastore.internal.schema.Metadata;
26: import org.eclipse.kapua.service.datastore.internal.schema.SchemaUtil;
27: import org.eclipse.kapua.service.datastore.model.ChannelInfo;
28: import org.eclipse.kapua.service.datastore.model.ChannelInfoListResult;
29: import org.eclipse.kapua.service.datastore.model.query.ChannelInfoQuery;
30: import org.eclipse.kapua.service.elasticsearch.client.exception.ClientException;
31: import org.eclipse.kapua.service.elasticsearch.client.model.ResultList;
32: import org.eclipse.kapua.service.elasticsearch.client.model.TypeDescriptor;
33: import org.eclipse.kapua.service.elasticsearch.client.model.UpdateRequest;
34: import org.eclipse.kapua.service.elasticsearch.client.model.UpdateResponse;
35: import org.eclipse.kapua.service.storable.exception.MappingException;
36: import org.eclipse.kapua.service.storable.model.id.StorableId;
37: import org.eclipse.kapua.service.storable.model.id.StorableIdFactory;
38: import org.eclipse.kapua.service.storable.model.query.predicate.IdsPredicate;
39: import org.eclipse.kapua.service.storable.model.query.predicate.StorablePredicateFactory;
40: import org.slf4j.Logger;
41: import org.slf4j.LoggerFactory;
42:
43: /**
44: * Channel information registry facade
45: *
46: * @since 1.0.0
47: */
48: public class ChannelInfoRegistryFacade extends AbstractRegistryFacade {
49:
50: private static final Logger LOG = LoggerFactory.getLogger(ChannelInfoRegistryFacade.class);
51:
52: private static final KapuaLocator LOCATOR = KapuaLocator.getInstance();
53: private static final StorableIdFactory STORABLE_ID_FACTORY = LOCATOR.getFactory(StorableIdFactory.class);
54: private static final StorablePredicateFactory STORABLE_PREDICATE_FACTORY = LOCATOR.getFactory(StorablePredicateFactory.class);
55:
56: private final ChannelInfoRegistryMediator mediator;
57: private final Object metadataUpdateSync = new Object();
58:
59: private static final String QUERY = "query";
60: private static final String QUERY_SCOPE_ID = "query.scopeId";
61:
62: /**
63: * Constructs the channel info registry facade
64: *
65: * @param configProvider
66: * @param mediator
67: * @since 1.0.0
68: */
69: public ChannelInfoRegistryFacade(ConfigurationProvider configProvider, ChannelInfoRegistryMediator mediator) {
70: super(configProvider);
71:
72: this.mediator = mediator;
73: }
74:
75: /**
76: * Update the channel information after a message store operation
77: *
78: * @param channelInfo
79: * @return
80: * @throws KapuaIllegalArgumentException
81: * @throws ConfigurationException
82: * @throws ClientException
83: */
84: public StorableId upstore(ChannelInfo channelInfo) throws KapuaIllegalArgumentException, ConfigurationException, ClientException, MappingException {
85: ArgumentValidator.notNull(channelInfo, "channelInfo");
86: ArgumentValidator.notNull(channelInfo.getScopeId(), "channelInfo.scopeId");
87: ArgumentValidator.notNull(channelInfo.getName(), "channelInfo.name");
88: ArgumentValidator.notNull(channelInfo.getFirstMessageId(), "channelInfo.messageId");
89: ArgumentValidator.notNull(channelInfo.getFirstMessageOn(), "channelInfo.messageTimestamp");
90:
91: String channelInfoId = ChannelInfoField.getOrDeriveId(channelInfo.getId(), channelInfo);
92: StorableId storableId = STORABLE_ID_FACTORY.newStorableId(channelInfoId);
93:
94: UpdateResponse response;
95: // Store channel. Look up channel in the cache, and cache it if it doesn't exist
96:• if (!DatastoreCacheManager.getInstance().getChannelsCache().get(channelInfoId)) {
97: // The code is safe even without the synchronized block
98: // Synchronize in order to let the first thread complete its
99: // update then the others of the same type will find the cache
100: // updated and skip the update.
101: synchronized (metadataUpdateSync) {
102:• if (!DatastoreCacheManager.getInstance().getChannelsCache().get(channelInfoId)) {
103: ChannelInfo storedField = find(channelInfo.getScopeId(), storableId);
104:• if (storedField == null) {
105: Metadata metadata = mediator.getMetadata(channelInfo.getScopeId(), channelInfo.getFirstMessageOn().getTime());
106: String registryIndexName = metadata.getChannelRegistryIndexName();
107:
108: UpdateRequest request = new UpdateRequest(channelInfo.getId().toString(), new TypeDescriptor(metadata.getChannelRegistryIndexName(), ChannelInfoSchema.CHANNEL_TYPE_NAME), channelInfo);
109: response = getElasticsearchClient().upsert(request);
110:
111: LOG.debug("Upsert on channel successfully executed [{}.{}, {} - {}]", registryIndexName, ChannelInfoSchema.CHANNEL_TYPE_NAME, channelInfoId, response.getId());
112: }
113: // Update cache if channel update is completed successfully
114: DatastoreCacheManager.getInstance().getChannelsCache().put(channelInfoId, true);
115: }
116: }
117: }
118: return storableId;
119: }
120:
121: /**
122: * Delete channel information by identifier.
123: *
124: * <b>Be careful using this function since it doesn't guarantee the datastore consistency.<br>
125: * It just deletes the channel info registry entry by id without checking the consistency of the others registries or the message store.</b>
126: *
127: * @param scopeId
128: * @param id
129: * @throws KapuaIllegalArgumentException
130: * @throws ConfigurationException
131: * @throws ClientException
132: */
133: public void delete(KapuaId scopeId, StorableId id) throws KapuaIllegalArgumentException, ConfigurationException, ClientException {
134: ArgumentValidator.notNull(scopeId, "scopeId");
135: ArgumentValidator.notNull(id, "id");
136:
137:• if (!isDatastoreServiceEnabled(scopeId)) {
138: LOG.debug("Storage not enabled for account {}, return", scopeId);
139: return;
140: }
141:
142: String indexName = SchemaUtil.getChannelIndexName(scopeId);
143: ChannelInfo channelInfo = find(scopeId, id);
144:• if (channelInfo != null) {
145: mediator.onBeforeChannelInfoDelete(channelInfo);
146:
147: TypeDescriptor typeDescriptor = new TypeDescriptor(indexName, ChannelInfoSchema.CHANNEL_TYPE_NAME);
148: getElasticsearchClient().delete(typeDescriptor, id.toString());
149: }
150: }
151:
152: /**
153: * Find channel information by identifier
154: *
155: * @param scopeId
156: * @param id
157: * @return
158: * @throws KapuaIllegalArgumentException
159: * @throws ConfigurationException
160: * @throws ClientException
161: */
162: public ChannelInfo find(KapuaId scopeId, StorableId id) throws KapuaIllegalArgumentException, ConfigurationException, ClientException {
163: ArgumentValidator.notNull(scopeId, "scopeId");
164: ArgumentValidator.notNull(id, "id");
165:
166: ChannelInfoQueryImpl idsQuery = new ChannelInfoQueryImpl(scopeId);
167: idsQuery.setLimit(1);
168:
169: IdsPredicate idsPredicate = STORABLE_PREDICATE_FACTORY.newIdsPredicate(ChannelInfoSchema.CHANNEL_TYPE_NAME);
170: idsPredicate.addId(id);
171: idsQuery.setPredicate(idsPredicate);
172:
173: ChannelInfoListResult result = query(idsQuery);
174: return result.getFirstItem();
175: }
176:
177: /**
178: * Find channels informations matching the given query
179: *
180: * @param query
181: * @return
182: * @throws KapuaIllegalArgumentException
183: * @throws ConfigurationException
184: * @throws ClientException
185: */
186: public ChannelInfoListResult query(ChannelInfoQuery query) throws KapuaIllegalArgumentException, ConfigurationException, ClientException {
187: ArgumentValidator.notNull(query, QUERY);
188: ArgumentValidator.notNull(query.getScopeId(), QUERY_SCOPE_ID);
189:
190:• if (!isDatastoreServiceEnabled(query.getScopeId())) {
191: LOG.debug("Storage not enabled for account {}, returning empty result", query.getScopeId());
192: return new ChannelInfoListResultImpl();
193: }
194:
195: String indexName = SchemaUtil.getChannelIndexName(query.getScopeId());
196: TypeDescriptor typeDescriptor = new TypeDescriptor(indexName, ChannelInfoSchema.CHANNEL_TYPE_NAME);
197: ResultList<ChannelInfo> rl = getElasticsearchClient().query(typeDescriptor, query, ChannelInfo.class);
198: ChannelInfoListResult result = new ChannelInfoListResultImpl(rl);
199: setLimitExceed(query, rl.getTotalHitsExceedsCount(), result);
200: return result;
201: }
202:
203: /**
204: * Get channels informations count matching the given query
205: *
206: * @param query
207: * @return
208: * @throws KapuaIllegalArgumentException
209: * @throws ConfigurationException
210: * @throws ClientException
211: */
212: public long count(ChannelInfoQuery query) throws KapuaIllegalArgumentException, ConfigurationException, ClientException {
213: ArgumentValidator.notNull(query, QUERY);
214: ArgumentValidator.notNull(query.getScopeId(), QUERY_SCOPE_ID);
215:
216:• if (!isDatastoreServiceEnabled(query.getScopeId())) {
217: LOG.debug("Storage not enabled for account {}, returning empty result", query.getScopeId());
218: return 0;
219: }
220:
221: String indexName = SchemaUtil.getChannelIndexName(query.getScopeId());
222: TypeDescriptor typeDescriptor = new TypeDescriptor(indexName, ChannelInfoSchema.CHANNEL_TYPE_NAME);
223: return getElasticsearchClient().count(typeDescriptor, query);
224: }
225:
226: /**
227: * Delete channels informations count matching the given query.
228: *
229: * <b>Be careful using this function since it doesn't guarantee the datastore consistency.<br>
230: * It just deletes the channel info registry entries that matching the query without checking the consistency of the others registries or the message store.</b>
231: *
232: * @param query
233: * @throws KapuaIllegalArgumentException
234: * @throws ConfigurationException
235: * @throws ClientException
236: */
237: void delete(ChannelInfoQuery query) throws KapuaIllegalArgumentException, ConfigurationException, ClientException {
238: ArgumentValidator.notNull(query, QUERY);
239: ArgumentValidator.notNull(query.getScopeId(), QUERY_SCOPE_ID);
240:
241:• if (!isDatastoreServiceEnabled(query.getScopeId())) {
242: LOG.debug("Storage not enabled for account {}, skipping delete", query.getScopeId());
243: return;
244: }
245:
246: String indexName = SchemaUtil.getChannelIndexName(query.getScopeId());
247: ChannelInfoListResult channels = query(query);
248:
249:• for (ChannelInfo channelInfo : channels.getItems()) {
250: mediator.onBeforeChannelInfoDelete(channelInfo);
251: }
252:
253: TypeDescriptor typeDescriptor = new TypeDescriptor(indexName, ChannelInfoSchema.CHANNEL_TYPE_NAME);
254: getElasticsearchClient().deleteByQuery(typeDescriptor, query);
255: }
256: }