Skip to content

Package: KuraDevice

KuraDevice

nameinstructionbranchcomplexitylinemethod
KuraDevice()
M: 65 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 23 C: 0
0%
M: 1 C: 0
0%
addMoreThanOneDeviceToKuraMock(String)
M: 6 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 3 C: 0
0%
M: 1 C: 0
0%
connectionLost(Throwable)
M: 4 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
deliveryComplete(IMqttDeliveryToken)
M: 4 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
extractCallback(byte[])
M: 24 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 6 C: 0
0%
M: 1 C: 0
0%
getClientId()
M: 3 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
getMetric(byte[], String)
M: 41 C: 0
0%
M: 6 C: 0
0%
M: 4 C: 0
0%
M: 15 C: 0
0%
M: 1 C: 0
0%
messageArrived(String, MqttMessage)
M: 1536 C: 0
0%
M: 108 C: 0
0%
M: 55 C: 0
0%
M: 257 C: 0
0%
M: 1 C: 0
0%
mqttClientConnect()
M: 42 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 13 C: 0
0%
M: 1 C: 0
0%
mqttClientDisconnect()
M: 43 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 9 C: 0
0%
M: 1 C: 0
0%
mqttClientSetup()
M: 31 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 7 C: 0
0%
M: 1 C: 0
0%
mqttClientSetupForMoreDevices()
M: 188 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 19 C: 0
0%
M: 1 C: 0
0%
sendMessageFromFile(String, int, boolean, String)
M: 16 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 3 C: 0
0%
M: 1 C: 0
0%
static {...}
M: 4 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%

Coverage

