Skip to content

Package: ChannelInfoRegistryServiceImpl

ChannelInfoRegistryServiceImpl

nameinstructionbranchcomplexitylinemethod
ChannelInfoRegistryServiceImpl()
M: 53 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 11 C: 0
0%
M: 1 C: 0
0%
checkDataAccess(KapuaId, Actions)
M: 12 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 3 C: 0
0%
M: 1 C: 0
0%
count(ChannelInfoQuery)
M: 33 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 8 C: 0
0%
M: 1 C: 0
0%
delete(ChannelInfoQuery)
M: 34 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 10 C: 0
0%
M: 1 C: 0
0%
delete(KapuaId, StorableId)
M: 32 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 10 C: 0
0%
M: 1 C: 0
0%
find(KapuaId, StorableId)
M: 6 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
find(KapuaId, StorableId, StorableFetchStyle)
M: 38 C: 0
0%
M: 4 C: 0
0%
M: 3 C: 0
0%
M: 11 C: 0
0%
M: 1 C: 0
0%
isServiceEnabled(KapuaId)
M: 9 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
query(ChannelInfoQuery)
M: 58 C: 0
0%
M: 8 C: 0
0%
M: 5 C: 0
0%
M: 13 C: 0
0%
M: 1 C: 0
0%
static {...}
M: 11 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 3 C: 0
0%
M: 1 C: 0
0%
updateLastPublishedFields(ChannelInfo)
M: 122 C: 0
0%
M: 4 C: 0
0%
M: 3 C: 0
0%
M: 28 C: 0
0%
M: 1 C: 0
0%

Coverage

