Skip to content

Package: EsClientWrapper

EsClientWrapper

nameinstructionbranchcomplexitylinemethod
EsClientWrapper(EsClusterDescriptor, int, int, int, boolean)
M: 43 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 14 C: 0
0%
M: 1 C: 0
0%
close()
M: 4 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
closeIndex(String)
M: 18 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 3 C: 0
0%
M: 1 C: 0
0%
count(String)
M: 17 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
createIndex(String, Map)
M: 8 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
createIndex(String, String)
M: 78 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 17 C: 0
0%
M: 1 C: 0
0%
deleteIndex(String)
M: 17 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 4 C: 0
0%
M: 1 C: 0
0%
getIndices(String)
M: 29 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 3 C: 0
0%
M: 1 C: 0
0%
getMappings(String[])
M: 13 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
getTaskInfo(String)
M: 27 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 3 C: 0
0%
M: 1 C: 0
0%
indicesExists(String[])
M: 6 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
info()
M: 5 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
isIndexOpen(String)
M: 38 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 4 C: 0
0%
M: 1 C: 0
0%
lambda$indicesExists$5(String)
M: 24 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 4 C: 0
0%
M: 1 C: 0
0%
lambda$new$0(String, String)
M: 16 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 3 C: 0
0%
M: 1 C: 0
0%
lambda$new$1(int)
M: 3 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
lambda$new$3(EsClusterDescriptor, boolean, HttpAsyncClientBuilder)
M: 31 C: 0
0%
M: 4 C: 0
0%
M: 3 C: 0
0%
M: 8 C: 0
0%
M: 1 C: 0
0%
lambda$new$4(int, RequestConfig.Builder)
M: 4 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
lambda$null$2(String, SSLSession)
M: 2 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
reindexWithTask(String, String, String, String)
M: 267 C: 0
0%
M: 8 C: 0
0%
M: 5 C: 0
0%
M: 47 C: 0
0%
M: 1 C: 0
0%
static {...}
M: 5 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
submitReindexTask(String, String, String, String)
M: 8 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
submitReindexTask(String, String, String, String, Script)
M: 41 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 10 C: 0
0%
M: 1 C: 0
0%

Coverage

