Skip to content

Package: DataMessages

DataMessages

nameinstructionbranchcomplexitylinemethod
DataMessages()
M: 3 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
count(ScopeId, MessageQuery)
M: 10 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
find(ScopeId, StorableEntityId)
M: 12 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
getChannelPredicate(String, boolean)
M: 14 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 4 C: 0
0%
M: 1 C: 0
0%
getMetricPredicate(String, MetricType, String, String)
M: 43 C: 0
0%
M: 8 C: 0
0%
M: 5 C: 0
0%
M: 8 C: 0
0%
M: 1 C: 0
0%
query(ScopeId, MessageQuery)
M: 8 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
simpleQuery(ScopeId, String, String, boolean, DateParam, DateParam, String, MetricType, String, String, SortDirection, int, int)
M: 103 C: 0
0%
M: 14 C: 0
0%
M: 8 C: 0
0%
M: 21 C: 0
0%
M: 1 C: 0
0%
static {...}
M: 18 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 4 C: 0
0%
M: 1 C: 0
0%
storeMessage(ScopeId, KapuaDataMessage)
M: 13 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%

Coverage

1: /*******************************************************************************
2: * Copyright (c) 2016, 2022 Eurotech and/or its affiliates and others
3: *
4: * This program and the accompanying materials are made
5: * available under the terms of the Eclipse Public License 2.0
6: * which is available at https://www.eclipse.org/legal/epl-2.0/
7: *
8: * SPDX-License-Identifier: EPL-2.0
9: *
10: * Contributors:
11: * Eurotech - initial API and implementation
12: *******************************************************************************/
13: package org.eclipse.kapua.app.api.resources.v1.resources;
14:
15: import com.google.common.base.Strings;
16: import org.eclipse.kapua.KapuaException;
17: import org.eclipse.kapua.KapuaIllegalNullArgumentException;
18: import org.eclipse.kapua.app.api.core.resources.AbstractKapuaResource;
19: import org.eclipse.kapua.app.api.core.model.CountResult;
20: import org.eclipse.kapua.app.api.core.model.DateParam;
21: import org.eclipse.kapua.app.api.core.model.MetricType;
22: import org.eclipse.kapua.app.api.core.model.ScopeId;
23: import org.eclipse.kapua.app.api.core.model.StorableEntityId;
24: import org.eclipse.kapua.locator.KapuaLocator;
25: import org.eclipse.kapua.message.device.data.KapuaDataMessage;
26: import org.eclipse.kapua.model.type.ObjectValueConverter;
27: import org.eclipse.kapua.service.KapuaService;
28: import org.eclipse.kapua.service.datastore.MessageStoreFactory;
29: import org.eclipse.kapua.service.datastore.MessageStoreService;
30: import org.eclipse.kapua.service.datastore.internal.mediator.ChannelInfoField;
31: import org.eclipse.kapua.service.datastore.internal.mediator.MessageField;
32: import org.eclipse.kapua.service.datastore.internal.schema.MessageSchema;
33: import org.eclipse.kapua.service.datastore.model.DatastoreMessage;
34: import org.eclipse.kapua.service.datastore.model.MessageListResult;
35: import org.eclipse.kapua.service.datastore.model.query.MessageQuery;
36: import org.eclipse.kapua.service.datastore.model.query.predicate.DatastorePredicateFactory;
37: import org.eclipse.kapua.service.elasticsearch.client.model.InsertResponse;
38: import org.eclipse.kapua.service.storable.model.query.SortDirection;
39: import org.eclipse.kapua.service.storable.model.query.SortField;
40: import org.eclipse.kapua.service.storable.model.query.StorableFetchStyle;
41: import org.eclipse.kapua.service.storable.model.query.predicate.AndPredicate;
42: import org.eclipse.kapua.service.storable.model.query.predicate.RangePredicate;
43: import org.eclipse.kapua.service.storable.model.query.predicate.StorablePredicate;
44: import org.eclipse.kapua.service.storable.model.query.predicate.TermPredicate;
45:
46: import javax.ws.rs.Consumes;
47: import javax.ws.rs.DefaultValue;
48: import javax.ws.rs.GET;
49: import javax.ws.rs.POST;
50: import javax.ws.rs.Path;
51: import javax.ws.rs.PathParam;
52: import javax.ws.rs.Produces;
53: import javax.ws.rs.QueryParam;
54: import javax.ws.rs.core.MediaType;
55: import javax.ws.rs.core.Response;
56: import java.util.ArrayList;
57: import java.util.Date;
58: import java.util.List;
59:
60: @Path("{scopeId}/data/messages")
61: public class DataMessages extends AbstractKapuaResource {
62:
63: private static final KapuaLocator LOCATOR = KapuaLocator.getInstance();
64: private static final MessageStoreService MESSAGE_STORE_SERVICE = LOCATOR.getService(MessageStoreService.class);
65: private static final MessageStoreFactory MESSAGE_STORE_FACTORY = LOCATOR.getFactory(MessageStoreFactory.class);
66: private static final DatastorePredicateFactory DATASTORE_PREDICATE_FACTORY = LOCATOR.getFactory(DatastorePredicateFactory.class);
67:
68: /**
69: * Gets the {@link DatastoreMessage} list in the scope.
70: *
71: * @param scopeId The {@link ScopeId} in which to search results.
72: * @param clientId The client id to filter results.
73: * @param channel The channel id to filter results. It allows '#' wildcard in last channel level.
74: * @param strictChannel Restrict the search only to this channel ignoring its children. Only meaningful if channel is set.
75: * @param startDateParam The start date to filter the results. Must come before endDate parameter.
76: * @param endDateParam The end date to filter the results. Must come after startDate parameter
77: * @param offset The result set offset.
78: * @param limit The result set limit.
79: * @return The {@link MessageListResult} of all the datastoreMessages associated to the current selected scope.
80: * @throws KapuaException Whenever something bad happens. See specific {@link KapuaService} exceptions.
81: * @since 1.0.0
82: */
83: @GET
84: @Produces({MediaType.APPLICATION_XML})
85: public <V extends Comparable<V>> MessageListResult simpleQuery(@PathParam("scopeId") ScopeId scopeId,
86: @QueryParam("clientId") String clientId,
87: @QueryParam("channel") String channel,
88: @QueryParam("strictChannel") boolean strictChannel,
89: @QueryParam("startDate") DateParam startDateParam,
90: @QueryParam("endDate") DateParam endDateParam,
91: @QueryParam("metricName") String metricName,
92: @QueryParam("metricType") MetricType<V> metricType,
93: @QueryParam("metricMin") String metricMinValue,
94: @QueryParam("metricMax") String metricMaxValue,
95: @QueryParam("sortDir") @DefaultValue("DESC") SortDirection sortDir,
96: @QueryParam("offset") @DefaultValue("0") int offset,
97: @QueryParam("limit") @DefaultValue("50") int limit)
98: throws KapuaException {
99:
100: AndPredicate andPredicate = DATASTORE_PREDICATE_FACTORY.newAndPredicate();
101:• if (!Strings.isNullOrEmpty(clientId)) {
102: TermPredicate clientIdPredicate = DATASTORE_PREDICATE_FACTORY.newTermPredicate(MessageField.CLIENT_ID, clientId);
103: andPredicate.getPredicates().add(clientIdPredicate);
104: }
105:
106:• if (!Strings.isNullOrEmpty(channel)) {
107: andPredicate.getPredicates().add(getChannelPredicate(channel, strictChannel));
108: }
109:
110:• Date startDate = startDateParam != null ? startDateParam.getDate() : null;
111:• Date endDate = endDateParam != null ? endDateParam.getDate() : null;
112:• if (startDate != null || endDate != null) {
113: RangePredicate timestampPredicate = DATASTORE_PREDICATE_FACTORY.newRangePredicate(ChannelInfoField.TIMESTAMP, startDate, endDate);
114: andPredicate.getPredicates().add(timestampPredicate);
115: }
116:
117:• if (!Strings.isNullOrEmpty(metricName)) {
118: andPredicate.getPredicates().add(getMetricPredicate(metricName, metricType, metricMinValue, metricMaxValue));
119: }
120:
121: MessageQuery query = MESSAGE_STORE_FACTORY.newQuery(scopeId);
122: query.setPredicate(andPredicate);
123: query.setOffset(offset);
124: query.setLimit(limit);
125:
126: List<SortField> sort = new ArrayList<>();
127: sort.add(SortField.of(MessageSchema.MESSAGE_TIMESTAMP, sortDir));
128: query.setSortFields(sort);
129:
130: return query(scopeId, query);
131: }
132:
133: /**
134: * Stores a new Message under the account of the currently connected user.
135: * In this case, the provided message will only be stored in the back-end
136: * database and it will not be forwarded to the message broker.
137: *
138: * @param message The {@link KapuaDataMessage } to be stored
139: * @return an {@link InsertResponse} object encapsulating the response from
140: * the datastore
141: * @throws KapuaException Whenever something bad happens. See specific
142: * {@link KapuaService} exceptions.
143: */
144: @POST
145: @Consumes({MediaType.APPLICATION_XML})
146: @Produces({MediaType.APPLICATION_XML})
147: public Response storeMessage(@PathParam("scopeId") ScopeId scopeId,
148: KapuaDataMessage message)
149: throws KapuaException {
150: message.setScopeId(scopeId);
151: return returnCreated(new StorableEntityId(MESSAGE_STORE_SERVICE.store(message).toString()));
152: }
153:
154: /**
155: * Queries the results with the given {@link MessageQuery} parameter.
156: *
157: * @param scopeId The {@link ScopeId} in which to search results.
158: * @param query The {@link MessageQuery} to used to filter results.
159: * @return The {@link MessageListResult} of all the result matching the given {@link MessageQuery} parameter.
160: * @throws KapuaException Whenever something bad happens. See specific {@link KapuaService} exceptions.
161: * @since 1.0.0
162: */
163: @POST
164: @Path("_query")
165: @Consumes({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON})
166: @Produces({MediaType.APPLICATION_XML})
167: public MessageListResult query(@PathParam("scopeId") ScopeId scopeId,
168: MessageQuery query)
169: throws KapuaException {
170: query.setScopeId(scopeId);
171:
172: return MESSAGE_STORE_SERVICE.query(query);
173: }
174:
175: /**
176: * Counts the results with the given {@link MessageQuery} parameter.
177: *
178: * @param scopeId The {@link ScopeId} in which to search results.
179: * @param query The {@link MessageQuery} to used to filter results.
180: * @return The count of all the result matching the given {@link MessageQuery} parameter.
181: * @throws KapuaException Whenever something bad happens. See specific {@link KapuaService} exceptions.
182: * @since 1.0.0
183: */
184: @POST
185: @Path("_count")
186: @Consumes({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON})
187: @Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON})
188: public CountResult count(@PathParam("scopeId") ScopeId scopeId,
189: MessageQuery query)
190: throws KapuaException {
191: query.setScopeId(scopeId);
192:
193: return new CountResult(MESSAGE_STORE_SERVICE.count(query));
194: }
195:
196: /**
197: * Returns the DatastoreMessage specified by the "datastoreMessageId" path parameter.
198: *
199: * @param datastoreMessageId The id of the requested DatastoreMessage.
200: * @return The requested DatastoreMessage object.
201: * @throws KapuaException Whenever something bad happens. See specific {@link KapuaService} exceptions.
202: * @since 1.0.0
203: */
204: @GET
205: @Path("{datastoreMessageId}")
206: @Produces({MediaType.APPLICATION_XML})
207: public DatastoreMessage find(@PathParam("scopeId") ScopeId scopeId,
208: @PathParam("datastoreMessageId") StorableEntityId datastoreMessageId)
209: throws KapuaException {
210: DatastoreMessage datastoreMessage = MESSAGE_STORE_SERVICE.find(scopeId, datastoreMessageId, StorableFetchStyle.SOURCE_FULL);
211:
212: return returnNotNullEntity(datastoreMessage);
213: }
214:
215: private StorablePredicate getChannelPredicate(String channel, boolean strictChannel) {
216: StorablePredicate channelPredicate;
217:• if (strictChannel) {
218: channelPredicate = DATASTORE_PREDICATE_FACTORY.newTermPredicate(ChannelInfoField.CHANNEL, channel);
219: } else {
220: channelPredicate = DATASTORE_PREDICATE_FACTORY.newChannelMatchPredicate(channel);
221: }
222: return channelPredicate;
223: }
224:
225: private <V extends Comparable<V>> StorablePredicate getMetricPredicate(String metricName, MetricType<V> metricType, String metricMinValue, String metricMaxValue) throws KapuaIllegalNullArgumentException {
226:• if (metricMinValue == null && metricMaxValue == null) {
227:• Class<V> type = metricType != null ? metricType.getType() : null;
228: return DATASTORE_PREDICATE_FACTORY.newMetricExistsPredicate(metricName, type);
229: } else {
230:• if (metricType == null) {
231: throw new KapuaIllegalNullArgumentException("metricType");
232: }
233: V minValue = (V) ObjectValueConverter.fromString(metricMinValue, metricType.getType());
234: V maxValue = (V) ObjectValueConverter.fromString(metricMaxValue, metricType.getType());
235:
236: return DATASTORE_PREDICATE_FACTORY.newMetricPredicate(metricName, metricType.getType(), minValue, maxValue);
237: }
238: }
239: }