Skip to content

Package: Schema

Schema

name | instruction | branch | complexity | line | method
Schema()
M: 3 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
getMappingSchema(String)
M: 89 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 10 C: 0
0%
M: 1 C: 0
0%
getMessageMappingDiffs(Metadata, Map)
M: 51 C: 0
0%
M: 10 C: 0
0%
M: 6 C: 0
0%
M: 11 C: 0
0%
M: 1 C: 0
0%
getNewMessageMappingsBuilder(Map)
M: 141 C: 0
0%
M: 7 C: 0
0%
M: 5 C: 0
0%
M: 24 C: 0
0%
M: 1 C: 0
0%
static {...}
M: 4 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
synch(KapuaId, long)
M: 195 C: 0
0%
M: 10 C: 0
0%
M: 6 C: 0
0%
M: 41 C: 0
0%
M: 1 C: 0
0%
updateMessageMappings(KapuaId, long, Map)
M: 79 C: 0
0%
M: 8 C: 0
0%
M: 5 C: 0
0%
M: 19 C: 0
0%
M: 1 C: 0
0%

Coverage

1: /*******************************************************************************
2: * Copyright (c) 2016, 2022 Eurotech and/or its affiliates and others
3: *
4: * This program and the accompanying materials are made
5: * available under the terms of the Eclipse Public License 2.0
6: * which is available at https://www.eclipse.org/legal/epl-2.0/
7: *
8: * SPDX-License-Identifier: EPL-2.0
9: *
10: * Contributors:
11: * Eurotech - initial API and implementation
12: *******************************************************************************/
13: package org.eclipse.kapua.service.datastore.internal.schema;
14:
15: import com.fasterxml.jackson.databind.node.ObjectNode;
16: import org.eclipse.kapua.KapuaException;
17: import org.eclipse.kapua.model.id.KapuaId;
18: import org.eclipse.kapua.service.datastore.internal.DatastoreCacheManager;
19: import org.eclipse.kapua.service.datastore.internal.client.DatastoreClientFactory;
20: import org.eclipse.kapua.service.datastore.internal.mediator.DatastoreUtils;
21: import org.eclipse.kapua.service.datastore.internal.mediator.Metric;
22: import org.eclipse.kapua.service.datastore.internal.setting.DatastoreSettings;
23: import org.eclipse.kapua.service.datastore.internal.setting.DatastoreSettingsKey;
24: import org.eclipse.kapua.service.elasticsearch.client.ElasticsearchClient;
25: import org.eclipse.kapua.service.elasticsearch.client.SchemaKeys;
26: import org.eclipse.kapua.service.elasticsearch.client.exception.ClientErrorCodes;
27: import org.eclipse.kapua.service.elasticsearch.client.exception.ClientException;
28: import org.eclipse.kapua.service.elasticsearch.client.exception.DatamodelMappingException;
29: import org.eclipse.kapua.service.elasticsearch.client.model.IndexRequest;
30: import org.eclipse.kapua.service.elasticsearch.client.model.IndexResponse;
31: import org.eclipse.kapua.service.elasticsearch.client.model.TypeDescriptor;
32: import org.eclipse.kapua.service.storable.exception.MappingException;
33: import org.eclipse.kapua.service.storable.model.utils.KeyValueEntry;
34: import org.eclipse.kapua.service.storable.model.utils.MappingUtils;
35:
36: import org.slf4j.Logger;
37: import org.slf4j.LoggerFactory;
38:
39: import java.util.HashMap;
40: import java.util.Map;
41: import java.util.Map.Entry;
42:
43: /**
44: * Datastore schema creation/update
45: *
46: * @since 1.0.0
47: */
48: public class Schema {
49:
50: private static final Logger LOG = LoggerFactory.getLogger(Schema.class);
51:
52: /**
53: * Synchronize metadata
54: *
55: * @param scopeId
56: * @param time
57: * @return
58: * @throws ClientException
59: * @since 1.0.0
60: */
61: public Metadata synch(KapuaId scopeId, long time) throws ClientException, MappingException {
62: String dataIndexName;
63: try {
64: String indexingWindowOption = DatastoreSettings.getInstance().getString(DatastoreSettingsKey.INDEXING_WINDOW_OPTION, DatastoreUtils.INDEXING_WINDOW_OPTION_WEEK);
65: dataIndexName = DatastoreUtils.getDataIndexName(scopeId, time, indexingWindowOption);
66: } catch (KapuaException kaex) {
67: throw new ClientException(ClientErrorCodes.INTERNAL_ERROR, kaex, "Error while generating index name");
68: }
69:
70: Metadata currentMetadata = DatastoreCacheManager.getInstance().getMetadataCache().get(dataIndexName);
71:• if (currentMetadata != null) {
72: return currentMetadata;
73: }
74:
75: LOG.debug("Before entering updating metadata");
76: synchronized (Schema.class) {
77: LOG.debug("Entered updating metadata");
78: ElasticsearchClient<?> elasticsearchClient = DatastoreClientFactory.getInstance().getElasticsearchClient();
79: // Check existence of the data index
80: IndexResponse dataIndexExistsResponse = elasticsearchClient.isIndexExists(new IndexRequest(dataIndexName));
81:• if (!dataIndexExistsResponse.isIndexExists()) {
82: elasticsearchClient.createIndex(dataIndexName, getMappingSchema(dataIndexName));
83: LOG.info("Data index created: {}", dataIndexName);
84: }
85:
86: boolean enableSourceField = true;
87:
88: elasticsearchClient.putMapping(new TypeDescriptor(dataIndexName, MessageSchema.MESSAGE_TYPE_NAME), MessageSchema.getMesageTypeSchema(enableSourceField));
89:
90: // Check existence of the kapua internal indexes
91: String channelRegistryIndexName = DatastoreUtils.getChannelIndexName(scopeId);
92: IndexResponse channelRegistryIndexExistsResponse = elasticsearchClient.isIndexExists(new IndexRequest(channelRegistryIndexName));
93:• if (!channelRegistryIndexExistsResponse.isIndexExists()) {
94: elasticsearchClient.createIndex(channelRegistryIndexName, getMappingSchema(channelRegistryIndexName));
95: LOG.info("Channel Metadata index created: {}", channelRegistryIndexExistsResponse);
96:
97: elasticsearchClient.putMapping(new TypeDescriptor(channelRegistryIndexName, ChannelInfoSchema.CHANNEL_TYPE_NAME), ChannelInfoSchema.getChannelTypeSchema(enableSourceField));
98: }
99:
100: String clientRegistryIndexName = DatastoreUtils.getClientIndexName(scopeId);
101: IndexResponse clientRegistryIndexExistsResponse = elasticsearchClient.isIndexExists(new IndexRequest(clientRegistryIndexName));
102:• if (!clientRegistryIndexExistsResponse.isIndexExists()) {
103: elasticsearchClient.createIndex(clientRegistryIndexName, getMappingSchema(clientRegistryIndexName));
104: LOG.info("Client Metadata index created: {}", clientRegistryIndexExistsResponse);
105:
106: elasticsearchClient.putMapping(new TypeDescriptor(clientRegistryIndexName, ClientInfoSchema.CLIENT_TYPE_NAME), ClientInfoSchema.getClientTypeSchema(enableSourceField));
107: }
108:
109: String metricRegistryIndexName = DatastoreUtils.getMetricIndexName(scopeId);
110: IndexResponse metricRegistryIndexExistsResponse = elasticsearchClient.isIndexExists(new IndexRequest(metricRegistryIndexName));
111:• if (!metricRegistryIndexExistsResponse.isIndexExists()) {
112: elasticsearchClient.createIndex(metricRegistryIndexName, getMappingSchema(metricRegistryIndexName));
113: LOG.info("Metric Metadata index created: {}", metricRegistryIndexExistsResponse);
114:
115: elasticsearchClient.putMapping(new TypeDescriptor(metricRegistryIndexName, MetricInfoSchema.METRIC_TYPE_NAME), MetricInfoSchema.getMetricTypeSchema(enableSourceField));
116: }
117:
118: currentMetadata = new Metadata(dataIndexName, channelRegistryIndexName, clientRegistryIndexName, metricRegistryIndexName);
119: LOG.debug("Leaving updating metadata");
120: }
121:
122: // Current metadata can only increase the custom mappings
123: // other fields does not change within the same account id
124: // and custom mappings are not and must not be exposed to
125: // outside this class to preserve thread safetyness
126: DatastoreCacheManager.getInstance().getMetadataCache().put(dataIndexName, currentMetadata);
127:
128: return currentMetadata;
129: }
130:
131: /**
132: * Update metric mappings
133: *
134: * @param scopeId
135: * @param time
136: * @param metrics
137: * @throws ClientException
138: * @since 1.0.0
139: */
140: public void updateMessageMappings(KapuaId scopeId, long time, Map<String, Metric> metrics)
141: throws ClientException, MappingException {
142:• if (metrics == null || metrics.size() == 0) {
143: return;
144: }
145: String newIndex;
146: try {
147: String indexingWindowOption = DatastoreSettings.getInstance().getString(DatastoreSettingsKey.INDEXING_WINDOW_OPTION, DatastoreUtils.INDEXING_WINDOW_OPTION_WEEK);
148: newIndex = DatastoreUtils.getDataIndexName(scopeId, time, indexingWindowOption);
149: } catch (KapuaException kaex) {
150: throw new ClientException(ClientErrorCodes.INTERNAL_ERROR, kaex, "Error while generating index name");
151: }
152: Metadata currentMetadata = DatastoreCacheManager.getInstance().getMetadataCache().get(newIndex);
153:
154: ObjectNode metricsMapping = null;
155: Map<String, Metric> diffs = null;
156:
157: synchronized (Schema.class) {
158: // Update mappings only if a metric is new (not in cache)
159: diffs = getMessageMappingDiffs(currentMetadata, metrics);
160:• if (diffs == null || diffs.isEmpty()) {
161: return;
162: }
163: metricsMapping = getNewMessageMappingsBuilder(diffs);
164: }
165:
166: LOG.trace("Sending dynamic message mappings: {}", metricsMapping);
167: DatastoreClientFactory.getInstance().getElasticsearchClient().putMapping(new TypeDescriptor(currentMetadata.getDataIndexName(), MessageSchema.MESSAGE_TYPE_NAME), metricsMapping);
168: }
169:
170: /**
171: * @param esMetrics
172: * @return
173: * @throws DatamodelMappingException
174: * @throws KapuaException
175: * @since 1.0.0
176: */
177: private ObjectNode getNewMessageMappingsBuilder(Map<String, Metric> esMetrics) throws MappingException {
178:• if (esMetrics == null) {
179: return null;
180: }
181: // metrics mapping container (to be added to message mapping)
182: ObjectNode typeNode = MappingUtils.newObjectNode(); // root
183: ObjectNode typePropertiesNode = MappingUtils.newObjectNode(); // properties
184: ObjectNode metricsNode = MappingUtils.newObjectNode(); // metrics
185: ObjectNode metricsPropertiesNode = MappingUtils.newObjectNode(); // properties (metric properties)
186: typeNode.set(SchemaKeys.FIELD_NAME_PROPERTIES, typePropertiesNode);
187: typePropertiesNode.set(SchemaKeys.FIELD_NAME_METRICS, metricsNode);
188: metricsNode.set(SchemaKeys.FIELD_NAME_PROPERTIES, metricsPropertiesNode);
189:
190: // metrics mapping
191: ObjectNode metricMapping;
192:• for (Entry<String, Metric> esMetric : esMetrics.entrySet()) {
193: Metric metric = esMetric.getValue();
194: metricMapping = MappingUtils.newObjectNode(new KeyValueEntry[]{new KeyValueEntry(SchemaKeys.KEY_DYNAMIC, SchemaKeys.VALUE_TRUE)});
195:
196: ObjectNode metricMappingPropertiesNode = MappingUtils.newObjectNode(); // properties (inside metric name)
197: ObjectNode valueMappingNode;
198:
199:• switch (metric.getType()) {
200: case SchemaKeys.TYPE_STRING:
201: valueMappingNode = MappingUtils.newObjectNode(new KeyValueEntry[]{new KeyValueEntry(SchemaKeys.KEY_TYPE, SchemaKeys.TYPE_KEYWORD), new KeyValueEntry(SchemaKeys.KEY_INDEX, SchemaKeys.VALUE_TRUE)});
202: break;
203: case SchemaKeys.TYPE_DATE:
204: valueMappingNode = MappingUtils.newObjectNode(
205: new KeyValueEntry[]{new KeyValueEntry(SchemaKeys.KEY_TYPE, SchemaKeys.TYPE_DATE), new KeyValueEntry(SchemaKeys.KEY_FORMAT, DatastoreUtils.DATASTORE_DATE_FORMAT)});
206: break;
207: default:
208: valueMappingNode = MappingUtils.newObjectNode(new KeyValueEntry[]{new KeyValueEntry(SchemaKeys.KEY_TYPE, metric.getType())});
209: break;
210: }
211:
212: metricMappingPropertiesNode.set(DatastoreUtils.getClientMetricFromAcronym(metric.getType()), valueMappingNode);
213: metricMapping.set(SchemaKeys.FIELD_NAME_PROPERTIES, metricMappingPropertiesNode);
214: metricsPropertiesNode.set(metric.getName(), metricMapping);
215: }
216: return typeNode;
217: }
218:
219: /**
220: * @param currentMetadata
221: * @param esMetrics
222: * @return
223: * @since 1.0.0
224: */
225: private Map<String, Metric> getMessageMappingDiffs(Metadata currentMetadata, Map<String, Metric> esMetrics) {
226:• if (esMetrics == null || esMetrics.isEmpty()) {
227: return null;
228: }
229:
230: Map<String, Metric> diffs = null;
231:• for (Entry<String, Metric> esMetric : esMetrics.entrySet()) {
232:• if (!currentMetadata.getMessageMappingsCache().containsKey(esMetric.getKey())) {
233:• if (diffs == null) {
234: diffs = new HashMap<>(100);
235: }
236: currentMetadata.getMessageMappingsCache().put(esMetric.getKey(), esMetric.getValue());
237: diffs.put(esMetric.getKey(), esMetric.getValue());
238: }
239: }
240:
241: return diffs;
242: }
243:
244: /**
245: * @param idxName
246: * @return
247: * @throws MappingException
248: * @since 1.0.0
249: */
250: private ObjectNode getMappingSchema(String idxName) throws MappingException {
251: String idxRefreshInterval = String.format("%ss", DatastoreSettings.getInstance().getLong(DatastoreSettingsKey.INDEX_REFRESH_INTERVAL));
252: Integer idxShardNumber = DatastoreSettings.getInstance().getInt(DatastoreSettingsKey.INDEX_SHARD_NUMBER, 1);
253: Integer idxReplicaNumber = DatastoreSettings.getInstance().getInt(DatastoreSettingsKey.INDEX_REPLICA_NUMBER, 0);
254:
255: ObjectNode rootNode = MappingUtils.newObjectNode();
256: ObjectNode settingsNode = MappingUtils.newObjectNode();
257: ObjectNode refreshIntervalNode = MappingUtils.newObjectNode(new KeyValueEntry[]{
258: new KeyValueEntry(SchemaKeys.KEY_REFRESH_INTERVAL, idxRefreshInterval),
259: new KeyValueEntry(SchemaKeys.KEY_SHARD_NUMBER, idxShardNumber),
260: new KeyValueEntry(SchemaKeys.KEY_REPLICA_NUMBER, idxReplicaNumber)});
261: settingsNode.set(SchemaKeys.KEY_INDEX, refreshIntervalNode);
262: rootNode.set(SchemaKeys.KEY_SETTINGS, settingsNode);
263: LOG.info("Creating index for '{}' - refresh: '{}' - shards: '{}' replicas: '{}': ", idxName, idxRefreshInterval, idxShardNumber, idxReplicaNumber);
264: return rootNode;
265: }
266:
267: }