Skip to content

Package: Migrator

Migrator

name | instruction | branch | complexity | line | method
Migrator()
M: 63 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 13 C: 0
0%
M: 1 C: 0
0%
doMigrate()
M: 92 C: 0
0%
M: 8 C: 0
0%
M: 5 C: 0
0%
M: 24 C: 0
0%
M: 1 C: 0
0%
gatherAccountIds()
M: 47 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 12 C: 0
0%
M: 1 C: 0
0%
printFinalReport(String)
M: 42 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 11 C: 0
0%
M: 1 C: 0
0%
static {...}
M: 5 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
writeReportToFile()
M: 77 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 15 C: 0
0%
M: 1 C: 0
0%

Coverage

1: /*******************************************************************************
2: * Copyright (c) 2020, 2022 Eurotech and/or its affiliates and others
3: *
4: * This program and the accompanying materials are made
5: * available under the terms of the Eclipse Public License 2.0
6: * which is available at https://www.eclipse.org/legal/epl-2.0/
7: *
8: * SPDX-License-Identifier: EPL-2.0
9: *
10: * Contributors:
11: * Eurotech - initial API and implementation
12: *******************************************************************************/
13: package org.eclipse.kapua.extras.esmigrator;
14:
15: import java.io.File;
16: import java.io.IOException;
17: import java.io.PrintWriter;
18: import java.sql.Connection;
19: import java.sql.DriverManager;
20: import java.sql.ResultSet;
21: import java.sql.Statement;
22: import java.util.Date;
23: import java.util.HashSet;
24: import java.util.Map;
25: import java.util.Set;
26: import java.util.TreeMap;
27:
28: import org.eclipse.kapua.extras.esmigrator.settings.EsMigratorSetting;
29: import org.eclipse.kapua.extras.esmigrator.settings.EsMigratorSettingKey;
30:
31: import org.elasticsearch.ElasticsearchException;
32: import org.elasticsearch.client.core.MainResponse;
33: import org.slf4j.Logger;
34: import org.slf4j.LoggerFactory;
35:
36: public class Migrator {
37:
38: private static final Logger LOGGER = LoggerFactory.getLogger(Migrator.class.getName());
39:
40: private static final int DEFAULT_ELASTICSEARCH_SOCKET_TIMEOUT = 30000;
41: private static final int DEFAULT_ELASTICSEARCH_BATCH_SIZE = 100;
42: private static final int DEFAULT_ELASTICSEARCH_TASK_POLLING_INTERVAL = 30000;
43: private static final String DEFAULT_JDBC_CONNECTION_STRING = "jdbc:h2:tcp://localhost:3306/kapuadb;schema=kapuadb";
44: private static final String DEFAULT_JDBC_USERNAME = "kapua";
45: private static final String DEFAULT_JDBC_PASSWORD = "kapua";
46:
47: private final EsClusterDescriptor esClusterDescriptor;
48: private final boolean reportToFile;
49: private final String jdbcConnectionString;
50: private final String jdbcUsername;
51: private final String jdbcPassword;
52: private final int esSocketTimeout;
53: private final boolean esIgnoreSslCertificate;
54: private final int batchSize;
55: private final int taskPollingInterval;
56: private final Map<String, String> migrationReport = new TreeMap<>();
57:
58: public Migrator() {
59: EsMigratorSetting esMigratorSetting = EsMigratorSetting.getInstance();
60: esClusterDescriptor = new EsClusterDescriptor();
61: reportToFile = esMigratorSetting.getBoolean(EsMigratorSettingKey.MIGRATOR_REPORT_TO_FILE, false);
62: jdbcConnectionString = esMigratorSetting.getString(EsMigratorSettingKey.MIGRATOR_JDBC_CONNECTION_STRING, DEFAULT_JDBC_CONNECTION_STRING);
63: jdbcUsername = esMigratorSetting.getString(EsMigratorSettingKey.MIGRATOR_JDBC_USERNAME, DEFAULT_JDBC_USERNAME);
64: jdbcPassword = esMigratorSetting.getString(EsMigratorSettingKey.MIGRATOR_JDBC_PASSWORD, DEFAULT_JDBC_PASSWORD);
65: esSocketTimeout = esMigratorSetting.getInt(EsMigratorSettingKey.ELASTICSEARCH_SOCKET_TIMEOUT, DEFAULT_ELASTICSEARCH_SOCKET_TIMEOUT);
66: batchSize = esMigratorSetting.getInt(EsMigratorSettingKey.ELASTICSEARCH_BATCH_SIZE, DEFAULT_ELASTICSEARCH_BATCH_SIZE);
67: taskPollingInterval = esMigratorSetting.getInt(EsMigratorSettingKey.ELASTICSEARCH_TASK_POLLING_INTERVAL, DEFAULT_ELASTICSEARCH_TASK_POLLING_INTERVAL);
68: esIgnoreSslCertificate = esMigratorSetting.getBoolean(EsMigratorSettingKey.ELASTICSEARCH_CLUSTER_SSL_IGNORE_CERTIFICATE, false);
69: }
70:
71: void doMigrate() {
72: Set<String> accountIds = gatherAccountIds();
73:• if (accountIds == null) {
74: LOGGER.error("Unable to gather Account IDs from the DB. Migration failed.");
75: return;
76: }
77: try (EsClientWrapper client = new EsClientWrapper(esClusterDescriptor, esSocketTimeout, batchSize, taskPollingInterval, esIgnoreSslCertificate)) {
78: // Determine ES Version
79: MainResponse mainResponse = client.info();
80: LOGGER.debug("Elasticsearch Version {}", mainResponse.getVersion());
81: String version = mainResponse.getVersion().getNumber();
82:• if (!version.startsWith("7")) {
83: LOGGER.error("This version of the Migration Tool MUST run against an Elasticsearch 7 cluster. Version found: {}", version);
84: } else {
85: Es7Migration migration = new Es7Migration(client, esClusterDescriptor, migrationReport);
86:• for (String accountId : accountIds) {
87: try {
88: migration.migrateAccountIndices(accountId);
89: } catch (IOException | ElasticsearchException exception) {
90: LOGGER.error("Unmanaged Elasticsearch exception in migration steps: {}", MigratorUtils.getExceptionMessageOrName(exception));
91: }
92: }
93: }
94: } catch (IOException | ElasticsearchException exception) {
95: LOGGER.error("Unmanaged Elasticsearch exception in pre-migration steps: {}", MigratorUtils.getExceptionMessageOrName(exception));
96: }
97: printFinalReport("Migration Report");
98:• if (reportToFile) {
99: writeReportToFile();
100: }
101: }
102:
103:
104: private Set<String> gatherAccountIds() {
105: Set<String> accountIds = new HashSet<>();
106: try (Connection connection = DriverManager.getConnection(jdbcConnectionString, jdbcUsername, jdbcPassword)) {
107: try (Statement statement = connection.createStatement()) {
108: String query = "SELECT DISTINCT(id) FROM ACT_ACCOUNT;";
109: ResultSet resultSet = statement.executeQuery(query);
110:• while (resultSet.next()) {
111: accountIds.add(resultSet.getBigDecimal(1).toPlainString());
112: }
113: }
114: } catch (Exception exception) {
115: LOGGER.warn("Unable to gather Account IDs from the DB: {}", exception.getMessage());
116: return null;
117: }
118: return accountIds;
119: }
120:
121: private void printFinalReport(String title) {
122: String separator = "======================";
123: LOGGER.info(separator);
124: LOGGER.info(title);
125: LOGGER.info(separator);
126:• for (Map.Entry<String, String> resultsEntry : migrationReport.entrySet()) {
127: LOGGER.info("index: {} - result: {}", resultsEntry.getKey(), resultsEntry.getValue());
128: }
129: LOGGER.info(separator);
130: LOGGER.info("{} END", title);
131: LOGGER.info(separator);
132: }
133:
134: private void writeReportToFile() {
135: File reportDir;
136: try {
137: reportDir = new File("reports");
138: reportDir.mkdir();
139: } catch (SecurityException securityException) {
140: LOGGER.warn("Unable to create report file", securityException);
141: return;
142: }
143: File reportFile = new File(reportDir, new Date().toString().replace(" ", "_") + ".txt");
144: try (PrintWriter printWriter = new PrintWriter(reportFile)) {
145:• for (Map.Entry<String, String> resultsEntry : migrationReport.entrySet()) {
146: printWriter.println(String.format("index: %s - result: %s", resultsEntry.getKey(), resultsEntry.getValue()));
147: }
148: } catch (IOException ioException) {
149: LOGGER.warn("Unable to write report file", ioException);
150: }
151: }
152:
153: }