Skip to content

Package: ServiceEventModule

ServiceEventModule

nameinstructionbranchcomplexitylinemethod
ServiceEventModule()
M: 8 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
getSubscriptionName(String, String)
M: 13 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
start()
M: 129 C: 0
0%
M: 8 C: 0
0%
M: 5 C: 0
0%
M: 26 C: 0
0%
M: 1 C: 0
0%
static {...}
M: 4 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
stop()
M: 83 C: 0
0%
M: 12 C: 0
0%
M: 7 C: 0
0%
M: 28 C: 0
0%
M: 1 C: 0
0%

Coverage

1: /*******************************************************************************
2: * Copyright (c) 2017, 2022 Eurotech and/or its affiliates and others
3: *
4: * This program and the accompanying materials are made
5: * available under the terms of the Eclipse Public License 2.0
6: * which is available at https://www.eclipse.org/legal/epl-2.0/
7: *
8: * SPDX-License-Identifier: EPL-2.0
9: *
10: * Contributors:
11: * Eurotech - initial API and implementation
12: *******************************************************************************/
13: package org.eclipse.kapua.commons.event;
14:
15: import org.apache.commons.lang3.StringUtils;
16: import org.eclipse.kapua.KapuaException;
17: import org.eclipse.kapua.commons.core.ServiceModule;
18: import org.eclipse.kapua.event.ServiceEventBus;
19: import org.slf4j.Logger;
20: import org.slf4j.LoggerFactory;
21:
22: import java.util.ArrayList;
23: import java.util.HashSet;
24: import java.util.List;
25: import java.util.Set;
26: import java.util.concurrent.Executors;
27: import java.util.concurrent.ScheduledExecutorService;
28: import java.util.concurrent.ScheduledFuture;
29: import java.util.concurrent.TimeUnit;
30:
31: /**
32: * Base {@link ServiceModule} implementation to be used by the modules that listen for events.
33: *
34: * @since 1.0
35: */
36: public abstract class ServiceEventModule implements ServiceModule {
37:
38: private static final Logger LOGGER = LoggerFactory.getLogger(ServiceEventModule.class);
39:
40: private static final int MAX_WAIT_LOOP_ON_SHUTDOWN = 30;
41: private static final int SCHEDULED_EXECUTION_TIME_WINDOW = 30;
42: private static final long WAIT_TIME = 1000;
43:
44: private ServiceEventModuleConfiguration serviceEventModuleConfiguration;
45: private Set<String> subscriberNames = new HashSet<>();
46:
47: private ScheduledExecutorService houseKeeperScheduler;
48: private ScheduledFuture<?> houseKeeperHandler;
49: private ServiceEventHousekeeper houseKeeperJob;
50:
51: protected abstract ServiceEventModuleConfiguration initializeConfiguration();
52:
53: private String getSubscriptionName(String address, String subscriber) {
54: return String.format("%s-%s", address, subscriber);
55: }
56:
57: @Override
58: public void start() throws KapuaException {
59: LOGGER.info("Starting service event module... {}", this.getClass().getName());
60: LOGGER.info("Starting service event module... initialize configurations");
61: serviceEventModuleConfiguration = initializeConfiguration();
62: LOGGER.info("Starting service event module... initialize event bus");
63: ServiceEventBus eventbus = ServiceEventBusManager.getInstance();
64: LOGGER.info("Starting service event module... initialize event subscriptions");
65: List<ServiceEntry> servicesEntryList = new ArrayList<>();
66:• if (serviceEventModuleConfiguration.getServiceEventClientConfigurations() != null) {
67:• for (ServiceEventClientConfiguration selc : serviceEventModuleConfiguration.getServiceEventClientConfigurations()) {
68: //get the specific service address... if empty switch to use the default configuration address
69: String address = selc.getAddress();
70:• if (StringUtils.isEmpty(selc.getAddress())) {
71: address = serviceEventModuleConfiguration.getInternalAddress();
72: }
73: // Listen to upstream service events
74:• if (selc.getEventListener() != null) {
75: eventbus.subscribe(address, getSubscriptionName(address, selc.getClientName()), selc.getEventListener());
76: }
77: servicesEntryList.add(new ServiceEntry(selc.getClientName(), address));
78: subscriberNames.add(selc.getClientName()); // Set because names must be unique
79: }
80: } else {
81: LOGGER.info("Configuration subscriptions are missing. No subscriptions added!");
82: }
83:
84: // register events to the service map
85: LOGGER.info("Starting service event module... register services names");
86: ServiceMap.registerServices(serviceEventModuleConfiguration.getInternalAddress(), servicesEntryList);
87:
88: // Start the House keeper
89: LOGGER.info("Starting service event module... start housekeeper");
90: houseKeeperScheduler = Executors.newScheduledThreadPool(1);
91: houseKeeperJob = new ServiceEventHousekeeper(
92: serviceEventModuleConfiguration.getEntityManagerFactory(),
93: eventbus,
94: servicesEntryList);
95: // Start time can be made random from 0 to 30 seconds
96: houseKeeperHandler = houseKeeperScheduler.scheduleAtFixedRate(houseKeeperJob, SCHEDULED_EXECUTION_TIME_WINDOW, SCHEDULED_EXECUTION_TIME_WINDOW, TimeUnit.SECONDS);
97: LOGGER.info("Starting service event module... DONE");
98: }
99:
100: @Override
101: public void stop() throws KapuaException {
102: LOGGER.info("Stopping service event module... {}", this.getClass().getName());
103: LOGGER.info("Stopping service event module... house keeper scheduler [step 1/3]");
104:• if (houseKeeperJob != null) {
105: houseKeeperJob.stop();
106: } else {
107: LOGGER.warn("Cannot shutdown the housekeeper scheduler [step 1/3] since it is null (initialization may not be successful)");
108: }
109: LOGGER.info("Stopping service event module... house keeper scheduler [step 2/3]");
110:• if (houseKeeperHandler != null) {
111: int waitLoop = 0;
112:• while (houseKeeperHandler.isDone()) {
113: try {
114: Thread.sleep(WAIT_TIME);
115: } catch (InterruptedException e) {
116: // do nothing
117: }
118:• if (waitLoop++ > MAX_WAIT_LOOP_ON_SHUTDOWN) {
119: LOGGER.warn("Cannot cancel the house keeper task afeter a while!");
120: break;
121: }
122: }
123: } else {
124: LOGGER.warn("Cannot shutdown the housekeeper scheduler [step 2/3] since it is null (initialization may not be successful)");
125: }
126: LOGGER.info("Stopping service event module... house keeper scheduler [step 3/3]");
127:• if (houseKeeperScheduler != null) {
128: houseKeeperScheduler.shutdown();
129: } else {
130: LOGGER.warn("Cannot shutdown the housekeeper scheduler [step 3/3] since it is null (initialization may not be successful)");
131: }
132: LOGGER.info("Stopping service event module... unregister services names");
133:• if (serviceEventModuleConfiguration != null) {
134: ServiceMap.unregisterServices(new ArrayList<>(subscriberNames));
135: subscriberNames.clear();
136: } else {
137: LOGGER.warn("Cannot unregister services since configuration is null (initialization may not be successful)");
138: }
139: LOGGER.info("Stopping service event module... DONE");
140: }
141:
142: }