1: /*******************************************************************************
2: * Copyright (c) 2021, 2022 Eurotech and/or its affiliates and others
3: *
4: * This program and the accompanying materials are made
5: * available under the terms of the Eclipse Public License 2.0
6: * which is available at https://www.eclipse.org/legal/epl-2.0/
7: *
8: * SPDX-License-Identifier: EPL-2.0
9: *
10: * Contributors:
11: * Eurotech - initial API and implementation
12: *******************************************************************************/
13: package org.eclipse.kapua.extras.esmigrator;
14:
15: import java.io.IOException;
16: import java.util.Arrays;
17: import java.util.Map;
18:
19: import com.fasterxml.jackson.databind.JsonNode;
20: import com.fasterxml.jackson.databind.node.ArrayNode;
21: import org.apache.commons.lang3.StringUtils;
22: import org.apache.http.HttpHost;
23: import org.apache.http.auth.AuthScope;
24: import org.apache.http.auth.UsernamePasswordCredentials;
25: import org.apache.http.client.CredentialsProvider;
26: import org.apache.http.client.methods.HttpGet;
27: import org.apache.http.impl.client.BasicCredentialsProvider;
28: import org.apache.http.util.EntityUtils;
29: import org.elasticsearch.ElasticsearchException;
30: import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
31: import org.elasticsearch.client.Request;
32: import org.elasticsearch.client.RequestOptions;
33: import org.elasticsearch.client.Response;
34: import org.elasticsearch.client.RestClient;
35: import org.elasticsearch.client.RestClientBuilder;
36: import org.elasticsearch.client.RestHighLevelClient;
37: import org.elasticsearch.client.core.CountRequest;
38: import org.elasticsearch.client.core.MainResponse;
39: import org.elasticsearch.client.indices.CloseIndexRequest;
40: import org.elasticsearch.client.indices.CreateIndexRequest;
41: import org.elasticsearch.client.indices.GetIndexRequest;
42: import org.elasticsearch.client.indices.GetMappingsRequest;
43: import org.elasticsearch.client.indices.GetMappingsResponse;
44: import org.elasticsearch.common.settings.Settings;
45: import org.elasticsearch.common.xcontent.XContentType;
46: import org.elasticsearch.index.reindex.ReindexRequest;
47: import org.elasticsearch.script.Script;
48: import org.slf4j.Logger;
49: import org.slf4j.LoggerFactory;
50:
51: public class EsClientWrapper implements AutoCloseable {
52:
53: private final RestHighLevelClient client;
54: private final int batchSize;
55: private final int taskPollingInterval;
56:
57: private static final Logger LOGGER = LoggerFactory.getLogger(EsClientWrapper.class.getName());
58:
59: private static final String DELETING_INDEX = "Deleting index {}";
60: private static final String REINDEXING = "Reindexing {} to {}";
61:
62: private static final String INDEX_NUMBER_OF_SHARDS = "index.number_of_shards";
63: private static final String INDEX_NUMBER_OF_REPLICAS = "index.number_of_replicas";
64: private static final String INDEX_REFRESH_INTERVAL = "index.refresh_interval";
65:
66: private final EsClusterDescriptor esClusterDescriptor;
67:
68: public EsClientWrapper(EsClusterDescriptor esClusterDescriptor, int esSocketTimeout, int batchSize, int taskPollingInterval, boolean esIgnoreSslCertificate) {
69: this.esClusterDescriptor = esClusterDescriptor;
70:• String scheme = esClusterDescriptor.isEsClusterSsl() ? "https://" : "http://";
71: RestClientBuilder restClientBuilder = RestClient
72: .builder(esClusterDescriptor.getEsClusterNodes()
73: .stream()
74: .map(clusterNodeAddress -> {
75: String clusterNodeFullAddress = scheme + clusterNodeAddress;
76: LOGGER.debug("{} added to nodes list", clusterNodeFullAddress);
77: return HttpHost.create(clusterNodeFullAddress);
78: })
79: .toArray(HttpHost[]::new))
80: .setHttpClientConfigCallback(builder -> {
81:• if (StringUtils.isNotBlank(esClusterDescriptor.getUsername())) {
82: CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
83: credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(esClusterDescriptor.getUsername(),
84: esClusterDescriptor.getPassword()));
85: builder.disableAuthCaching().setDefaultCredentialsProvider(credentialsProvider);
86: }
87:• if (esIgnoreSslCertificate) {
88: builder.setSSLHostnameVerifier((hostname, session) -> true);
89: }
90: return builder;
91: })
92: .setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder.setSocketTimeout(esSocketTimeout));
93:
94: client = new RestHighLevelClient(restClientBuilder);
95: this.batchSize = batchSize;
96: this.taskPollingInterval = taskPollingInterval;
97: }
98:
99: @Override
100: public void close() throws IOException {
101: client.close();
102: }
103:
104: public MainResponse info() throws IOException {
105: return client.info(RequestOptions.DEFAULT);
106: }
107:
108: public boolean indicesExists(String... indicesNames) throws IOException, ElasticsearchException {
109: return Arrays.stream(indicesNames).anyMatch(index -> {
110: try {
111: return client.indices().exists(new GetIndexRequest(index), RequestOptions.DEFAULT);
112: } catch (IOException | ElasticsearchException exception) {
113: LOGGER.warn("Error while checking index {} existence: {}", index, MigratorUtils.getExceptionMessageOrName(exception));
114: return false;
115: }
116: });
117: }
118:
119: public JsonNode getIndices(String indexQuery) throws IOException, ElasticsearchException {
120: Request indicesRequest = new Request(HttpGet.METHOD_NAME, "/" + indexQuery + "?format=json&include_type_name=true");
121: Response indicesResponse = client.getLowLevelClient().performRequest(indicesRequest);
122: return MigratorUtils.getObjectMapper().readValue(EntityUtils.toString(indicesResponse.getEntity()), JsonNode.class);
123: }
124:
125: public void createIndex(String index, Map<String, Object> sourceMap) throws IOException, ElasticsearchException {
126: createIndex(index, MigratorUtils.getObjectMapper().writer().writeValueAsString(sourceMap));
127: }
128:
129: public void createIndex(String index, String mapping) throws IOException, ElasticsearchException {
130: int indexesShardNumber = esClusterDescriptor.getIndicesShardNumber();
131: int indexesReplicaNumber = esClusterDescriptor.getIndicesReplicaNumber();
132: String indexesRefreshInterval = esClusterDescriptor.getIndicesRefreshInterval();
133: Settings settings = Settings.builder()
134: .put(INDEX_NUMBER_OF_SHARDS, indexesShardNumber)
135: .put(INDEX_NUMBER_OF_REPLICAS, indexesReplicaNumber)
136: .put(INDEX_REFRESH_INTERVAL, indexesRefreshInterval)
137: .build();
138: CreateIndexRequest firstCreateIndexRequest = new CreateIndexRequest(index)
139: .settings(settings)
140: .mapping(mapping, XContentType.JSON);
141: LOGGER.debug("Creating index {} - shards: '{}' - replicas: '{}' - refresh: '{}'", index, indexesShardNumber, indexesReplicaNumber, indexesRefreshInterval);
142: String createIndexRequestBody = firstCreateIndexRequest.mappings().utf8ToString();
143: LOGGER.debug("Creating index {}", index);
144: LOGGER.trace("Creating index {}, body {}", index, createIndexRequestBody);
145:
146: client.indices().create(firstCreateIndexRequest, RequestOptions.DEFAULT);
147: }
148:
149: public void closeIndex(String index) throws IOException, ElasticsearchException {
150: CloseIndexRequest request = new CloseIndexRequest(index);
151: client.indices().close(request, RequestOptions.DEFAULT);
152: }
153:
154: public void deleteIndex(String index) throws IOException, ElasticsearchException {
155: DeleteIndexRequest request = new DeleteIndexRequest(index);
156: LOGGER.debug(DELETING_INDEX, index);
157: client.indices().delete(request, RequestOptions.DEFAULT);
158: }
159:
160: public boolean isIndexOpen(String indexName) throws IOException, ElasticsearchException {
161: Request catIndicesRequest = new Request(HttpGet.METHOD_NAME, "/_cat/indices/" + indexName + "?format=json");
162: Response catIndicesResponse = client.getLowLevelClient().performRequest(catIndicesRequest);
163: ArrayNode catIndicesArray = MigratorUtils.getObjectMapper().readValue(EntityUtils.toString(catIndicesResponse.getEntity()), ArrayNode.class);
164: return catIndicesArray.get(0).get("status").asText().equals("open");
165: }
166:
167: public long count(String index) throws IOException, ElasticsearchException {
168: CountRequest countRequest = new CountRequest(index);
169: return client.count(countRequest, RequestOptions.DEFAULT).getCount();
170: }
171:
172: public ReindexTaskResult reindexWithTask(String sourceIndex, String sourceDocType, String destIndex, String destDocType) {
173: LOGGER.debug(REINDEXING, sourceIndex, destIndex);
174: String taskId;
175: try {
176: taskId = submitReindexTask(sourceIndex, sourceDocType, destIndex, destDocType);
177: } catch (IOException | ElasticsearchException exception) {
178: String message = String.format("Error submitting reindex task from index %s to index %s: %s", sourceIndex, destIndex, MigratorUtils.getExceptionMessageOrName(exception));
179: LOGGER.warn(message);
180: try {
181: deleteIndex(destIndex);
182: } catch (IOException | ElasticsearchException innerExcetpion) {
183: LOGGER.warn("Error deleting index {}: {}", destIndex, MigratorUtils.getExceptionMessageOrName(exception));
184: }
185: return new ReindexTaskResult(false, message);
186: }
187:
188: LOGGER.debug("Reindex task from index {} to index {} submitted, task id: {}", sourceIndex, destIndex, taskId);
189:
190: boolean reindexComplete = false;
191:• while (!reindexComplete) {
192: try {
193: JsonNode getTaskResponseJsonNode = getTaskInfo(taskId);
194:• if (getTaskResponseJsonNode.get("completed").asBoolean()) {
195: String message = String.format("Reindex task %s completed, status: %n%s", taskId, getTaskResponseJsonNode.toPrettyString());
196: LOGGER.trace(message);
197: reindexComplete = true;
198: ArrayNode failuresNode = getTaskResponseJsonNode.get("response").withArray("failures");
199:• if (failuresNode.size() > 0) {
200: StringBuilder errorMessageStringBuilder = new StringBuilder(String.format("Error during reindex from index %s to index %s. Failures: ", sourceIndex, destIndex));
201:• for (JsonNode failureNode : failuresNode) {
202: JsonNode causeNode = failureNode.get("cause");
203: errorMessageStringBuilder.append(causeNode.get("type").asText()).append(": ").append(causeNode.get("reason").asText()).append("; ");
204: }
205: errorMessageStringBuilder.delete(errorMessageStringBuilder.lastIndexOf("; "), errorMessageStringBuilder.length());
206: String errorMessage = errorMessageStringBuilder.toString();
207: LOGGER.warn(errorMessage);
208: deleteIndex(destIndex);
209: return new ReindexTaskResult(false, message);
210: }
211: } else {
212: String message = String.format("Reindex task %s in progress, current status: %n%s", taskId, getTaskResponseJsonNode.toPrettyString());
213: LOGGER.trace(message);
214: Thread.sleep(taskPollingInterval);
215: }
216: } catch (InterruptedException interruptedException) {
217: String errorMessage = String.format("Error while waiting for Task %s to complete: %s", taskId, interruptedException.getMessage());
218: LOGGER.warn(errorMessage, errorMessage);
219: Thread.currentThread().interrupt();
220: return new ReindexTaskResult(false, errorMessage);
221: } catch (IOException | ElasticsearchException exception) {
222: String errorMessage = String.format("Error during reindex task %s management: %s", taskId, MigratorUtils.getExceptionMessageOrName(exception));
223: LOGGER.warn(errorMessage, errorMessage);
224: return new ReindexTaskResult(false, errorMessage);
225: }
226: }
227: LOGGER.debug("Reindex complete: {} on {}", sourceIndex, destIndex);
228: return new ReindexTaskResult(true, null);
229: }
230:
231: public String submitReindexTask(String sourceIndex, String sourceDocType, String destIndex, String destDocType) throws IOException, ElasticsearchException {
232: return submitReindexTask(sourceIndex, sourceDocType, destIndex, destDocType, null);
233: }
234:
235: public String submitReindexTask(String sourceIndex, String sourceDocType, String destIndex, String destDocType, Script script) throws IOException, ElasticsearchException {
236: ReindexRequest reindexRequest = new ReindexRequest()
237: .setSourceBatchSize(batchSize)
238: .setSourceIndices(sourceIndex)
239: .setSourceDocTypes(sourceDocType)
240: .setDestIndex(destIndex)
241: .setDestDocType(destDocType)
242: .setRefresh(true);
243:• if (script != null) {
244: reindexRequest.setScript(script);
245: }
246: return client.submitReindexTask(reindexRequest, RequestOptions.DEFAULT).getTask();
247: }
248:
249: public JsonNode getTaskInfo(String taskId) throws IOException, ElasticsearchException {
250: Request getTaskRequest = new Request(HttpGet.METHOD_NAME, String.format("/_tasks/%s", taskId));
251: Response getTaskResponse = client.getLowLevelClient().performRequest(getTaskRequest);
252: return MigratorUtils.getObjectMapper().readValue(EntityUtils.toString(getTaskResponse.getEntity()), JsonNode.class);
253: }
254:
255: public GetMappingsResponse getMappings(String... index) throws IOException, ElasticsearchException {
256: GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices(index);
257: return client.indices().getMapping(getMappingsRequest, RequestOptions.DEFAULT);
258: }
259:
260: }