Skip to content

Package: EmbeddedBroker

EmbeddedBroker

name | instruction | branch | complexity | line | method
EmbeddedBroker()
M: 8 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 3 C: 0
0%
M: 1 C: 0
0%
lambda$stop$0(List)
M: 3 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
start()
M: 61 C: 0
0%
M: 8 C: 0
0%
M: 5 C: 0
0%
M: 20 C: 0
0%
M: 1 C: 0
0%
static {...}
M: 39 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 6 C: 0
0%
M: 1 C: 0
0%
stop()
M: 54 C: 0
0%
M: 6 C: 0
0%
M: 4 C: 0
0%
M: 21 C: 0
0%
M: 1 C: 0
0%

Coverage

1: /*******************************************************************************
2: * Copyright (c) 2017, 2022 Red Hat Inc and others.
3: *
4: * This program and the accompanying materials are made
5: * available under the terms of the Eclipse Public License 2.0
6: * which is available at https://www.eclipse.org/legal/epl-2.0/
7: *
8: * SPDX-License-Identifier: EPL-2.0
9: *
10: * Contributors:
11: * Red Hat Inc - initial API and implementation
12: *******************************************************************************/
13: package org.eclipse.kapua.qa.common.utils;
14:
15: import java.time.Duration;
16:
17: import cucumber.api.java.en.Given;
18:
19: import java.util.HashMap;
20: import java.util.List;
21: import java.util.Map;
22: import java.util.UUID;
23:
24: import org.apache.activemq.broker.BrokerFactory;
25: import org.apache.activemq.broker.BrokerService;
26: import org.eclipse.kapua.qa.common.Ports;
27: import org.eclipse.kapua.qa.common.Suppressed;
28: import org.eclipse.kapua.service.datastore.internal.mediator.DatastoreMediator;
29: import org.slf4j.Logger;
30: import org.slf4j.LoggerFactory;
31:
32: import cucumber.runtime.java.guice.ScenarioScoped;
33:
34: @ScenarioScoped
35: public class EmbeddedBroker {
36:
37: private static final Logger logger = LoggerFactory.getLogger(EmbeddedBroker.class);
38:
39: private static final String DEFAULT_DATA_DIRECTORY_PREFIX = "target/activemq/" + UUID.randomUUID();
40: private static final String DEFAULT_KAHA_DB_DIRECTORY = DEFAULT_DATA_DIRECTORY_PREFIX + "/kahaDB";
41: private static final String DEFAULT_DATA_DIRECTORY = DEFAULT_DATA_DIRECTORY_PREFIX + "/data";
42: private static final String KAHA_DB_DIRECTORY = "KAHA_DB_DIRECTORY";
43: /**
44: * Embedded broker configuration file from classpath resources.
45: */
46: public static final String ACTIVEMQ_XML = "xbean:activemq.xml";
47:
48: private static final int EXTRA_STARTUP_DELAY = Integer.getInteger("org.eclipse.kapua.qa.broker.extraStartupDelay", 0);
49:
50: private static final boolean NO_EMBEDDED_SERVERS = Boolean.getBoolean("org.eclipse.kapua.qa.noEmbeddedServers");
51:
52: private Map<String, List<AutoCloseable>> closables = new HashMap<>();
53:
54: private static BrokerService broker;
55:
56: public EmbeddedBroker() {
57: }
58:
59: @Given("^Start Broker$")
60: public void start() {
61:
62:• if (NO_EMBEDDED_SERVERS) {
63: return;
64: }
65: logger.info("Starting new Broker instance");
66: try {
67: // test if port is already open
68:
69:• if (Ports.isPortOpen(1883)) {
70: throw new IllegalStateException("Broker port is already in use");
71: }
72:
73: // start the broker
74: System.setProperty(KAHA_DB_DIRECTORY, DEFAULT_KAHA_DB_DIRECTORY);
75: broker = BrokerFactory.createBroker(ACTIVEMQ_XML);
76: broker.setDataDirectory(DEFAULT_DATA_DIRECTORY);
77: logger.info("Setting ActiveMQ data directory to {}", broker.getBrokerDataDirectory());
78: broker.start();
79:
80: // wait for the broker
81:
82:• if (!broker.waitUntilStarted(Duration.ofSeconds(20).toMillis())) {
83: throw new IllegalStateException("Failed to start up broker in time");
84: }
85:
86:• if (EXTRA_STARTUP_DELAY > 0) {
87: Thread.sleep(Duration.ofSeconds(EXTRA_STARTUP_DELAY).toMillis());
88: }
89: } catch (RuntimeException e) {
90: throw e;
91: } catch (Exception e) {
92: throw new RuntimeException("Failed to start broker", e);
93: }
94: }
95:
96: @Given("^Stop Broker$")
97: public void stop() {
98:
99:• if (NO_EMBEDDED_SERVERS) {
100: return;
101: }
102: logger.info("Stopping Broker instance ...");
103: try (final Suppressed<RuntimeException> s = Suppressed.withRuntimeException()) {
104:
105: // close all resources
106:
107: closables.values().stream().flatMap(values -> values.stream()).forEach(s::closeSuppressed);
108:
109: // shut down broker
110:
111:• if (broker != null) {
112: broker.stop();
113: broker.waitUntilStopped();
114: broker = null;
115: }
116:
117: } catch (Exception e) {
118: logger.error("Failed to stop Broker!");
119: e.printStackTrace();
120: }
121:
122: DatastoreMediator.getInstance().clearCache();
123:
124:• if (EXTRA_STARTUP_DELAY > 0) {
125: try {
126: Thread.sleep(Duration.ofSeconds(EXTRA_STARTUP_DELAY).toMillis());
127: } catch (InterruptedException e) {
128: e.printStackTrace();
129: }
130: }
131: logger.info("Stopping Broker instance ... done!");
132: }
133:
134: }