Skip to content

Package: DatastoreMediator

DatastoreMediator

nameinstructionbranchcomplexitylinemethod
DatastoreMediator()
M: 8 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 3 C: 0
0%
M: 1 C: 0
0%
clearCache()
M: 13 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 5 C: 0
0%
M: 1 C: 0
0%
deleteAllIndexes()
M: 6 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 3 C: 0
0%
M: 1 C: 0
0%
deleteIndexes(String)
M: 7 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 3 C: 0
0%
M: 1 C: 0
0%
getInstance()
M: 2 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
getMetadata(KapuaId, long)
M: 6 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
onAfterChannelInfoDelete(ChannelInfo)
M: 1 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
onAfterClientInfoDelete(KapuaId, ClientInfo)
M: 1 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
onAfterMessageStore(MessageInfo, DatastoreMessage)
M: 154 C: 0
0%
M: 8 C: 0
0%
M: 5 C: 0
0%
M: 35 C: 0
0%
M: 1 C: 0
0%
onAfterMetricInfoDelete(KapuaId, MetricInfo)
M: 1 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
onBeforeChannelInfoDelete(ChannelInfo)
M: 1 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
onUpdatedMappings(KapuaId, long, Map)
M: 7 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
refreshAllIndexes()
M: 4 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
setChannelInfoStoreFacade(ChannelInfoRegistryFacade)
M: 4 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
setClientInfoStoreFacade(ClientInfoRegistryFacade)
M: 4 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
setMessageStoreFacade(MessageStoreFacade)
M: 4 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
setMetricInfoStoreFacade(MetricInfoRegistryFacade)
M: 4 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
static {...}
M: 24 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 7 C: 0
0%
M: 1 C: 0
0%

Coverage