1: /*******************************************************************************
2: * Copyright (c) 2016, 2022 Eurotech and/or its affiliates and others
3: *
4: * This program and the accompanying materials are made
5: * available under the terms of the Eclipse Public License 2.0
6: * which is available at https://www.eclipse.org/legal/epl-2.0/
7: *
8: * SPDX-License-Identifier: EPL-2.0
9: *
10: * Contributors:
11: * Eurotech - initial API and implementation
12: * Red Hat Inc
13: *******************************************************************************/
14: package org.eclipse.kapua.service.datastore.internal;
15:
16: import org.eclipse.kapua.KapuaException;
17: import org.eclipse.kapua.commons.service.internal.AbstractKapuaService;
18: import org.eclipse.kapua.commons.service.internal.KapuaServiceDisabledException;
19: import org.eclipse.kapua.commons.util.ArgumentValidator;
20: import org.eclipse.kapua.locator.KapuaLocator;
21: import org.eclipse.kapua.locator.KapuaProvider;
22: import org.eclipse.kapua.model.domain.Actions;
23: import org.eclipse.kapua.model.id.KapuaId;
24: import org.eclipse.kapua.service.account.AccountService;
25: import org.eclipse.kapua.service.authorization.AuthorizationService;
26: import org.eclipse.kapua.service.authorization.permission.Permission;
27: import org.eclipse.kapua.service.authorization.permission.PermissionFactory;
28: import org.eclipse.kapua.service.datastore.ChannelInfoRegistryService;
29: import org.eclipse.kapua.service.datastore.DatastoreDomains;
30: import org.eclipse.kapua.service.datastore.MessageStoreService;
31: import org.eclipse.kapua.service.datastore.internal.mediator.ChannelInfoField;
32: import org.eclipse.kapua.service.datastore.internal.mediator.DatastoreMediator;
33: import org.eclipse.kapua.service.datastore.internal.mediator.MessageField;
34: import org.eclipse.kapua.service.datastore.internal.model.query.MessageQueryImpl;
35: import org.eclipse.kapua.service.datastore.internal.schema.MessageSchema;
36: import org.eclipse.kapua.service.datastore.internal.setting.DatastoreSettings;
37: import org.eclipse.kapua.service.datastore.internal.setting.DatastoreSettingsKey;
38: import org.eclipse.kapua.service.datastore.model.ChannelInfo;
39: import org.eclipse.kapua.service.datastore.model.ChannelInfoListResult;
40: import org.eclipse.kapua.service.datastore.model.MessageListResult;
41: import org.eclipse.kapua.service.datastore.model.query.ChannelInfoQuery;
42: import org.eclipse.kapua.service.datastore.model.query.MessageQuery;
43: import org.eclipse.kapua.service.datastore.model.query.predicate.DatastorePredicateFactory;
44: import org.eclipse.kapua.service.storable.model.id.StorableId;
45: import org.eclipse.kapua.service.storable.model.query.SortField;
46: import org.eclipse.kapua.service.storable.model.query.StorableFetchStyle;
47: import org.eclipse.kapua.service.storable.model.query.predicate.AndPredicate;
48: import org.eclipse.kapua.service.storable.model.query.predicate.RangePredicate;
49: import org.eclipse.kapua.service.storable.model.query.predicate.TermPredicate;
50: import org.slf4j.Logger;
51: import org.slf4j.LoggerFactory;
52:
53: import java.util.ArrayList;
54: import java.util.Date;
55: import java.util.List;
56:
57: /**
58: * Channel info registry implementation
59: *
60: * @since 1.0.0
61: */
62: @KapuaProvider
63: public class ChannelInfoRegistryServiceImpl extends AbstractKapuaService implements ChannelInfoRegistryService {
64:
65: private static final Logger LOG = LoggerFactory.getLogger(ChannelInfoRegistryServiceImpl.class);
66:
67: private static final KapuaLocator LOCATOR = KapuaLocator.getInstance();
68: private static final DatastorePredicateFactory DATASTORE_PREDICATE_FACTORY = LOCATOR.getFactory(DatastorePredicateFactory.class);
69:
70: private final AccountService accountService;
71: private final AuthorizationService authorizationService;
72: private final PermissionFactory permissionFactory;
73: private final ChannelInfoRegistryFacade channelInfoRegistryFacade;
74: private final MessageStoreService messageStoreService;
75:
76: private static final String QUERY = "query";
77: private static final String QUERY_SCOPE_ID = "query.scopeId";
78:
79: /**
80: * Default constructor
81: *
82: * @since 1.0.0
83: */
84: public ChannelInfoRegistryServiceImpl() {
85: super(DatastoreEntityManagerFactory.getInstance());
86:
87: KapuaLocator locator = KapuaLocator.getInstance();
88: accountService = locator.getService(AccountService.class);
89: authorizationService = locator.getService(AuthorizationService.class);
90: permissionFactory = locator.getFactory(PermissionFactory.class);
91: messageStoreService = locator.getService(MessageStoreService.class);
92:
93: MessageStoreService messageStoreService = KapuaLocator.getInstance().getService(MessageStoreService.class);
94: ConfigurationProviderImpl configurationProvider = new ConfigurationProviderImpl(messageStoreService, accountService);
95: channelInfoRegistryFacade = new ChannelInfoRegistryFacade(configurationProvider, DatastoreMediator.getInstance());
96: DatastoreMediator.getInstance().setChannelInfoStoreFacade(channelInfoRegistryFacade);
97: }
98:
99: @Override
100: public ChannelInfo find(KapuaId scopeId, StorableId id) throws KapuaException {
101: return find(scopeId, id, StorableFetchStyle.SOURCE_FULL);
102: }
103:
104: @Override
105: public ChannelInfo find(KapuaId scopeId, StorableId id, StorableFetchStyle fetchStyle)
106: throws KapuaException {
107:• if (!isServiceEnabled(scopeId)) {
108: throw new KapuaServiceDisabledException(this.getClass().getName());
109: }
110:
111: ArgumentValidator.notNull(scopeId, "scopeId");
112: ArgumentValidator.notNull(id, "id");
113:
114: checkDataAccess(scopeId, Actions.read);
115: try {
116: ChannelInfo channelInfo = channelInfoRegistryFacade.find(scopeId, id);
117:• if (channelInfo != null) {
118: // populate the lastMessageTimestamp
119: updateLastPublishedFields(channelInfo);
120: }
121: return channelInfo;
122: } catch (Exception e) {
123: throw KapuaException.internalError(e);
124: }
125: }
126:
127: @Override
128: public ChannelInfoListResult query(ChannelInfoQuery query)
129: throws KapuaException {
130:• if (!isServiceEnabled(query.getScopeId())) {
131: throw new KapuaServiceDisabledException(this.getClass().getName());
132: }
133:
134: ArgumentValidator.notNull(query, QUERY);
135: ArgumentValidator.notNull(query.getScopeId(), QUERY_SCOPE_ID);
136:
137: checkDataAccess(query.getScopeId(), Actions.read);
138: try {
139: ChannelInfoListResult result = channelInfoRegistryFacade.query(query);
140:• if (result != null && query.getFetchAttributes().contains(ChannelInfoField.TIMESTAMP.field())) {
141: // populate the lastMessageTimestamp
142:• for (ChannelInfo channelInfo : result.getItems()) {
143: updateLastPublishedFields(channelInfo);
144: }
145: }
146: return result;
147: } catch (Exception e) {
148: throw KapuaException.internalError(e);
149: }
150: }
151:
152: @Override
153: public long count(ChannelInfoQuery query)
154: throws KapuaException {
155:• if (!isServiceEnabled(query.getScopeId())) {
156: throw new KapuaServiceDisabledException(this.getClass().getName());
157: }
158:
159: ArgumentValidator.notNull(query, QUERY);
160: ArgumentValidator.notNull(query.getScopeId(), QUERY_SCOPE_ID);
161:
162: checkDataAccess(query.getScopeId(), Actions.read);
163: try {
164: return channelInfoRegistryFacade.count(query);
165: } catch (Exception e) {
166: throw KapuaException.internalError(e);
167: }
168: }
169:
170: void delete(KapuaId scopeId, StorableId id)
171: throws KapuaException {
172:• if (!isServiceEnabled(scopeId)) {
173: throw new KapuaServiceDisabledException(this.getClass().getName());
174: }
175:
176: ArgumentValidator.notNull(scopeId, "scopeId");
177: ArgumentValidator.notNull(id, "id");
178:
179: checkDataAccess(scopeId, Actions.delete);
180: try {
181: channelInfoRegistryFacade.delete(scopeId, id);
182: } catch (Exception e) {
183: throw KapuaException.internalError(e);
184: }
185: }
186:
187: void delete(ChannelInfoQuery query)
188: throws KapuaException {
189:• if (!isServiceEnabled(query.getScopeId())) {
190: throw new KapuaServiceDisabledException(this.getClass().getName());
191: }
192:
193: ArgumentValidator.notNull(query, QUERY);
194: ArgumentValidator.notNull(query.getScopeId(), QUERY_SCOPE_ID);
195:
196: checkDataAccess(query.getScopeId(), Actions.delete);
197: try {
198: channelInfoRegistryFacade.delete(query);
199: } catch (Exception e) {
200: throw KapuaException.internalError(e);
201: }
202: }
203:
204: private void checkDataAccess(KapuaId scopeId, Actions action)
205: throws KapuaException {
206: Permission permission = permissionFactory.newPermission(DatastoreDomains.DATASTORE_DOMAIN, action, scopeId);
207: authorizationService.checkPermission(permission);
208: }
209:
210: /**
211: * Update the last published date and last published message identifier for the specified channel info, so it gets the timestamp and the message id of the last published message for the
212: * account/clientId in the
213: * channel info
214: *
215: * @param channelInfo
216: * @throws KapuaException
217: * @since 1.0.0
218: */
219: private void updateLastPublishedFields(ChannelInfo channelInfo) throws KapuaException {
220: List<SortField> sort = new ArrayList<>();
221: sort.add(SortField.descending(MessageSchema.MESSAGE_TIMESTAMP));
222:
223: MessageQuery messageQuery = new MessageQueryImpl(channelInfo.getScopeId());
224: messageQuery.setAskTotalCount(true);
225: messageQuery.setFetchStyle(StorableFetchStyle.FIELDS);
226: messageQuery.setLimit(1);
227: messageQuery.setOffset(0);
228: messageQuery.setSortFields(sort);
229:
230: RangePredicate messageIdPredicate = DATASTORE_PREDICATE_FACTORY.newRangePredicate(ChannelInfoField.TIMESTAMP, channelInfo.getFirstMessageOn(), null);
231: TermPredicate clientIdPredicate = DATASTORE_PREDICATE_FACTORY.newTermPredicate(MessageField.CLIENT_ID, channelInfo.getClientId());
232: TermPredicate channelPredicate = DATASTORE_PREDICATE_FACTORY.newTermPredicate(MessageField.CHANNEL, channelInfo.getName());
233:
234: AndPredicate andPredicate = DATASTORE_PREDICATE_FACTORY.newAndPredicate();
235: andPredicate.getPredicates().add(messageIdPredicate);
236: andPredicate.getPredicates().add(clientIdPredicate);
237: andPredicate.getPredicates().add(channelPredicate);
238: messageQuery.setPredicate(andPredicate);
239:
240: MessageListResult messageList = messageStoreService.query(messageQuery);
241:
242: StorableId lastPublishedMessageId = null;
243: Date lastPublishedMessageTimestamp = null;
244:• if (messageList.getSize() == 1) {
245: lastPublishedMessageId = messageList.getFirstItem().getDatastoreId();
246: lastPublishedMessageTimestamp = messageList.getFirstItem().getTimestamp();
247:• } else if (messageList.isEmpty()) {
248: // this condition could happens due to the ttl of the messages (so if it happens, it does not necessarily mean there has been an error!)
249: LOG.warn("Cannot find last timestamp for the specified client id '{}' - account '{}'", channelInfo.getScopeId(), channelInfo.getClientId());
250: } else {
251: // this condition shouldn't never happens since the query has a limit 1
252: // if happens it means than an elasticsearch internal error happens and/or our driver didn't set it correctly!
253: LOG.error("Cannot find last timestamp for the specified client id '{}' - account '{}'. More than one result returned by the query!", channelInfo.getScopeId(), channelInfo.getClientId());
254: }
255:
256: channelInfo.setLastMessageId(lastPublishedMessageId);
257: channelInfo.setLastMessageOn(lastPublishedMessageTimestamp);
258: }
259:
260: @Override
261: public boolean isServiceEnabled(KapuaId scopeId) {
262:• return !DatastoreSettings.getInstance().getBoolean(DatastoreSettingsKey.DISABLE_DATASTORE, false);
263: }
264:
265: }