Skip to content

Package: StreamsJson

StreamsJson

nameinstructionbranchcomplexitylinemethod
StreamsJson()
M: 3 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
lambda$publish$0(KapuaDataPayload, XmlAdaptedMetric)
M: 16 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 4 C: 0
0%
M: 1 C: 0
0%
publish(ScopeId, Long, JsonKapuaDataMessage)
M: 67 C: 0
0%
M: 2 C: 0
0%
M: 2 C: 0
0%
M: 17 C: 0
0%
M: 1 C: 0
0%
static {...}
M: 12 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 3 C: 0
0%
M: 1 C: 0
0%

Coverage

1: /*******************************************************************************
2: * Copyright (c) 2017, 2020 Eurotech and/or its affiliates and others
3: *
4: * All rights reserved. This program and the accompanying materials
5: * are made available under the terms of the Eclipse Public License v1.0
6: * which accompanies this distribution, and is available at
7: * http://www.eclipse.org/legal/epl-v10.html
8: *
9: * Contributors:
10: * Eurotech - initial API and implementation
11: *******************************************************************************/
12: package org.eclipse.kapua.app.api.resources.v1.resources;
13:
14: import org.eclipse.kapua.KapuaException;
15: import org.eclipse.kapua.app.api.resources.v1.resources.marker.JsonSerializationFixed;
16: import org.eclipse.kapua.app.api.resources.v1.resources.model.ScopeId;
17: import org.eclipse.kapua.app.api.resources.v1.resources.model.data.JsonKapuaDataMessage;
18: import org.eclipse.kapua.locator.KapuaLocator;
19: import org.eclipse.kapua.message.device.data.KapuaDataMessage;
20: import org.eclipse.kapua.message.device.data.KapuaDataMessageFactory;
21: import org.eclipse.kapua.message.device.data.KapuaDataPayload;
22: import org.eclipse.kapua.model.type.ObjectValueConverter;
23:
24: import javax.ws.rs.Consumes;
25: import javax.ws.rs.POST;
26: import javax.ws.rs.Path;
27: import javax.ws.rs.PathParam;
28: import javax.ws.rs.QueryParam;
29: import javax.ws.rs.core.MediaType;
30: import javax.ws.rs.core.Response;
31:
32: /**
33: * @see JsonSerializationFixed
34: */
35: @Path("{scopeId}/streams")
36: public class StreamsJson extends AbstractKapuaResource implements JsonSerializationFixed {
37:
38: private static final KapuaLocator LOCATOR = KapuaLocator.getInstance();
39: private static final KapuaDataMessageFactory KAPUA_DATA_MESSAGE_FACTORY = LOCATOR.getFactory(KapuaDataMessageFactory.class);
40:
41: private static final Streams STREAMS = new Streams();
42:
43: /**
44: * Publishes a fire-and-forget message to a topic composed of:
45: * [account-name] / [client-id] / [semtantic-parts]
46: * In such a schema, the parts are defined as follows:
47: * <ul>
48: * <li>account-name: the name of the current scope</li>
49: * <li>client-id: from the "clientId" property in the body</li>
50: * <li>semantic-parts: array of strings in the "channel" property in the body</li>
51: * </ul>
52: * For example, the following JSON body will publish on the "kapua-sys/AA:BB:CC:DD:EE:FF/one/two/three" topic:
53: * <pre>
54: * {
55: * "type": "kapuaDataMessage",
56: * "position": {
57: * "type": "kapuaPosition",
58: * "latitude": 0,
59: * "longitude": 0
60: * },
61: * "clientId": "AA:BB:CC:DD:EE:FF",
62: * "channel": {
63: * "type": "kapuaDataChannel",
64: * "semanticParts": ["one", "two", "three"]
65: * },
66: * "payload": {
67: * "type": "kapuaDataPayload",
68: * "metrics": [
69: * {
70: * "valueType": "string",
71: * "value": "aaa",
72: * "name": "metric-1"
73: * },
74: * {
75: * "valueType": "string",
76: * "value": "bbb",
77: * "name": "metric-2"
78: * }
79: * ]
80: * }
81: * }
82: * </pre>
83: *
84: * @param scopeId
85: * @param timeout
86: * @param jsonKapuaDataMessage
87: * @return
88: * @throws KapuaException
89: */
90: @POST
91: @Path("messages")
92: @Consumes({MediaType.APPLICATION_JSON})
93: public Response publish(
94: @PathParam("scopeId") ScopeId scopeId,
95: @QueryParam("timeout") Long timeout,
96: JsonKapuaDataMessage jsonKapuaDataMessage) throws KapuaException {
97:
98: KapuaDataMessage kapuaDataMessage = KAPUA_DATA_MESSAGE_FACTORY.newKapuaDataMessage();
99:
100: kapuaDataMessage.setId(jsonKapuaDataMessage.getId());
101: kapuaDataMessage.setScopeId(scopeId);
102: kapuaDataMessage.setDeviceId(jsonKapuaDataMessage.getDeviceId());
103: kapuaDataMessage.setClientId(jsonKapuaDataMessage.getClientId());
104: kapuaDataMessage.setReceivedOn(jsonKapuaDataMessage.getReceivedOn());
105: kapuaDataMessage.setSentOn(jsonKapuaDataMessage.getSentOn());
106: kapuaDataMessage.setCapturedOn(jsonKapuaDataMessage.getCapturedOn());
107: kapuaDataMessage.setPosition(jsonKapuaDataMessage.getPosition());
108: kapuaDataMessage.setChannel(jsonKapuaDataMessage.getChannel());
109:
110: KapuaDataPayload kapuaDataPayload = KAPUA_DATA_MESSAGE_FACTORY.newKapuaDataPayload();
111:
112:• if (jsonKapuaDataMessage.getPayload() != null) {
113: kapuaDataPayload.setBody(jsonKapuaDataMessage.getPayload().getBody());
114:
115: jsonKapuaDataMessage.getPayload().getMetrics().forEach(
116: jsonMetric -> {
117: String name = jsonMetric.getName();
118: Object value = ObjectValueConverter.fromString(jsonMetric.getValue(), jsonMetric.getValueType());
119:
120: kapuaDataPayload.getMetrics().put(name, value);
121: });
122: }
123:
124: kapuaDataMessage.setPayload(kapuaDataPayload);
125:
126: STREAMS.publish(scopeId, timeout, kapuaDataMessage);
127:
128: return returnNoContent();
129: }
130: }