1: /*******************************************************************************
2: * Copyright (c) 2017, 2022 Eurotech and/or its affiliates and others
3: *
4: * This program and the accompanying materials are made
5: * available under the terms of the Eclipse Public License 2.0
6: * which is available at https://www.eclipse.org/legal/epl-2.0/
7: *
8: * SPDX-License-Identifier: EPL-2.0
9: *
10: * Contributors:
11: * Eurotech
12: *******************************************************************************/
13: package org.eclipse.kapua.service.device.registry.steps;
14:
15: import com.google.protobuf.InvalidProtocolBufferException;
16: import org.eclipse.kapua.kura.simulator.proto.KuraPayloadProto;
17: import org.eclipse.kapua.qa.common.Suppressed;
18: import org.eclipse.kapua.service.device.call.message.kura.KuraPayload;
19: import org.eclipse.kapua.service.device.call.message.kura.app.request.KuraRequestPayload;
20: import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
21: import org.eclipse.paho.client.mqttv3.MqttCallback;
22: import org.eclipse.paho.client.mqttv3.MqttClient;
23: import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
24: import org.eclipse.paho.client.mqttv3.MqttException;
25: import org.eclipse.paho.client.mqttv3.MqttMessage;
26: import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
27: import org.slf4j.Logger;
28: import org.slf4j.LoggerFactory;
29:
30: import java.io.IOException;
31: import java.net.URISyntaxException;
32: import java.nio.file.Files;
33: import java.nio.file.Paths;
34: import java.util.Date;
35: import java.util.List;
36:
37: public class KuraDevice implements MqttCallback {
38:
39: private static final Logger LOG = LoggerFactory.getLogger(KuraDevice.class);
40:
41: private static final String JOB_ID = "job.id";
42: private static final String CLIENT_ID = "client.id";
43:
44: private static final String $EDC = "$EDC/";
45: private static final String $EDC_KAPUA_SYS = "$EDC/kapua-sys/";
46: private static final String DEPLOY_V2_REPLY = "/DEPLOY-V2/REPLY/";
47: private static final String DEPLOY_V2_NOTIFY = "/DEPLOY-V2/NOTIFY/";
48: private static final String KEYS_V1_REPLY = "/KEYS-V1/REPLY/";
49:
50: private static final String COMPLETED = "COMPLETED";
51:
52: //
53: // Topics that Kura device is listening to.
54: //
55: private String assetExecRead;
56: private String assetExecWrite;
57: private String cmdExecCommand;
58: private String confGetConfigurations;
59: private String confPutConfigurations;
60: private String deployGetPackages;
61: private String deployExecUninstall;
62: private String deployExecDownload;
63: private String deployGetBundles;
64: private String deployExecStart34;
65: private String deployExecStart95;
66: private String deployExecStop77;
67: private String keystoreGetKeystores;
68: private String keystoreGetKeystoresEntries;
69: private String keystoreGetKeystoresEntry;
70: private String keystorePostKeystoresEntriesCertificate;
71: private String keystorePostKeystoresEntriesKeypair;
72: private String keystorePostKeystoresEntriesCsr;
73: private String keystoreDelKeystoresEntries;
74:
75: /**
76: * URI of mqtt broker.
77: */
78: private static final String BROKER_URI = "tcp://localhost:1883";
79:
80: /**
81: * Mocked Kura device client id.
82: */
83: private static final String CLIENT_ID_RPIONE3 = "rpione3";
84:
85: /**
86: * User with which Mocked Kura device is connecting to Cloud service.
87: */
88: private static final String CLIENT_USER = "kapua-broker";
89:
90: /**
91: * Mocked Kura device password while connecting to Cloud service.
92: */
93: private static final String CLIENT_PASSWORD = "kapua-password";
94:
95: /**
96: * User under which Kura device is listening for messages.
97: */
98: private static final String SERVER_USER = "kapua-sys";
99:
100: /**
101: * Password for system user under which Kura is listening form messages.
102: */
103: private static final String SERVER_PASSWORD = "kapua-password";
104:
105: /**
106: * Account under which Kura device is registered.
107: */
108: private static final String CLIENT_ACCOUNT = "kapua-sys";
109:
110: /**
111: * Kapua system topic and everything under it as mqtt filter.
112: */
113: private static final String TOPIC_FILTER = "$EDC/#";
114:
115: /**
116: * Default quality of service - mqtt.
117: */
118: private static final int QOS = 0;
119:
120: /**
121: * Mqtt client for sending messages form Mocked Kura device.
122: */
123: private MqttClient mqttClient;
124:
125: /**
126: * Mqtt client form listening from messages on Mocked Kura device.
127: */
128: private MqttClient subscribedClient;
129: private String clientId;
130:
131: public boolean bundleStateChanged;
132: public boolean configurationChanged;
133: public boolean packageListChanged;
134: public boolean packageListChangedAfterUninstall;
135: public boolean assetStateChanged;
136:
137: public int keystoreInstalledCertificate;
138: public int keystoreInstalledKeypair;
139:
140: public KuraDevice() {
141: // ASSET-V1
142: assetExecRead = "$EDC/kapua-sys/rpione3/ASSET-V1/EXEC/read";
143: assetExecWrite = "$EDC/kapua-sys/rpione3/ASSET-V1/EXEC/write";
144:
145: // CMD-V1
146: cmdExecCommand = "$EDC/kapua-sys/rpione3/CMD-V1/EXEC/command";
147:
148: // CONF-V1
149: confGetConfigurations = "$EDC/kapua-sys/rpione3/CONF-V1/GET/configurations";
150: confPutConfigurations = "$EDC/kapua-sys/rpione3/CONF-V1/PUT/configurations";
151:
152: // DEPLOY-V2
153: deployGetPackages = "$EDC/kapua-sys/rpione3/DEPLOY-V2/GET/packages";
154: deployExecDownload = "$EDC/kapua-sys/rpione3/DEPLOY-V2/EXEC/download";
155: deployExecUninstall = "$EDC/kapua-sys/rpione3/DEPLOY-V2/EXEC/uninstall";
156: deployGetBundles = "$EDC/kapua-sys/rpione3/DEPLOY-V2/GET/bundles";
157: deployExecStart34 = "$EDC/kapua-sys/rpione3/DEPLOY-V2/EXEC/start/34";
158: deployExecStart95 = "$EDC/kapua-sys/rpione3/DEPLOY-V2/EXEC/start/95";
159: deployExecStop77 = "$EDC/kapua-sys/rpione3/DEPLOY-V2/EXEC/stop/77";
160:
161: // KEYS-V1
162: keystoreGetKeystores = "$EDC/kapua-sys/rpione3/KEYS-V1/GET/keystores";
163: keystoreGetKeystoresEntries = "$EDC/kapua-sys/rpione3/KEYS-V1/GET/keystores/entries";
164: keystoreGetKeystoresEntry = "$EDC/kapua-sys/rpione3/KEYS-V1/GET/keystores/entries/entry";
165: keystorePostKeystoresEntriesCertificate = "$EDC/kapua-sys/rpione3/KEYS-V1/POST/keystores/entries/certificate";
166: keystorePostKeystoresEntriesKeypair = "$EDC/kapua-sys/rpione3/KEYS-V1/POST/keystores/entries/keypair";
167: keystorePostKeystoresEntriesCsr = "$EDC/kapua-sys/rpione3/KEYS-V1/POST/keystores/entries/csr";
168: keystoreDelKeystoresEntries = "$EDC/kapua-sys/rpione3/KEYS-V1/DEL/keystores/entries";
169:
170: clientId = CLIENT_ID_RPIONE3;
171:
172: mqttClientSetup();
173: }
174:
175: public String getClientId() {
176: return this.clientId;
177: }
178:
179: public void addMoreThanOneDeviceToKuraMock(String name) {
180: clientId = name;
181: mqttClientSetupForMoreDevices();
182: }
183:
184: /**
185: * Disconnect Mocked Kura device mqtt clients that listen and send messages
186: * to mqtt broker.
187: */
188: public void mqttClientDisconnect() {
189: try {
190: try (Suppressed<Exception> s = Suppressed.withException()) {
191: s.run(mqttClient::disconnect);
192: s.run(subscribedClient::disconnect);
193: s.run(mqttClient::close);
194: s.run(subscribedClient::close);
195: }
196: } catch (Exception e) {
197: LOG.warn("Failed during cleanup of Paho resources!", e);
198: }
199: }
200:
201: /**
202: * Connect both listening and sending mqtt client of Mocked Kura device.
203: */
204: public void mqttClientConnect() {
205: MqttConnectOptions clientOpts = new MqttConnectOptions();
206: clientOpts.setUserName(CLIENT_USER);
207: clientOpts.setPassword(CLIENT_PASSWORD.toCharArray());
208:
209: MqttConnectOptions serverOpts = new MqttConnectOptions();
210: serverOpts.setUserName(SERVER_USER);
211: serverOpts.setPassword(SERVER_PASSWORD.toCharArray());
212:
213: try {
214: mqttClient.connect(clientOpts);
215: subscribedClient.connect(serverOpts);
216: subscribedClient.subscribe(TOPIC_FILTER, QOS);
217: } catch (MqttException me) {
218: LOG.error("Error while connecting KuraDevice!", me);
219: }
220: }
221:
222: /**
223: * Prepare client and server part of mocked mqtt.
224: * <p>
225: * MqttClient is meant to simulate Kura device for sending messages,
226: * while subscribedClient is meant to receive messages from Kura device side.
227: */
228: public void mqttClientSetup() {
229: try {
230: mqttClient = new MqttClient(BROKER_URI, CLIENT_ID_RPIONE3, new MemoryPersistence());
231: subscribedClient = new MqttClient(BROKER_URI, MqttClient.generateClientId(), new MemoryPersistence());
232: } catch (MqttException me) {
233: LOG.error("Error while setting up KuraDevice!", me);
234: }
235:
236: subscribedClient.setCallback(this);
237: }
238:
239: /**
240: * MqttClient is meant to simulate Kura device for sending messages,
241: * while subscribedClient is meant to receive messages from Kura device side.
242: */
243: private void mqttClientSetupForMoreDevices() {
244: try {
245: deployGetPackages = $EDC_KAPUA_SYS + clientId + "/DEPLOY-V2/GET/packages";
246: deployExecDownload = $EDC_KAPUA_SYS + clientId + "/DEPLOY-V2/EXEC/download";
247: deployExecUninstall = $EDC_KAPUA_SYS + clientId + "/DEPLOY-V2/EXEC/uninstall";
248: deployGetBundles = $EDC_KAPUA_SYS + clientId + "/DEPLOY-V2/GET/bundles";
249: confGetConfigurations = $EDC_KAPUA_SYS + clientId + "/CONF-V1/GET/configurations";
250: confPutConfigurations = $EDC_KAPUA_SYS + clientId + "/CONF-V1/PUT/configurations";
251: cmdExecCommand = $EDC_KAPUA_SYS + clientId + "/CMD-V1/EXEC/command";
252: deployExecStart34 = $EDC_KAPUA_SYS + clientId + "/DEPLOY-V2/EXEC/start/34";
253: deployExecStart95 = $EDC_KAPUA_SYS + clientId + "/DEPLOY-V2/EXEC/start/95";
254: deployExecStop77 = $EDC_KAPUA_SYS + clientId + "/DEPLOY-V2/EXEC/stop/77";
255:
256: assetExecRead = $EDC_KAPUA_SYS + clientId + "/ASSET-V1/EXEC/read";
257:
258: assetExecWrite = $EDC_KAPUA_SYS + clientId + "/ASSET-V1/EXEC/write";
259:
260: mqttClient = new MqttClient(BROKER_URI, clientId, new MemoryPersistence());
261: subscribedClient = new MqttClient(BROKER_URI, MqttClient.generateClientId(), new MemoryPersistence());
262: } catch (MqttException me) {
263: LOG.error("Error while setting up KuraDevice!", me);
264: }
265: subscribedClient.setCallback(this);
266: }
267:
268: /**
269: * Sending data to mqtt broker. Data is read form file containing pre-recorded response.
270: *
271: * @param topic mqtt broker topic
272: * @param qos mqtt QOS
273: * @param retained is message retained (mqtt specific)
274: * @param fileName name of file and path with pre-recorded response
275: * @throws MqttException
276: * @throws IOException
277: */
278: public void sendMessageFromFile(String topic, int qos, boolean retained, String fileName) throws MqttException, IOException, URISyntaxException {
279: byte[] payload = Files.readAllBytes(Paths.get(getClass().getResource(fileName).toURI()));
280:
281: mqttClient.publish(topic, payload, qos, retained);
282: }
283:
284: /**
285: * Extraction of metrics form Kapua message payload.
286: *
287: * @param payload payload received from Kapua
288: * @param metricKey string representing key of metric
289: * @return string representation of metric value
290: */
291: private String getMetric(byte[] payload, String metricKey) {
292:
293: String value = null;
294: KuraPayloadProto.KuraPayload kuraPayload = null;
295: try {
296: kuraPayload = KuraPayloadProto.KuraPayload.parseFrom(payload);
297: } catch (InvalidProtocolBufferException e) {
298: value = null;
299: }
300:
301:• if (kuraPayload == null) {
302: return value;
303: }
304:
305: List<KuraPayloadProto.KuraPayload.KuraMetric> metrics = kuraPayload.getMetricList();
306:• for (KuraPayloadProto.KuraPayload.KuraMetric metric : metrics) {
307: String name = metric.getName();
308:• if (name.equals(metricKey)) {
309: value = metric.getStringValue();
310: }
311: }
312:
313: return value;
314: }
315:
316: /**
317: * Ectraction of callback parameters form Kapua generated message stored as Metrics.
318: *
319: * @param payload Kapua message
320: * @return tuple with client and request id
321: */
322: private CallbackParam extractCallback(byte[] payload) {
323:
324: CallbackParam callbackParam = new CallbackParam();
325: String clientId = getMetric(payload, "requester.client.id");
326: String requestId = getMetric(payload, "request.id");
327: callbackParam.setClientId(clientId);
328: callbackParam.setRequestId(requestId);
329:
330: return callbackParam;
331: }
332:
333: @Override
334: public void connectionLost(Throwable throwable) {
335: LOG.info("Kapua Mock Device connection to broker lost.");
336: }
337:
338: @Override
339: public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
340: LOG.info("Message arrived in Kapua Mock Device with topic: " + topic);
341:
342: CallbackParam callbackParam;
343: String responseTopic;
344: byte[] responsePayload;
345: byte[] requestPayload = mqttMessage.getPayload();
346:
347: try {
348: // ASSET-V1
349:• if (topic.equals(assetExecRead)) {
350: callbackParam = extractCallback(requestPayload);
351:
352: responseTopic = $EDC + CLIENT_ACCOUNT + "/" + callbackParam.getClientId() + "/ASSET-V1/REPLY/" + callbackParam.getRequestId();
353:• responsePayload = Files.readAllBytes(Paths.get(getClass().getResource(assetStateChanged ? "/mqtt/KapuaPool-client-id_ASSET-V1_READ_req-id_assets_updated_assets.mqtt" : "/mqtt/KapuaPool-client-id_ASSET-V1_READ_req-id_assets.mqtt").toURI()));
354:
355: mqttClient.publish(responseTopic, responsePayload, 0, false);
356:• } else if (topic.equals(assetExecWrite)) {
357: callbackParam = extractCallback(requestPayload);
358:
359: responseTopic = $EDC + CLIENT_ACCOUNT + "/" + callbackParam.getClientId() + "/ASSET-V1/REPLY/" + callbackParam.getRequestId();
360: responsePayload = Files.readAllBytes(Paths.get(getClass().getResource("/mqtt/KapuaPool-client-id_ASSET-V1_WRITE_req-id_assets.mqtt").toURI()));
361:
362: assetStateChanged = true;
363: mqttClient.publish(responseTopic, responsePayload, 0, false);
364: }
365: // CMD-V1
366:• else if (topic.equals(cmdExecCommand)) {
367: callbackParam = extractCallback(requestPayload);
368:
369: responseTopic = $EDC + CLIENT_ACCOUNT + "/" + callbackParam.getClientId() + "/CMD-V1/REPLY/" + callbackParam.getRequestId();
370: responsePayload = Files.readAllBytes(Paths.get(getClass().getResource("/mqtt/KapuaPool-client-id_CMD-V1_REPLY_req-id_command.mqtt").toURI()));
371:
372: mqttClient.publish(responseTopic, responsePayload, 0, false);
373: }
374: // CONF-V1
375:• else if (topic.equals(confGetConfigurations)) {
376: callbackParam = extractCallback(requestPayload);
377:
378: responseTopic = $EDC + CLIENT_ACCOUNT + "/" + callbackParam.getClientId() + "/CONF-V1/REPLY/" + callbackParam.getRequestId();
379:• responsePayload = Files.readAllBytes(Paths.get(getClass().getResource(configurationChanged ? "/mqtt/KapuaPool-client-id_CONF-V1_REPLY_req-id_updated_configurations.mqtt" : "/mqtt/KapuaPool-client-id_CONF-V1_REPLY_req-id_inital_configurations.mqtt").toURI()));
380:
381: mqttClient.publish(responseTopic, responsePayload, 0, false);
382:• } else if (topic.equals(confPutConfigurations)) {
383: callbackParam = extractCallback(requestPayload);
384:
385: responseTopic = $EDC + CLIENT_ACCOUNT + "/" + callbackParam.getClientId() + "/CONF-V1/REPLY/" + callbackParam.getRequestId();
386: responsePayload = Files.readAllBytes(Paths.get(getClass().getResource("/mqtt/KapuaPool-client-id_CONF-V1_PUT_configurations.mqtt").toURI()));
387:
388: configurationChanged = true;
389: mqttClient.publish(responseTopic, responsePayload, 0, false);
390: }
391: // DEPLOY-V2
392:• else if (topic.equals(deployGetPackages)) {
393: callbackParam = extractCallback(requestPayload);
394:
395: responseTopic = $EDC + CLIENT_ACCOUNT + "/" + callbackParam.getClientId() + DEPLOY_V2_REPLY + callbackParam.getRequestId();
396:• responsePayload = Files.readAllBytes(Paths.get(getClass().getResource(packageListChanged ? "/mqtt/KapuaPool-client-id_DEPLOY-V2_REPLY_req-id_packages_updated_list.mqtt" : (packageListChangedAfterUninstall == true ? "/mqtt/KapuaPoolClient-id_DEPLOY_V2_REPLY_package_list_after_uninstall.mqtt" : "/mqtt/KapuaPool-client-id_DEPLOY-V2_REPLY_req-id_packages_initial_list.mqtt")).toURI()));
397:
398: mqttClient.publish(responseTopic, responsePayload, 0, false);
399:• } else if (topic.equals(deployExecDownload)) {
400: callbackParam = extractCallback(requestPayload);
401: KuraPayload kuraPayloadInitial = new KuraPayload();
402: kuraPayloadInitial.readFromByteArray(requestPayload);
403:
404: responseTopic = $EDC + CLIENT_ACCOUNT + "/" + callbackParam.getClientId() + DEPLOY_V2_REPLY + callbackParam.getRequestId();
405: KuraPayload customKuraPayload1 = new KuraPayload();
406: customKuraPayload1.setTimestamp(new Date());
407: customKuraPayload1.getMetrics().put("response.code", 200);
408: responsePayload = customKuraPayload1.toByteArray();
409: mqttClient.publish(responseTopic, responsePayload, 0, false);
410: Thread.sleep(100);
411:
412: responseTopic = $EDC + CLIENT_ACCOUNT + "/" + callbackParam.getClientId() + DEPLOY_V2_NOTIFY + clientId + "/download";
413: KuraPayload customKuraPayload2 = new KuraPayload();
414: customKuraPayload2.setTimestamp(new Date());
415: customKuraPayload2.getMetrics().put(JOB_ID, kuraPayloadInitial.getMetrics().get(JOB_ID));
416: customKuraPayload2.getMetrics().put(CLIENT_ID, clientId);
417: customKuraPayload2.getMetrics().put("dp.download.progress", 50);
418: customKuraPayload2.getMetrics().put("dp.download.size", 20409);
419: customKuraPayload2.getMetrics().put("dp.download.status", "IN_PROGRESS");
420: customKuraPayload2.getMetrics().put("dp.download.index", 0);
421: responsePayload = customKuraPayload2.toByteArray();
422: mqttClient.publish(responseTopic, responsePayload, 0, false);
423: Thread.sleep(100);
424:
425: responseTopic = $EDC + CLIENT_ACCOUNT + "/" + callbackParam.getClientId() + DEPLOY_V2_NOTIFY + clientId + "/download";
426: KuraPayload customKuraPayload3 = new KuraPayload();
427: customKuraPayload3.setTimestamp(new Date());
428: customKuraPayload3.getMetrics().put(JOB_ID, kuraPayloadInitial.getMetrics().get(JOB_ID));
429: customKuraPayload3.getMetrics().put(CLIENT_ID, clientId);
430: customKuraPayload3.getMetrics().put("dp.download.progress", 100);
431: customKuraPayload3.getMetrics().put("dp.download.size", 20409);
432: customKuraPayload3.getMetrics().put("dp.download.status", COMPLETED);
433: customKuraPayload3.getMetrics().put("dp.download.index", 0);
434: responsePayload = customKuraPayload3.toByteArray();
435: mqttClient.publish(responseTopic, responsePayload, 0, false);
436: Thread.sleep(100);
437:
438: responseTopic = $EDC + CLIENT_ACCOUNT + "/" + callbackParam.getClientId() + DEPLOY_V2_NOTIFY + clientId + "/install";
439: KuraPayload customKuraPayload4 = new KuraPayload();
440: customKuraPayload4.setTimestamp(new Date());
441: customKuraPayload4.getMetrics().put("dp.name", "Example Publisher-1.0.300.dp");
442: customKuraPayload4.getMetrics().put(JOB_ID, kuraPayloadInitial.getMetrics().get(JOB_ID));
443: customKuraPayload4.getMetrics().put("dp.install.progress", 100);
444: customKuraPayload4.getMetrics().put("dp.install.status", COMPLETED);
445: customKuraPayload4.getMetrics().put(CLIENT_ID, clientId);
446: responsePayload = customKuraPayload4.toByteArray();
447: mqttClient.publish(responseTopic, responsePayload, 0, false);
448:
449: packageListChanged = true;
450:• } else if (topic.equals(deployExecUninstall)) {
451: callbackParam = extractCallback(requestPayload);
452: KuraPayload kuraPayloadInitial = new KuraPayload();
453: kuraPayloadInitial.readFromByteArray(requestPayload);
454:
455: responseTopic = $EDC + CLIENT_ACCOUNT + "/" + callbackParam.getClientId() + DEPLOY_V2_REPLY + callbackParam.getRequestId();
456: KuraPayload customKuraPayload = new KuraPayload();
457:
458: customKuraPayload.setTimestamp(new Date());
459: customKuraPayload.getMetrics().put("response.code", 200);
460: responsePayload = customKuraPayload.toByteArray();
461: mqttClient.publish(responseTopic, responsePayload, 0, false);
462: Thread.sleep(5000);
463:
464: responseTopic = $EDC + CLIENT_ACCOUNT + "/" + callbackParam.getClientId() + DEPLOY_V2_NOTIFY + clientId + "/uninstall";
465: KuraPayload customKuraPayload2 = new KuraPayload();
466:
467: customKuraPayload2.setTimestamp(new Date());
468: customKuraPayload2.getMetrics().put(JOB_ID, kuraPayloadInitial.getMetrics().get(JOB_ID));
469: customKuraPayload2.getMetrics().put("dp.name", "org.eclipse.kura.example.beacon");
470: customKuraPayload2.getMetrics().put("dp.uninstall.progress", 100);
471: customKuraPayload2.getMetrics().put("dp.uninstall.status", COMPLETED);
472: customKuraPayload2.getMetrics().put(CLIENT_ID, clientId);
473: responsePayload = customKuraPayload2.toByteArray();
474: mqttClient.publish(responseTopic, responsePayload, 0, false);
475:
476: packageListChangedAfterUninstall = true;
477:• } else if (topic.equals(deployGetBundles)) {
478: callbackParam = extractCallback(requestPayload);
479:
480: responseTopic = $EDC + CLIENT_ACCOUNT + "/" + callbackParam.getClientId() + DEPLOY_V2_REPLY + callbackParam.getRequestId();
481:• responsePayload = Files.readAllBytes(Paths.get(getClass().getResource(bundleStateChanged ? "/mqtt/KapuaPool-client-id_DEPLOY-V2_REPLY_req-id_bundles_updated_state.mqtt" : "/mqtt/KapuaPool-client-id_DEPLOY-V2_REPLY_req-id_bundles_inital_state.mqtt").toURI()));
482:
483: mqttClient.publish(responseTopic, responsePayload, 0, false);
484:• } else if (topic.equals(deployExecStart34) || topic.equals(deployExecStart95)) {
485: callbackParam = extractCallback(requestPayload);
486:
487: responseTopic = $EDC + CLIENT_ACCOUNT + "/" + callbackParam.getClientId() + DEPLOY_V2_REPLY + callbackParam.getRequestId();
488: responsePayload = Files.readAllBytes(Paths.get(getClass().getResource("/mqtt/KapuaPool-client-id_DEPLOY-V2_EXEC_START_bundle_id.mqtt").toURI()));
489:
490: bundleStateChanged = true;
491: mqttClient.publish(responseTopic, responsePayload, 0, false);
492:• } else if (topic.equals(deployExecStop77)) {
493: callbackParam = extractCallback(requestPayload);
494:
495: responseTopic = $EDC + CLIENT_ACCOUNT + "/" + callbackParam.getClientId() + DEPLOY_V2_REPLY + callbackParam.getRequestId();
496: responsePayload = Files.readAllBytes(Paths.get(getClass().getResource("/mqtt/KapuaPool-client-id_DEPLOY-V2_EXEC_STOP_bundle_id.mqtt").toURI()));
497:
498: bundleStateChanged = true;
499: mqttClient.publish(responseTopic, responsePayload, 0, false);
500: }
501: // KEYS-V1
502:• else if (topic.equals(keystoreGetKeystores)) {
503: callbackParam = extractCallback(requestPayload);
504:
505: responseTopic = $EDC + CLIENT_ACCOUNT + "/" + callbackParam.getClientId() + KEYS_V1_REPLY + callbackParam.getRequestId();
506:
507: byte[] responseBody = Files.readAllBytes(Paths.get(getClass().getResource("/mqtt/KEYS-V1_GET_keystores_reply.json").toURI()));
508:
509: KuraPayload kuraResponsePayload = new KuraPayload();
510: kuraResponsePayload.setBody(responseBody);
511: kuraResponsePayload.getMetrics().put("response.code", 200);
512:
513: responsePayload = kuraResponsePayload.toByteArray();
514:
515: mqttClient.publish(responseTopic, responsePayload, 0, false);
516:• } else if (topic.equals(keystoreGetKeystoresEntries)) {
517: callbackParam = extractCallback(requestPayload);
518:
519: responseTopic = $EDC + CLIENT_ACCOUNT + "/" + callbackParam.getClientId() + KEYS_V1_REPLY + callbackParam.getRequestId();
520:
521: KuraRequestPayload kuraRequestPayload = new KuraRequestPayload();
522: kuraRequestPayload.readFromByteArray(requestPayload);
523:
524: byte[] responseBody = null;
525:• if (kuraRequestPayload.hasBody()) {
526: String queryJsonString = new String(kuraRequestPayload.getBody());
527:
528:• if (queryJsonString.contains("alias")) {
529: responseBody = Files.readAllBytes(Paths.get(getClass().getResource("/mqtt/KEYS-V1_GET_keystores_entries_alias_reply.json").toURI()));
530:• } else if (queryJsonString.contains("keystoreServicePid")) {
531: responseBody = Files.readAllBytes(Paths.get(getClass().getResource("/mqtt/KEYS-V1_GET_keystores_entries_keystore_reply.json").toURI()));
532: }
533: } else {
534:
535:• if (keystoreInstalledCertificate == 0 && keystoreInstalledKeypair == 0) {
536: responseBody = Files.readAllBytes(Paths.get(getClass().getResource("/mqtt/KEYS-V1_GET_keystores_entries_default_reply.json").toURI()));
537:• } else if (keystoreInstalledCertificate == 1 && keystoreInstalledKeypair == 0) {
538: responseBody = Files.readAllBytes(Paths.get(getClass().getResource("/mqtt/KEYS-V1_GET_keystores_entries_cert_installed_reply.json").toURI()));
539:• } else if (keystoreInstalledCertificate == 1 && keystoreInstalledKeypair == 1) {
540: responseBody = Files.readAllBytes(Paths.get(getClass().getResource("/mqtt/KEYS-V1_GET_keystores_entries_cert_key_installed_reply.json").toURI()));
541:• } else if (keystoreInstalledCertificate == 0 && keystoreInstalledKeypair == 1) {
542: responseBody = Files.readAllBytes(Paths.get(getClass().getResource("/mqtt/KEYS-V1_GET_keystores_entries_key_installed_reply.json").toURI()));
543: }
544: }
545:
546: KuraPayload kuraResponsePayload = new KuraPayload();
547: kuraResponsePayload.setBody(responseBody);
548: kuraResponsePayload.getMetrics().put("response.code", 200);
549:
550: responsePayload = kuraResponsePayload.toByteArray();
551:
552: mqttClient.publish(responseTopic, responsePayload, 0, false);
553:• } else if (topic.equals(keystoreGetKeystoresEntry)) {
554: callbackParam = extractCallback(requestPayload);
555:
556: responseTopic = $EDC + CLIENT_ACCOUNT + "/" + callbackParam.getClientId() + KEYS_V1_REPLY + callbackParam.getRequestId();
557:
558: KuraRequestPayload kuraRequestPayload = new KuraRequestPayload();
559: kuraRequestPayload.readFromByteArray(requestPayload);
560:
561: byte[] responseBody = null;
562:• if (kuraRequestPayload.hasBody()) {
563: String queryJsonString = new String(kuraRequestPayload.getBody());
564:
565:• if (queryJsonString.contains("\"alias\":\"localhost\"") &&
566:• queryJsonString.contains("\"keystoreServicePid\":\"HttpsKeystore\"")) {
567: responseBody = Files.readAllBytes(Paths.get(getClass().getResource("/mqtt/KEYS-V1_GET_keystores_entry_reply.json").toURI()));
568: }
569: }
570:
571: KuraPayload kuraResponsePayload = new KuraPayload();
572: kuraResponsePayload.setBody(responseBody);
573: kuraResponsePayload.getMetrics().put("response.code", 200);
574:
575: responsePayload = kuraResponsePayload.toByteArray();
576:
577: mqttClient.publish(responseTopic, responsePayload, 0, false);
578:• } else if (topic.equals(keystorePostKeystoresEntriesCertificate)) {
579: callbackParam = extractCallback(requestPayload);
580:
581: responseTopic = $EDC + CLIENT_ACCOUNT + "/" + callbackParam.getClientId() + KEYS_V1_REPLY + callbackParam.getRequestId();
582:
583: KuraRequestPayload kuraRequestPayload = new KuraRequestPayload();
584: kuraRequestPayload.readFromByteArray(requestPayload);
585:
586: int responseCode;
587:• if (kuraRequestPayload.hasBody()) {
588: String queryJsonString = new String(kuraRequestPayload.getBody());
589:
590:• if (queryJsonString.contains("-----BEGIN CERTIFICATE-----") &&
591:• queryJsonString.contains("-----END CERTIFICATE-----")) {
592: keystoreInstalledCertificate++;
593: responseCode = 200;
594: } else {
595: responseCode = 500;
596: }
597: } else {
598: responseCode = 400;
599: }
600:
601: KuraPayload kuraResponsePayload = new KuraPayload();
602: kuraResponsePayload.getMetrics().put("response.code", responseCode);
603: responsePayload = kuraResponsePayload.toByteArray();
604:
605: mqttClient.publish(responseTopic, responsePayload, 0, false);
606:• } else if (topic.equals(keystorePostKeystoresEntriesKeypair)) {
607: callbackParam = extractCallback(requestPayload);
608:
609: responseTopic = $EDC + CLIENT_ACCOUNT + "/" + callbackParam.getClientId() + KEYS_V1_REPLY + callbackParam.getRequestId();
610:
611: KuraRequestPayload kuraRequestPayload = new KuraRequestPayload();
612: kuraRequestPayload.readFromByteArray(requestPayload);
613:
614: int responseCode;
615:• if (kuraRequestPayload.hasBody()) {
616: String queryJsonString = new String(kuraRequestPayload.getBody());
617:
618:• if (queryJsonString.contains("SHA256withRSA") &&
619:• queryJsonString.contains("CN=Let's Encrypt Authority X3,O=Let's Encrypt,C=US")) {
620: keystoreInstalledKeypair++;
621: responseCode = 200;
622: } else {
623: responseCode = 500;
624: }
625: } else {
626: responseCode = 400;
627: }
628:
629: KuraPayload kuraResponsePayload = new KuraPayload();
630: kuraResponsePayload.getMetrics().put("response.code", responseCode);
631: responsePayload = kuraResponsePayload.toByteArray();
632:
633: mqttClient.publish(responseTopic, responsePayload, 0, false);
634:• } else if (topic.equals(keystorePostKeystoresEntriesCsr)) {
635: callbackParam = extractCallback(requestPayload);
636:
637: responseTopic = $EDC + CLIENT_ACCOUNT + "/" + callbackParam.getClientId() + KEYS_V1_REPLY + callbackParam.getRequestId();
638:
639: KuraRequestPayload kuraRequestPayload = new KuraRequestPayload();
640: kuraRequestPayload.readFromByteArray(requestPayload);
641:
642: byte[] resposnseBody = null;
643: int responseCode;
644:• if (kuraRequestPayload.hasBody()) {
645: String queryJsonString = new String(kuraRequestPayload.getBody());
646:
647:• if (queryJsonString.contains("\"signatureAlgorithm\":\"SHA256withRSA\"") &&
648:• queryJsonString.contains("\"attributes\":\"CN=Kura, OU=IoT, O=Eclipse, C=US\"")) {
649:
650:• if (queryJsonString.contains("\"alias\":\"localhostFixed\"")) {
651: resposnseBody = Files.readAllBytes(Paths.get(getClass().getResource("/mqtt/KEYS-V1_POST_keystores_entries_csr_fixed_reply.json").toURI()));
652: responseCode = 200;
653:• } else if (queryJsonString.contains("\"alias\":\"localhostKuraBugged\"")) {
654: resposnseBody = Files.readAllBytes(Paths.get(getClass().getResource("/mqtt/KEYS-V1_POST_keystores_entries_csr_kurabugged_reply.txt").toURI()));
655: responseCode = 200;
656: } else {
657: responseCode = 500;
658: }
659: } else {
660: responseCode = 500;
661: }
662: } else {
663: responseCode = 400;
664: }
665:
666: KuraPayload kuraResponsePayload = new KuraPayload();
667: kuraResponsePayload.getMetrics().put("response.code", responseCode);
668: kuraResponsePayload.setBody(resposnseBody);
669: responsePayload = kuraResponsePayload.toByteArray();
670:
671: mqttClient.publish(responseTopic, responsePayload, 0, false);
672:• } else if (topic.equals(keystoreDelKeystoresEntries)) {
673: callbackParam = extractCallback(requestPayload);
674:
675: responseTopic = $EDC + CLIENT_ACCOUNT + "/" + callbackParam.getClientId() + KEYS_V1_REPLY + callbackParam.getRequestId();
676:
677: KuraRequestPayload kuraRequestPayload = new KuraRequestPayload();
678: kuraRequestPayload.readFromByteArray(requestPayload);
679:
680: int responseCode;
681:• if (kuraRequestPayload.hasBody()) {
682: String queryJsonString = new String(kuraRequestPayload.getBody());
683:
684:• if (queryJsonString.contains("SSLKeystore") && queryJsonString.contains("qaCertificate")) {
685: keystoreInstalledCertificate--;
686: responseCode = 200;
687:• } else if (queryJsonString.contains("SSLKeystore") && queryJsonString.contains("qaKeypair")) {
688: keystoreInstalledKeypair--;
689: responseCode = 200;
690: } else {
691: responseCode = 500;
692: }
693: } else {
694: responseCode = 400;
695: }
696:
697: KuraPayload kuraResponsePayload = new KuraPayload();
698: kuraResponsePayload.getMetrics().put("response.code", responseCode);
699: responsePayload = kuraResponsePayload.toByteArray();
700:
701: mqttClient.publish(responseTopic, responsePayload, 0, false);
702: }
703: // Fail
704: else {
705: LOG.error("Kapua Mock Device unhandled topic: {}", topic);
706: }
707: } catch (Exception e) {
708: LOG.error("Error while handling the request on topic: {}", topic, e);
709: }
710: }
711:
712: @Override
713: public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
714: LOG.info("Kapua Mock Device message delivery complete.");
715: }
716:
717: /**
718: * Simple tuple for callback parameters.
719: */
720: private class CallbackParam {
721:
722: private String clientId;
723:
724: private String requestId;
725:
726: public String getClientId() {
727: return clientId;
728: }
729:
730: public void setClientId(String clientId) {
731: this.clientId = clientId;
732: }
733:
734: public String getRequestId() {
735: return requestId;
736: }
737:
738: public void setRequestId(String requestId) {
739: this.requestId = requestId;
740: }
741: }
742: }