Skip to content

Package: ServiceEventHousekeeper$NoExecutionNeededException

ServiceEventHousekeeper$NoExecutionNeededException

name | instruction | branch | complexity | line | method
ServiceEventHousekeeper.NoExecutionNeededException(ServiceEventHousekeeper, String)
M: 7 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 3 C: 0
0%
M: 1 C: 0
0%

Coverage

1: /*******************************************************************************
2: * Copyright (c) 2017, 2022 Eurotech and/or its affiliates and others
3: *
4: * This program and the accompanying materials are made
5: * available under the terms of the Eclipse Public License 2.0
6: * which is available at https://www.eclipse.org/legal/epl-2.0/
7: *
8: * SPDX-License-Identifier: EPL-2.0
9: *
10: * Contributors:
11: * Eurotech - initial API and implementation
12: *******************************************************************************/
13: package org.eclipse.kapua.commons.event;
14:
15: import org.eclipse.kapua.KapuaException;
16: import org.eclipse.kapua.commons.jpa.EntityManager;
17: import org.eclipse.kapua.commons.jpa.EntityManagerFactory;
18: import org.eclipse.kapua.commons.security.KapuaSecurityUtils;
19: import org.eclipse.kapua.commons.service.event.store.api.EventStoreRecord;
20: import org.eclipse.kapua.commons.service.event.store.api.EventStoreRecordAttributes;
21: import org.eclipse.kapua.commons.service.event.store.api.EventStoreRecordListResult;
22: import org.eclipse.kapua.commons.service.event.store.api.EventStoreRecordQuery;
23: import org.eclipse.kapua.commons.service.event.store.api.EventStoreService;
24: import org.eclipse.kapua.commons.service.event.store.api.ServiceEventUtil;
25: import org.eclipse.kapua.commons.service.event.store.internal.EventStoreFactoryImpl;
26: import org.eclipse.kapua.commons.service.event.store.internal.EventStoreServiceImpl;
27: import org.eclipse.kapua.commons.setting.system.SystemSetting;
28: import org.eclipse.kapua.commons.setting.system.SystemSettingKey;
29: import org.eclipse.kapua.commons.util.KapuaDateUtils;
30: import org.eclipse.kapua.event.ServiceEvent.EventStatus;
31: import org.eclipse.kapua.event.ServiceEventBus;
32: import org.eclipse.kapua.event.ServiceEventBusException;
33: import org.eclipse.kapua.model.KapuaUpdatableEntityAttributes;
34: import org.eclipse.kapua.model.query.predicate.AndPredicate;
35: import org.eclipse.kapua.model.query.predicate.AttributePredicate.Operator;
36: import org.slf4j.Logger;
37: import org.slf4j.LoggerFactory;
38:
39: import java.time.Duration;
40: import java.time.temporal.ChronoUnit;
41: import java.util.Date;
42: import java.util.List;
43:
44: /**
45: * Event bus housekeeper. It is responsible to send unsent messages or send again messages gone in error.
46: *
47: * @since 1.0
48: */
49: public class ServiceEventHousekeeper implements Runnable {
50:
51: private static final Logger LOGGER = LoggerFactory.getLogger(ServiceEventHousekeeper.class);
52:
53: private enum EventsProcessType {
54: OLD,
55: SEND_ERROR
56: }
57:
58: private static final long WAIT_TIME = SystemSetting.getInstance().getLong(SystemSettingKey.HOUSEKEEPER_EXECUTION_WAIT_TIME);
59: private static final long OLD_MESSAGES_TIME_WINDOW = SystemSetting.getInstance().getLong(SystemSettingKey.HOUSEKEEPER_OLD_MESSAGES_TIME_WINDOW);
60: private static final int EVENT_SCAN_WINDOW = SystemSetting.getInstance().getInt(SystemSettingKey.HOUSEKEEPER_EVENT_SCAN_WINDOW);
61:
62: private final Object monitor = new Object();
63:
64: private EventStoreService kapuaEventService;
65:
66: private EntityManager manager;
67:
68: private ServiceEventBus eventbus;
69: private List<ServiceEntry> servicesEntryList;
70: private boolean running;
71:
72: /**
73: * Default constructor
74: *
75: * @param entityManagerFactory
76: * @param eventbus
77: * @param servicesEntryList
78: * @throws KapuaException
79: */
80: public ServiceEventHousekeeper(EntityManagerFactory entityManagerFactory, ServiceEventBus eventbus, List<ServiceEntry> servicesEntryList) throws KapuaException {
81: this.eventbus = eventbus;
82: this.servicesEntryList = servicesEntryList;
83: manager = entityManagerFactory.createEntityManager();
84: kapuaEventService = new EventStoreServiceImpl(entityManagerFactory);
85: }
86:
87: @Override
88: public void run() {
89: //TODO handling events table cleanup
90: running = true;
91: while (running) {
92: waitStep();
93: for (ServiceEntry serviceEntry : servicesEntryList) {
94: try {
95: if (running) {
96: KapuaSecurityUtils.doPrivileged(() -> processServiceEvents(serviceEntry.getServiceName()));
97: }
98: } catch (KapuaException e) {
99: LOGGER.warn("Generic error {}", e.getMessage(), e);
100: } finally {
101: //remove the lock if present
102: if (manager.isTransactionActive()) {
103: manager.rollback();
104: }
105: }
106: }
107: }
108: running = false;
109: }
110:
111: private void processServiceEvents(String serviceName) throws KapuaException {
112: try {
113: LOGGER.trace("Scan not processed events for service '{}'", serviceName);
114: Date startRun = Date.from(KapuaDateUtils.getKapuaSysDate());
115: //try to acquire lock
116: HousekeeperRun kapuaEventHousekeeper = getLock(serviceName);
117: //scan unsent events (marked as SENT_ERROR)
118: findAndSendUnsentEvents(serviceName, EventsProcessType.SEND_ERROR);
119: //scan unsent OLD events (marked as FIRED but raised before a specific (configurable) time window)
120: findAndSendUnsentEvents(serviceName, EventsProcessType.OLD);
121: //release lock
122: updateLock(kapuaEventHousekeeper, serviceName, startRun);
123: } catch (LockException | NoExecutionNeededException e) {
124: LOGGER.trace("The lock is handled by someone else or the last execution was to close");
125: } finally {
126: //remove the lock if present
127: if (manager.isTransactionActive()) {
128: manager.rollback();
129: }
130: }
131: }
132:
133: private void findAndSendUnsentEvents(String serviceName, EventsProcessType eventsProcessType) throws KapuaException {
134: EventStoreRecordListResult unsentMessagesList = getUnsentEvents(serviceName, eventsProcessType);
135: //send unprocessed events
136: if (!unsentMessagesList.isEmpty()) {
137: for (EventStoreRecord kapuaEvent : unsentMessagesList.getItems()) {
138: try {
139: String address = ServiceMap.getAddress(serviceName);
140: LOGGER.info("publish event: service '{}' - address '{}' - operation '{}' - id '{}'",
141: kapuaEvent.getService(),
142: address,
143: kapuaEvent.getOperation(),
144: kapuaEvent.getContextId());
145:
146: eventbus.publish(address, ServiceEventUtil.toServiceEventBus(kapuaEvent));
147: //if message was sent successfully then confirm the event in the event table
148: //if something goes wrong during this update the event message may be raised twice (but this condition should happens rarely and it is compliant to the contract of the service events)
149: //this is done in a different transaction
150: kapuaEvent.setStatus(EventStatus.SENT);
151: kapuaEventService.update(kapuaEvent);
152: } catch (ServiceEventBusException e) {
153: LOGGER.warn("Exception publishing event: {}", e.getMessage(), e);
154: } catch (KapuaException e) {
155: //this may be a valid condition if the HouseKeeper is doing the update concurrently with this task
156: LOGGER.warn("Exception acknowledging event: {}", e.getMessage(), e);
157: }
158: }
159: }
160: }
161:
162: private EventStoreRecordListResult getUnsentEvents(String serviceName, EventsProcessType eventsProcessType) throws KapuaException {
163: EventStoreRecordQuery query = new EventStoreFactoryImpl().newQuery(null);
164:
165: AndPredicate andPredicate = query.andPredicate();
166: andPredicate.and(query.attributePredicate(EventStoreRecordAttributes.SERVICE_NAME, serviceName));
167:
168: if (EventsProcessType.SEND_ERROR.equals(eventsProcessType)) {
169: LOGGER.trace("Looking for SENT_ERROR events. Add EventStatus=SENT_ERROR query predicate.");
170: andPredicate.and(query.attributePredicate(EventStoreRecordAttributes.EVENT_STATUS, EventStatus.SEND_ERROR));
171: } else {
172: LOGGER.trace("Looking for OLD events. Add EventStatus=RAISED query predicate.");
173: andPredicate.and(query.attributePredicate(EventStoreRecordAttributes.EVENT_STATUS, EventStatus.TRIGGERED));
174: //add timestamp predicate
175: Date eventDateBound = Date.from(KapuaDateUtils.getKapuaSysDate().minusMillis(OLD_MESSAGES_TIME_WINDOW));
176: LOGGER.trace("Looking for OLD events. Add timestamp condition query predicate. Date before {}", eventDateBound);
177: andPredicate.and(query.attributePredicate(KapuaUpdatableEntityAttributes.MODIFIED_ON, eventDateBound, Operator.LESS_THAN_OR_EQUAL));
178: }
179:
180: query.setPredicate(andPredicate);
181: query.setLimit(EVENT_SCAN_WINDOW);
182:
183: return kapuaEventService.query(query);
184: }
185:
186: private void waitStep() {
187: try {
188: synchronized (monitor) {
189: monitor.wait(WAIT_TIME);
190: }
191: } catch (InterruptedException e) {
192: LOGGER.warn("Exception waiting for next scheduled execution: {}", e.getMessage(), e);
193: }
194: }
195:
196: public void stop() {
197: running = false;
198: synchronized (monitor) {
199: monitor.notify();
200: }
201: }
202:
203: private HousekeeperRun getLock(String serviceName) throws LockException, NoExecutionNeededException {
204: HousekeeperRun kapuaEventHousekeeper = null;
205: try {
206: manager.beginTransaction();
207: kapuaEventHousekeeper = manager.findWithLock(HousekeeperRun.class, serviceName);
208: } catch (Exception e) {
209: throw new LockException(String.format("Cannot acquire lock: %s", e.getMessage()), e);
210: }
211: // Check last housekeeper run
212: if (KapuaDateUtils.getKapuaSysDate().isBefore(kapuaEventHousekeeper.getLastRunOn().toInstant().plus(Duration.of(WAIT_TIME, ChronoUnit.MILLIS)))) {
213: throw new NoExecutionNeededException("Not enough time since the last execution");
214: }
215: return kapuaEventHousekeeper;
216: }
217:
218: private void updateLock(HousekeeperRun kapuaEventHousekeeper, String serviceName, Date startRun) throws KapuaException {
219: kapuaEventHousekeeper.setLastRunBy(serviceName);
220: kapuaEventHousekeeper.setLastRunOn(startRun);
221: manager.persist(kapuaEventHousekeeper);
222: manager.commit();
223: }
224:
225: private class LockException extends Exception {
226:
227: private static final long serialVersionUID = 3099804470559976126L;
228:
229: public LockException(String msg, Throwable t) {
230: super(msg, t);
231: }
232: }
233:
234: private class NoExecutionNeededException extends Exception {
235:
236: private static final long serialVersionUID = 7292333466656851052L;
237:
238: public NoExecutionNeededException(String msg) {
239: super(msg);
240: }
241:
242: }
243:
244: }