1: /*******************************************************************************
2: * Copyright (c) 2016, 2022 Eurotech and/or its affiliates and others
3: *
4: * This program and the accompanying materials are made
5: * available under the terms of the Eclipse Public License 2.0
6: * which is available at https://www.eclipse.org/legal/epl-2.0/
7: *
8: * SPDX-License-Identifier: EPL-2.0
9: *
10: * Contributors:
11: * Eurotech - initial API and implementation
12: * Red Hat Inc
13: *******************************************************************************/
14: package org.eclipse.kapua.service.datastore.internal.mediator;
15:
16: import org.eclipse.kapua.KapuaIllegalArgumentException;
17: import org.eclipse.kapua.locator.KapuaLocator;
18: import org.eclipse.kapua.message.KapuaPayload;
19: import org.eclipse.kapua.model.id.KapuaId;
20: import org.eclipse.kapua.service.datastore.ChannelInfoRegistryService;
21: import org.eclipse.kapua.service.datastore.ClientInfoRegistryService;
22: import org.eclipse.kapua.service.datastore.MetricInfoRegistryService;
23: import org.eclipse.kapua.service.datastore.internal.ChannelInfoRegistryFacade;
24: import org.eclipse.kapua.service.datastore.internal.ClientInfoRegistryFacade;
25: import org.eclipse.kapua.service.datastore.internal.DatastoreCacheManager;
26: import org.eclipse.kapua.service.datastore.internal.MessageStoreFacade;
27: import org.eclipse.kapua.service.datastore.internal.MetricInfoRegistryFacade;
28: import org.eclipse.kapua.service.datastore.internal.model.ChannelInfoImpl;
29: import org.eclipse.kapua.service.datastore.internal.model.ClientInfoImpl;
30: import org.eclipse.kapua.service.datastore.internal.model.MetricInfoImpl;
31: import org.eclipse.kapua.service.datastore.internal.schema.Metadata;
32: import org.eclipse.kapua.service.datastore.internal.schema.Schema;
33: import org.eclipse.kapua.service.datastore.model.ChannelInfo;
34: import org.eclipse.kapua.service.datastore.model.ClientInfo;
35: import org.eclipse.kapua.service.datastore.model.DatastoreMessage;
36: import org.eclipse.kapua.service.datastore.model.MetricInfo;
37: import org.eclipse.kapua.service.elasticsearch.client.exception.ClientException;
38: import org.eclipse.kapua.service.storable.exception.MappingException;
39: import org.eclipse.kapua.service.storable.model.id.StorableIdFactory;
40:
41: import java.util.Map;
42:
43: /**
44: * Datastore mediator definition
45: *
46: * @since 1.0.0
47: */
48: public class DatastoreMediator implements MessageStoreMediator,
49: ClientInfoRegistryMediator,
50: ChannelInfoRegistryMediator,
51: MetricInfoRegistryMediator {
52:
53: private static final DatastoreMediator INSTANCE;
54:
55: private static final KapuaLocator LOCATOR = KapuaLocator.getInstance();
56: private static final StorableIdFactory STORABLE_ID_FACTORY = LOCATOR.getFactory(StorableIdFactory.class);
57:
58: private final Schema esSchema;
59:
60: private MessageStoreFacade messageStoreFacade;
61: private ClientInfoRegistryFacade clientInfoRegistryFacade;
62: private ChannelInfoRegistryFacade channelInfoStoreFacade;
63: private MetricInfoRegistryFacade metricInfoStoreFacade;
64:
65: static {
66: INSTANCE = new DatastoreMediator();
67:
68: // Be sure the data registry services are instantiated
69: KapuaLocator.getInstance().getService(ClientInfoRegistryService.class);
70: KapuaLocator.getInstance().getService(ChannelInfoRegistryService.class);
71: KapuaLocator.getInstance().getService(MetricInfoRegistryService.class);
72: }
73:
74: private DatastoreMediator() {
75: esSchema = new Schema();
76: }
77:
78: /**
79: * Gets the {@link DatastoreMediator} instance.
80: *
81: * @return The {@link DatastoreMediator} instance.
82: * @since 1.0.0
83: */
84: public static DatastoreMediator getInstance() {
85: return INSTANCE;
86: }
87:
88: /**
89: * Sets the {@link MessageStoreFacade}.
90: *
91: * @param messageStoreFacade The {@link MessageStoreFacade}.
92: * @since 1.0.0
93: */
94: public void setMessageStoreFacade(MessageStoreFacade messageStoreFacade) {
95: this.messageStoreFacade = messageStoreFacade;
96: }
97:
98: /**
99: * Sets the {@link ClientInfoRegistryFacade}.
100: *
101: * @param clientInfoRegistryFacade The {@link ClientInfoRegistryFacade}.
102: * @since 1.0.0
103: */
104: public void setClientInfoStoreFacade(ClientInfoRegistryFacade clientInfoRegistryFacade) {
105: this.clientInfoRegistryFacade = clientInfoRegistryFacade;
106: }
107:
108: /**
109: * Sets the {@link ChannelInfoRegistryFacade}.
110: *
111: * @param channelInfoStoreFacade The {@link ChannelInfoRegistryFacade}.
112: * @since 1.0.0
113: */
114: public void setChannelInfoStoreFacade(ChannelInfoRegistryFacade channelInfoStoreFacade) {
115: this.channelInfoStoreFacade = channelInfoStoreFacade;
116: }
117:
118: /**
119: * Sets the {@link MetricInfoRegistryFacade}.
120: *
121: * @param metricInfoStoreFacade The {@link MetricInfoRegistryFacade}.
122: * @since 1.0.0
123: */
124: public void setMetricInfoStoreFacade(MetricInfoRegistryFacade metricInfoStoreFacade) {
125: this.metricInfoStoreFacade = metricInfoStoreFacade;
126: }
127:
128: //
129: // Message Store Mediator methods
130: //
131:
132: @Override
133: public Metadata getMetadata(KapuaId scopeId, long indexedOn) throws ClientException, MappingException {
134: return esSchema.synch(scopeId, indexedOn);
135: }
136:
137: @Override
138: public void onUpdatedMappings(KapuaId scopeId, long indexedOn, Map<String, Metric> metrics) throws ClientException, MappingException {
139: esSchema.updateMessageMappings(scopeId, indexedOn, metrics);
140: }
141:
142: @Override
143: public void onAfterMessageStore(MessageInfo messageInfo, DatastoreMessage message)
144: throws KapuaIllegalArgumentException,
145: ConfigurationException,
146: MappingException,
147: ClientException {
148:
149: // convert semantic channel to String
150:• String semanticChannel = message.getChannel() != null ? message.getChannel().toString() : "";
151:
152: ClientInfoImpl clientInfo = new ClientInfoImpl(message.getScopeId());
153: clientInfo.setClientId(message.getClientId());
154: clientInfo.setId(STORABLE_ID_FACTORY.newStorableId(ClientInfoField.getOrDeriveId(null, message.getScopeId(), message.getClientId())));
155: clientInfo.setFirstMessageId(message.getDatastoreId());
156: clientInfo.setFirstMessageOn(message.getTimestamp());
157: clientInfoRegistryFacade.upstore(clientInfo);
158:
159: ChannelInfoImpl channelInfo = new ChannelInfoImpl(message.getScopeId());
160: channelInfo.setClientId(message.getClientId());
161: channelInfo.setName(semanticChannel);
162: channelInfo.setFirstMessageId(message.getDatastoreId());
163: channelInfo.setFirstMessageOn(message.getTimestamp());
164: channelInfo.setId(STORABLE_ID_FACTORY.newStorableId(ChannelInfoField.getOrDeriveId(null, channelInfo)));
165: channelInfoStoreFacade.upstore(channelInfo);
166:
167: KapuaPayload payload = message.getPayload();
168:• if (payload == null) {
169: return;
170: }
171:
172: Map<String, Object> metrics = payload.getMetrics();
173:• if (metrics == null) {
174: return;
175: }
176:
177: int i = 0;
178: MetricInfoImpl[] messageMetrics = new MetricInfoImpl[metrics.size()];
179:• for (Map.Entry<String, Object> entry : metrics.entrySet()) {
180: MetricInfoImpl metricInfo = new MetricInfoImpl(message.getScopeId());
181: metricInfo.setClientId(message.getClientId());
182: metricInfo.setChannel(semanticChannel);
183: metricInfo.setName(entry.getKey());
184: metricInfo.setMetricType(entry.getValue().getClass());
185: metricInfo.setId(STORABLE_ID_FACTORY.newStorableId(MetricInfoField.getOrDeriveId(null, metricInfo)));
186: metricInfo.setFirstMessageId(message.getDatastoreId());
187: metricInfo.setFirstMessageOn(message.getTimestamp());
188: messageMetrics[i++] = metricInfo;
189: }
190:
191: metricInfoStoreFacade.upstore(messageMetrics);
192: }
193:
194: /*
195: *
196: * ClientInfo Store Mediator methods
197: */
198: @Override
199: public void onAfterClientInfoDelete(KapuaId scopeId, ClientInfo clientInfo) {
200: // nothing to do at the present
201: // the datastore coherence will be guarantee by a periodic task that will scan the datastore looking for a no more referenced info registry record
202: // otherwise the computational cost for each delete operation will be too high
203: }
204:
205: /*
206: * ChannelInfo Store Mediator methods
207: */
208: @Override
209: public void onBeforeChannelInfoDelete(ChannelInfo channelInfo) {
210: // nothing to do at the present
211: // the datastore coherence will be guarantee by a periodic task that will scan the datastore looking for a no more referenced info registry record
212: // otherwise the computational cost for each delete operation will be too high
213: }
214:
215: @Override
216: public void onAfterChannelInfoDelete(ChannelInfo channelInfo) {
217: // nothing to do at the present
218: // the datastore coherence will be guarantee by a periodic task that will scan the datastore looking for a no more referenced info registry record
219: // otherwise the computational cost for each delete operation will be too high
220: }
221:
222: /*
223: *
224: * MetricInfo Store Mediator methods
225: */
226: @Override
227: public void onAfterMetricInfoDelete(KapuaId scopeId, MetricInfo metricInfo) {
228: // nothing to do at the present
229: // the datastore coherence will be guarantee by a periodic task that will scan the datastore looking for a no more referenced info registry record
230: // otherwise the computational cost for each delete operation will be too high
231: }
232:
233: public void refreshAllIndexes() throws ClientException {
234: messageStoreFacade.refreshAllIndexes();
235: }
236:
237: public void deleteAllIndexes() throws ClientException {
238: messageStoreFacade.deleteAllIndexes();
239: clearCache();
240: }
241:
242: public void deleteIndexes(String indexExp) throws ClientException {
243: messageStoreFacade.deleteIndexes(indexExp);
244: clearCache();
245: }
246:
247: public void clearCache() {
248: DatastoreCacheManager.getInstance().getChannelsCache().invalidateAll();
249: DatastoreCacheManager.getInstance().getClientsCache().invalidateAll();
250: DatastoreCacheManager.getInstance().getMetricsCache().invalidateAll();
251: DatastoreCacheManager.getInstance().getMetadataCache().invalidateAll();
252: }
253:
254: }