Skip to content

Package: RaiseServiceEventInterceptor

RaiseServiceEventInterceptor

name | instruction | branch | complexity | line | method
RaiseServiceEventInterceptor()
M: 43 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 4 C: 0
0%
M: 1 C: 0
0%
fillEvent(MethodInvocation, ServiceEvent)
M: 204 C: 0
0%
M: 24 C: 0
0%
M: 13 C: 0
0%
M: 44 C: 0
0%
M: 1 C: 0
0%
invoke(MethodInvocation)
M: 44 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 14 C: 0
0%
M: 1 C: 0
0%
lambda$updateEventStatus$0(ServiceEvent, EntityManager)
M: 12 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
logFoundEntities(List, List)
M: 54 C: 0
0%
M: 4 C: 0
0%
M: 3 C: 0
0%
M: 9 C: 0
0%
M: 1 C: 0
0%
logInputParameters(Method)
M: 84 C: 0
0%
M: 6 C: 0
0%
M: 4 C: 0
0%
M: 12 C: 0
0%
M: 1 C: 0
0%
sendEvent(MethodInvocation, ServiceEvent, Object)
M: 59 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 14 C: 0
0%
M: 1 C: 0
0%
static {...}
M: 6 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
updateEventStatus(MethodInvocation, ServiceEvent, ServiceEvent.EventStatus)
M: 26 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 7 C: 0
0%
M: 1 C: 0
0%
useEntityToFillEvent(ServiceEvent, List)
M: 50 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 9 C: 0
0%
M: 1 C: 0
0%
useKapuaIdsToFillEvent(ServiceEvent, List, Class[])
M: 92 C: 0
0%
M: 8 C: 0
0%
M: 5 C: 0
0%
M: 17 C: 0
0%
M: 1 C: 0
0%

Coverage

