Skip to content

Package: Es7Migration

Es7Migration

nameinstructionbranchcomplexitylinemethod
Es7Migration(EsClientWrapper, EsClusterDescriptor, Map)
M: 12 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 5 C: 0
0%
M: 1 C: 0
0%
getNewDataMessageIndexName(String, String)
M: 12 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
getNewDataRegistryIndexName(String, String)
M: 9 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
isOldMessageIndexNaming(String, String, String)
M: 29 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 6 C: 0
0%
M: 1 C: 0
0%
lambda$migrateAccountIndices$0(Map.Entry)
M: 249 C: 0
0%
M: 7 C: 0
0%
M: 5 C: 0
0%
M: 40 C: 0
0%
M: 1 C: 0
0%
migrateAccountIndices(String)
M: 19 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 5 C: 0
0%
M: 1 C: 0
0%
populateMessageIndices(String, String)
M: 155 C: 0
0%
M: 10 C: 0
0%
M: 6 C: 0
0%
M: 15 C: 0
0%
M: 1 C: 0
0%
static {...}
M: 5 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%

Coverage

1: /*******************************************************************************
2: * Copyright (c) 2021, 2022 Eurotech and/or its affiliates and others
3: *
4: * This program and the accompanying materials are made
5: * available under the terms of the Eclipse Public License 2.0
6: * which is available at https://www.eclipse.org/legal/epl-2.0/
7: *
8: * SPDX-License-Identifier: EPL-2.0
9: *
10: * Contributors:
11: * Eurotech - initial API and implementation
12: *******************************************************************************/
13: package org.eclipse.kapua.extras.esmigrator;
14:
15: import java.io.IOException;
16: import java.util.HashMap;
17: import java.util.Iterator;
18: import java.util.Map;
19: import java.util.regex.Pattern;
20:
21: import com.fasterxml.jackson.databind.JsonNode;
22: import org.apache.commons.lang3.StringUtils;
23: import org.elasticsearch.ElasticsearchException;
24: import org.slf4j.Logger;
25: import org.slf4j.LoggerFactory;
26:
27: public class Es7Migration {
28:
29: private static final Logger LOGGER = LoggerFactory.getLogger(Es7Migration.class.getName());
30:
31: private static final String DOC = "_doc";
32:
33: private final EsClientWrapper client;
34: private final EsClusterDescriptor esClusterDescriptor;
35: private final Map<String, String> migrationReport;
36:
37: private static final String CHANNEL_SUFFIX = "-channel";
38: private static final String CLIENT_SUFFIX = "-client";
39: private static final String METRIC_SUFFIX = "-metric";
40:
41: public Es7Migration(EsClientWrapper client, EsClusterDescriptor esClusterDescriptor, Map<String, String> migrationReport) {
42: this.client = client;
43: this.esClusterDescriptor = esClusterDescriptor;
44: this.migrationReport = migrationReport;
45: }
46:
47: public void migrateAccountIndices(String accountId) throws IOException, ElasticsearchException {
48: String indicesPrefix = esClusterDescriptor.getIndicesPrefix();
49: LOGGER.debug("Migrating indices for account id {}", accountId);
50: Map<String, String> indicesToProcess = populateMessageIndices(accountId, indicesPrefix);
51:
52: indicesToProcess.entrySet().forEach((index -> {
53: String srcIndex = index.getKey();
54: String destIndex = index.getValue();
55: try {
56:• if (client.indicesExists(index.getKey())) {
57: LOGGER.debug("Found index {}, renaming to {}", srcIndex, destIndex);
58: Map<String, Object> mappingMetaData = client.getMappings(srcIndex).mappings().get(srcIndex).getSourceAsMap();
59: client.createIndex(destIndex, mappingMetaData);
60: ReindexTaskResult reindexTaskResult = client.reindexWithTask(srcIndex, DOC, destIndex, DOC);
61:• if (!reindexTaskResult.isSuccessful()) {
62: migrationReport.put(srcIndex, String.format("Error while renaming Index %s to %s: %s", srcIndex, destIndex, reindexTaskResult.getMessage()));
63: return;
64: }
65: LOGGER.debug("Reindex complete. Deleting old index");
66:• switch(esClusterDescriptor.getAction()) {
67: case DELETE:
68: try {
69: client.deleteIndex(srcIndex);
70: LOGGER.debug("Old index {} deleted", srcIndex);
71: migrationReport.put(srcIndex, String.format("Index %s successfully renamed to %s", srcIndex, destIndex));
72: } catch (IOException | ElasticsearchException exception) {
73: String message = MigratorUtils.getExceptionMessageOrName(exception);
74: LOGGER.warn("Unable to delete old index {}: {}", srcIndex, message);
75: migrationReport.put(srcIndex, String.format("Index %s successfully renamed to %s, but old index could not be deleted", srcIndex, destIndex));
76: }
77: break;
78: case CLOSE:
79: try {
80: client.closeIndex(srcIndex);
81: LOGGER.debug("Old index {} closed", srcIndex);
82: migrationReport.put(srcIndex, String.format("Index %s successfully renamed to %s", srcIndex, destIndex));
83: } catch (IOException | ElasticsearchException exception) {
84: String message = MigratorUtils.getExceptionMessageOrName(exception);
85: LOGGER.warn("Unable to close old index {}: {}", srcIndex, message);
86: migrationReport.put(srcIndex, String.format("Index %s successfully renamed to %s, but old index could not be closed", srcIndex, destIndex));
87: }
88: break;
89: default:
90: LOGGER.debug("No action on index {} after completing migration", index);
91: break;
92: }
93: } else {
94: migrationReport.put(srcIndex, String.format("Index %s not existing. Skipping...", srcIndex));
95: LOGGER.warn("Index {} not existing. Skipping...", srcIndex);
96: }
97: } catch (IOException | ElasticsearchException exception) {
98: String message = MigratorUtils.getExceptionMessageOrName(exception);
99: LOGGER.error("Error migrating index {}: {}", srcIndex, message);
100: migrationReport.put(srcIndex, String.format("Error while renaming Index %s to %s: %s", srcIndex, destIndex, message));
101: }
102: }));
103: }
104:
105: private Map<String, String> populateMessageIndices(String accountId, String indicesPrefix) throws IOException, ElasticsearchException {
106: Map<String, String> indicesToProcess = new HashMap<>();
107:• String oldDataIndexRadix = (StringUtils.isNotBlank(indicesPrefix) ? indicesPrefix + "-" : "") + accountId;
108: JsonNode indices = client.getIndices(oldDataIndexRadix + "-*");
109: Iterator<String> indicesIterator = indices.fieldNames();
110:• while (indicesIterator.hasNext()) {
111: String indexName = indicesIterator.next();
112:• if (isOldMessageIndexNaming(accountId, indexName, indicesPrefix)) {
113: indicesToProcess.put(indexName, getNewDataMessageIndexName(indexName, oldDataIndexRadix));
114: }
115: }
116:• String oldRegistryIndexRadix = (StringUtils.isNotBlank(indicesPrefix) ? indicesPrefix + "-" : "") + "." + accountId;
117:• String newRegistryIndexRadix = (StringUtils.isNotBlank(indicesPrefix) ? indicesPrefix + "-" : "") + accountId + "-data";
118: indicesToProcess.put(oldRegistryIndexRadix + CHANNEL_SUFFIX, getNewDataRegistryIndexName(newRegistryIndexRadix, CHANNEL_SUFFIX));
119: indicesToProcess.put(oldRegistryIndexRadix + CLIENT_SUFFIX, getNewDataRegistryIndexName(newRegistryIndexRadix, CLIENT_SUFFIX));
120: indicesToProcess.put(oldRegistryIndexRadix + METRIC_SUFFIX, getNewDataRegistryIndexName(newRegistryIndexRadix, METRIC_SUFFIX));
121: return indicesToProcess;
122: }
123:
124: boolean isOldMessageIndexNaming(String accountId, String indexName, String prefix) {
125: StringBuilder regexBuilder = new StringBuilder("^");
126:• if (StringUtils.isNotBlank(prefix)) {
127: regexBuilder.append(prefix).append("-");
128: }
129: regexBuilder.append(accountId).append("-\\d{4}-\\d{2}(-\\d{2}){0,2}");
130: Pattern pattern = Pattern.compile(regexBuilder.toString());
131: return pattern.matcher(indexName).matches();
132: }
133:
134: String getNewDataMessageIndexName(String indexName, String oldDataIndexRadix) {
135: return indexName.replaceFirst(oldDataIndexRadix, oldDataIndexRadix + "-data-message");
136: }
137:
138: String getNewDataRegistryIndexName(String newRegistryIndexRadix, String suffix) {
139: return newRegistryIndexRadix + suffix;
140: }
141:
142: }