1: /*******************************************************************************
2: * Copyright (c) 2017, 2022 Eurotech and/or its affiliates and others
3: *
4: * This program and the accompanying materials are made
5: * available under the terms of the Eclipse Public License 2.0
6: * which is available at https://www.eclipse.org/legal/epl-2.0/
7: *
8: * SPDX-License-Identifier: EPL-2.0
9: *
10: * Contributors:
11: * Eurotech - initial API and implementation
12: *******************************************************************************/
13: package org.eclipse.kapua.commons.event;
14:
15: import org.aopalliance.intercept.MethodInterceptor;
16: import org.aopalliance.intercept.MethodInvocation;
17: import org.eclipse.kapua.commons.core.InterceptorBind;
18: import org.eclipse.kapua.commons.jpa.EntityManagerContainer;
19: import org.eclipse.kapua.commons.metric.MetricServiceFactory;
20: import org.eclipse.kapua.commons.metric.MetricsService;
21: import org.eclipse.kapua.commons.model.id.KapuaEid;
22: import org.eclipse.kapua.commons.security.KapuaSecurityUtils;
23: import org.eclipse.kapua.commons.security.KapuaSession;
24: import org.eclipse.kapua.commons.service.event.store.api.EventStoreRecord;
25: import org.eclipse.kapua.commons.service.event.store.api.ServiceEventUtil;
26: import org.eclipse.kapua.commons.service.event.store.internal.EventStoreDAO;
27: import org.eclipse.kapua.commons.service.internal.AbstractKapuaService;
28: import org.eclipse.kapua.event.RaiseServiceEvent;
29: import org.eclipse.kapua.event.ServiceEvent;
30: import org.eclipse.kapua.event.ServiceEvent.EventStatus;
31: import org.eclipse.kapua.event.ServiceEventBusException;
32: import org.eclipse.kapua.locator.KapuaProvider;
33: import org.eclipse.kapua.model.KapuaEntity;
34: import org.eclipse.kapua.model.id.KapuaId;
35: import org.eclipse.kapua.service.KapuaService;
36: import org.slf4j.Logger;
37: import org.slf4j.LoggerFactory;
38:
39: import com.codahale.metrics.Counter;
40:
41: import java.lang.annotation.Annotation;
42: import java.lang.reflect.Method;
43: import java.lang.reflect.Parameter;
44: import java.util.ArrayList;
45: import java.util.Date;
46: import java.util.List;
47:
48: /**
49: * Event interceptor. It builds the event object and sends it to the event bus.
50: *
51: * @since 1.0
52: */
53: @KapuaProvider
54: @InterceptorBind(matchSubclassOf = KapuaService.class, matchAnnotatedWith = RaiseServiceEvent.class)
55: public class RaiseServiceEventInterceptor implements MethodInterceptor {
56:
57: private static final Logger LOG = LoggerFactory.getLogger(RaiseServiceEventInterceptor.class);
58:
59: private static final String MODULE = "commons";
60: private static final String COMPONENT = "service_event";
61: private static final String ACTION = "event_data_filler";
62: private static final String COUNT = "count";
63:
64:
65: private static final MetricsService METRIC_SERVICE = MetricServiceFactory.getInstance();
66:
67: private Counter wrongId;
68: private Counter wrongEntity;
69:
/**
 * Creates the interceptor and obtains from the metric service the two counters
 * that track ambiguous method signatures (too many ids / too many entities).
 */
public RaiseServiceEventInterceptor() {
    this.wrongId = METRIC_SERVICE.getCounter(MODULE, COMPONENT, ACTION, "wrong_id", COUNT);
    this.wrongEntity = METRIC_SERVICE.getCounter(MODULE, COMPONENT, ACTION, "wrong_entity", COUNT);
}
74:
75: @Override
76: public Object invoke(MethodInvocation invocation) throws Throwable {
77: Object returnObject = null;
78:
79: try {
80: // if(!create) then the entity id can be set here
81: ServiceEvent serviceEvent = ServiceEventScope.begin();
82:
83: KapuaSession session = KapuaSecurityUtils.getSession();
84: // Context ID is initialized/managed by the EventScope object
85: serviceEvent.setTimestamp(new Date());
86: serviceEvent.setUserId(session.getUserId());
87: serviceEvent.setScopeId(session.getScopeId());
88: fillEvent(invocation, serviceEvent);
89:
90: // execute the business logic
91: returnObject = invocation.proceed();
92:
93: // Raise service event if the execution is successful
94: try {
95: sendEvent(invocation, serviceEvent, returnObject);
96: } catch (ServiceEventBusException e) {
97: LOG.warn("Error sending event: {}", e.getMessage(), e);
98: }
99:
100: return returnObject;
101:
102: } finally {
103: ServiceEventScope.end();
104: }
105: }
106:
107: private void fillEvent(MethodInvocation invocation, ServiceEvent serviceEvent) {
108:• if (LOG.isDebugEnabled()) {
109: logInputParameters(invocation.getMethod());
110: }
111: StringBuilder inputs = new StringBuilder();
112: //find ids and entities
113: List<KapuaEntity> entities = new ArrayList<>();
114: List<KapuaId> ids = new ArrayList<>();
115: Class<?>[] parametersClass = invocation.getMethod().getParameterTypes();
116: Object[] arguments = invocation.getArguments();
117:• for (int i = 0; i<parametersClass.length; i++) {
118: Class<?> parameterClass = parametersClass[i];
119: LOG.debug("Parameter '{}' type {}", i, parameterClass);
120:• if (KapuaId.class.isAssignableFrom(parameterClass)) {
121: ids.add((KapuaId)arguments[i]);
122: }
123:• else if (KapuaEntity.class.isAssignableFrom(parameterClass)) {
124: entities.add((KapuaEntity)arguments[i]);
125: }
126: // fill the inputs
127:• inputs.append(arguments[i] != null ? arguments[i].toString() : "null");
128: inputs.append(", ");
129: }
130:• if (inputs.length() > 2) {
131: inputs.replace(inputs.length() - 2, inputs.length(), "");
132: }
133: serviceEvent.setInputs(inputs.toString());
134:• if (LOG.isDebugEnabled()) {
135: logFoundEntities(entities, ids);
136: }
137:• if (invocation.getThis() instanceof AbstractKapuaService) {
138: // get the service name
139: // the service is wrapped by guice so getThis --> getSuperclass() should provide the intercepted class
140: // then keep the interface from this object
141: serviceEvent.setOperation(invocation.getMethod().getName());
142: Class<?> wrappedClass = ((AbstractKapuaService) invocation.getThis()).getClass().getSuperclass(); // this object should be not null
143: Class<?>[] implementedClass = wrappedClass.getInterfaces();
144: // assuming that the KapuaService implemented is specified by the first implementing interface
145: String serviceInterfaceName = implementedClass[0].getName();
146: // String splittedServiceInterfaceName[] = serviceInterfaceName.split("\\.");
147: // String serviceName = splittedServiceInterfaceName.length > 0 ? splittedServiceInterfaceName[splittedServiceInterfaceName.length-1] : "";
148: // String cleanedServiceName = serviceName.substring(0, serviceName.length()-"Service".length()).toLowerCase();
149: String cleanedServiceName = serviceInterfaceName;
150: LOG.debug("Service name '{}' ", cleanedServiceName);
151: serviceEvent.setService(cleanedServiceName);
152:• if (entities.size()>0) {
153: useEntityToFillEvent(serviceEvent, entities);
154: }
155:• else if (ids.size()>0) {
156: // otherwise assume that the second identifier is the entity id (and the first the scope id, if there are more than one) or take the first one (if there is one)
157: useKapuaIdsToFillEvent(serviceEvent, ids, implementedClass);
158: }
159: } else {
160: Annotation[] annotations = invocation.getMethod().getDeclaredAnnotations();
161:• for (Annotation annotation : annotations) {
162:• if (RaiseServiceEvent.class.isAssignableFrom(annotation.annotationType())) {
163: RaiseServiceEvent raiseKapuaEvent = (RaiseServiceEvent) annotation;
164: serviceEvent.setService(raiseKapuaEvent.service());
165: serviceEvent.setEntityType(raiseKapuaEvent.entityType());
166: serviceEvent.setOperation(raiseKapuaEvent.operation());
167: serviceEvent.setNote(raiseKapuaEvent.note());
168: break;
169: }
170: }
171: }
172: }
173:
174: private void useEntityToFillEvent(ServiceEvent serviceEvent, List<KapuaEntity> entities) {
175:• if (entities.size()>1) {
176: LOG.warn("Found more than one KapuaEntity in the parameters! Assuming to use the first one!");
177: wrongEntity.inc();
178: }
179: KapuaEntity entity = entities.get(0);
180: serviceEvent.setEntityType(entity.getClass().getName());
181: serviceEvent.setEntityId(entity.getId());
182: serviceEvent.setEntityScopeId(entity.getScopeId());
183: LOG.info("Entity '{}' with id '{}' and scopeId '{}' found!", entity.getClass().getName(), entity.getId(), entity.getScopeId());
184: }
185:
186: private void useKapuaIdsToFillEvent(ServiceEvent serviceEvent, List<KapuaId> ids, Class<?>[] implementedClass) {
187:• if (ids.size()>2) {
188: LOG.warn("Found more than two KapuaId in the parameters! Assuming to use the first two!");
189: wrongId.inc();
190: }
191:• if (ids.size() >= 2) {
192: serviceEvent.setEntityScopeId(ids.get(0));
193: serviceEvent.setEntityId(ids.get(1));
194: }
195: else {
196: serviceEvent.setEntityId(ids.get(0));
197: }
198: String serviceInterface = implementedClass[0].getAnnotatedInterfaces()[0].getType().getTypeName();
199: String genericsList = serviceInterface.substring(serviceInterface.indexOf('<') + 1, serviceInterface.indexOf('>'));
200: String[] entityClassesToScan = genericsList.replaceAll("\\,", "").split(" ");
201:• for (String str : entityClassesToScan) {
202: try {
203:• if (KapuaEntity.class.isAssignableFrom(Class.forName(str))) {
204: serviceEvent.setEntityType(str);
205: }
206: } catch (ClassNotFoundException e) {
207: // do nothing
208: LOG.warn("Cannot find class {}", str, e);
209: }
210: }
211: }
212:
213: private void logInputParameters(Method method) {
214: LOG.debug("Event input parameters: ");
215: LOG.debug(" Parameter types");
216:• for (Class<?> tmp : method.getParameterTypes()) {
217: LOG.debug(" {}", tmp.getName());
218: }
219: LOG.info(" Declared annotations");
220:• for (Annotation tmp : method.getDeclaredAnnotations()) {
221: LOG.debug(" {}", tmp.getClass());
222: }
223: LOG.info(" Parameters");
224:• for (Parameter tmp : method.getParameters()) {
225: LOG.debug(" {} - class: {}", tmp.getName(), tmp.getType());
226: }
227: LOG.debug("================ END");
228: }
229:
230: private void logFoundEntities(List<KapuaEntity> entities, List<KapuaId> ids) {
231: LOG.debug("Entities found:");
232:• for (KapuaEntity tmp : entities) {
233: LOG.debug(" id: {} - scopeId: {} - type: {}", tmp.getId(), tmp.getScopeId(), tmp.getType());
234: }
235: LOG.debug(" KapuaIds found:");
236:• for (KapuaId tmp : ids) {
237: LOG.debug(" id: {}", tmp.getId());
238: }
239: }
240:
241: private void sendEvent(MethodInvocation invocation, ServiceEvent serviceEvent, Object returnedValue) throws ServiceEventBusException {
242: String address = ServiceMap.getAddress(serviceEvent.getService());
243: try {
244: ServiceEventBusManager.getInstance().publish(address, serviceEvent);
245: LOG.info("SENT event from service {} to {} - entity type {} - entity scope id {} - entity id {} - context id {}",
246: serviceEvent.getService(),
247: address,
248: serviceEvent.getEntityType(),
249: serviceEvent.getEntityScopeId(),
250: serviceEvent.getEntityId(),
251: serviceEvent.getContextId());
252: // if message was sent successfully then confirm the event in the event table
253: updateEventStatus(invocation, serviceEvent, EventStatus.SENT);
254: } catch (ServiceEventBusException e) {
255: LOG.warn("Error sending event", e);
256: // mark event status as SEND_ERROR
257: updateEventStatus(invocation, serviceEvent, EventStatus.SEND_ERROR);
258: }
259: }
260:
261: private void updateEventStatus(MethodInvocation invocation, ServiceEvent serviceEventBus, EventStatus newServiceEventStatus) {
262:• if (invocation.getThis() instanceof AbstractKapuaService) {
263: try {
264: serviceEventBus.setStatus(newServiceEventStatus);
265: ((AbstractKapuaService) invocation.getThis()).getEntityManagerSession().doTransactedAction(EntityManagerContainer.<EventStoreRecord>create().onResultHandler(em -> {
266: return EventStoreDAO.update(em,
267: ServiceEventUtil.mergeToEntity(EventStoreDAO.find(em, serviceEventBus.getScopeId(), KapuaEid.parseCompactId(serviceEventBus.getId())), serviceEventBus));
268: }));
269: } catch (Throwable t) {
270: // this may be a valid condition if the HouseKeeper is doing the update concurrently with this task
271: LOG.warn("Error updating event status: {}", t.getMessage(), t);
272: }
273: }
274: }
275:
276: }