Skip to content

Package: KuraDevice$CallbackParam

KuraDevice$CallbackParam

nameinstructionbranchcomplexitylinemethod
KuraDevice.CallbackParam(KuraDevice)
M: 6 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
getClientId()
M: 3 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
getRequestId()
M: 3 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%
setClientId(String)
M: 4 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
setRequestId(String)
M: 4 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%

Coverage

1: /*******************************************************************************
2: * Copyright (c) 2017, 2020 Eurotech and/or its affiliates and others
3: *
4: * All rights reserved. This program and the accompanying materials
5: * are made available under the terms of the Eclipse Public License v1.0
6: * which accompanies this distribution, and is available at
7: * http://www.eclipse.org/legal/epl-v10.html
8: *
9: * Contributors:
10: * Eurotech
11: *******************************************************************************/
12: package org.eclipse.kapua.service.device.registry.steps;
13:
14: import java.io.IOException;
15: import java.net.URISyntaxException;
16: import java.nio.file.Files;
17: import java.nio.file.Paths;
18: import java.util.Date;
19: import java.util.List;
20:
21: import org.eclipse.kapua.qa.common.Suppressed;
22: import org.eclipse.kapua.service.device.call.message.kura.KuraPayload;
23: import org.eclipse.kura.core.message.protobuf.KuraPayloadProto;
24: import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
25: import org.eclipse.paho.client.mqttv3.MqttCallback;
26: import org.eclipse.paho.client.mqttv3.MqttClient;
27: import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
28: import org.eclipse.paho.client.mqttv3.MqttException;
29: import org.eclipse.paho.client.mqttv3.MqttMessage;
30: import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
31: import org.slf4j.Logger;
32: import org.slf4j.LoggerFactory;
33:
34: import com.google.protobuf.InvalidProtocolBufferException;
35:
36: public class KuraDevice implements MqttCallback {
37:
38: /**
39: * Logger.
40: */
41: private static final Logger logger = LoggerFactory.getLogger(KuraDevice.class);
42:
43: private static final String $EDC_KAPUA_SYS = "$EDC/kapua-sys/";
44: private static final String $EDC = "$EDC/";
45: private static final String DEPLOY_V2_REPLY = "/DEPLOY-V2/REPLY/";
46: private static final String DEPLOY_V2_NOTIFY = "/DEPLOY-V2/NOTIFY/";
47: private static final String JOB_ID = "job.id";
48: private static final String CLIENT_ID = "client.id";
49: private static final String COMPLETED = "COMPLETED";
50:
51: /**
52: * Topics that Kura device is listening to.
53: */
54: private String deployPackages;
55: private String deployV2ExecDownloadPackage;
56: private String uninstallPackage;
57: private String deployBundles;
58: private String deployConf;
59: private String putConf;
60: private String cmdExec;
61: private String deployV2ExecStart34;
62: private String deployV2ExecStart95;
63: private String deployV2ExecStop77;
64: private String readAssets;
65: private String writeAsset;
66:
67: /**
68: * URI of mqtt broker.
69: */
70: private static final String BROKER_URI = "tcp://localhost:1883";
71:
72: /**
73: * Mocked Kura device client id.
74: */
75: private static final String CLIENT_ID_RPIONE3 = "rpione3";
76:
77: /**
78: * User with which Mocked Kura device is connecting to Cloud service.
79: */
80: private static final String CLIENT_USER = "kapua-broker";
81:
82: /**
83: * Mocked Kura device password while connecting to Cloud service.
84: */
85: private static final String CLIENT_PASSWORD = "kapua-password";
86:
87: /**
88: * User under which Kura device is listening for messages.
89: */
90: private static final String SERVER_USER = "kapua-sys";
91:
92: /**
93: * Password for system user under which Kura is listening form messages.
94: */
95: private static final String SERVER_PASSWORD = "kapua-password";
96:
97: /**
98: * Account under which Kura device is registered.
99: */
100: private static final String CLIENT_ACCOUNT = "kapua-sys";
101:
102: /**
103: * Kapua system topic and everything under it as mqtt filter.
104: */
105: private static final String TOPIC_FILTER = "$EDC/#";
106:
107: /**
108: * Default quality of service - mqtt.
109: */
110: private static final int QOS = 0;
111:
112: /**
113: * Mqtt client for sending messages form Mocked Kura device.
114: */
115: private MqttClient mqttClient;
116:
117: /**
118: * Mqtt client form listening from messages on Mocked Kura device.
119: */
120: private MqttClient subscribedClient;
121: private String clientId;
122:
123: public boolean bundleStateChanged;
124: public boolean configurationChanged;
125: public boolean packageListChanged;
126: public boolean packageListChangedAfterUninstall;
127: public boolean assetStateChanged;
128:
129: public KuraDevice() {
130: deployPackages = "$EDC/kapua-sys/rpione3/DEPLOY-V2/GET/packages";
131: deployV2ExecDownloadPackage = "$EDC/kapua-sys/rpione3/DEPLOY-V2/EXEC/download";
132: uninstallPackage = "$EDC/kapua-sys/rpione3/DEPLOY-V2/EXEC/uninstall";
133: deployBundles = "$EDC/kapua-sys/rpione3/DEPLOY-V2/GET/bundles";
134: deployConf = "$EDC/kapua-sys/rpione3/CONF-V1/GET/configurations";
135: putConf = "$EDC/kapua-sys/rpione3/CONF-V1/PUT/configurations";
136: cmdExec = "$EDC/kapua-sys/rpione3/CMD-V1/EXEC/command";
137: deployV2ExecStart34 = "$EDC/kapua-sys/rpione3/DEPLOY-V2/EXEC/start/34";
138: deployV2ExecStart95 = "$EDC/kapua-sys/rpione3/DEPLOY-V2/EXEC/start/95";
139: deployV2ExecStop77 = "$EDC/kapua-sys/rpione3/DEPLOY-V2/EXEC/stop/77";
140:
141: readAssets = "$EDC/kapua-sys/rpione3/ASSET-V1/EXEC/read";
142:
143: writeAsset = "$EDC/kapua-sys/rpione3/ASSET-V1/EXEC/write";
144:
145: clientId = CLIENT_ID_RPIONE3;
146:
147: mqttClientSetup();
148: }
149:
150: public String getClientId() {
151: return this.clientId;
152: }
153:
154: public void addMoreThanOneDeviceToKuraMock(String name){
155: clientId = name;
156: mqttClientSetupForMoreDevices();
157: }
158:
159: /**
160: * Disconnect Mocked Kura device mqtt clients that listen and send messages
161: * to mqtt broker.
162: */
163: public void mqttClientDisconnect() {
164: try {
165: try (final Suppressed<Exception> s = Suppressed.withException()) {
166: s.run(mqttClient::disconnect);
167: s.run(subscribedClient::disconnect);
168: s.run(mqttClient::close);
169: s.run(subscribedClient::close);
170: }
171: } catch (final Exception e) {
172: logger.warn("Failed during cleanup of Paho resources", e);
173: }
174: }
175:
176: /**
177: * Connect both listening and sending mqtt client of Mocked Kura device.
178: */
179: public void mqttClientConnect() {
180:
181: MqttConnectOptions clientOpts = new MqttConnectOptions();
182: clientOpts.setUserName(CLIENT_USER);
183: clientOpts.setPassword(CLIENT_PASSWORD.toCharArray());
184: MqttConnectOptions serverOpts = new MqttConnectOptions();
185: serverOpts.setUserName(SERVER_USER);
186: serverOpts.setPassword(SERVER_PASSWORD.toCharArray());
187: try {
188: mqttClient.connect(clientOpts);
189: subscribedClient.connect(serverOpts);
190: subscribedClient.subscribe(TOPIC_FILTER, QOS);
191: } catch (MqttException e) {
192: e.printStackTrace();
193: }
194: }
195:
196: /**
197: * Prepare client and server part of mocked mqtt.
198: */
199: public void mqttClientSetup() {
200: /*
201: * mqttClient is meant to simulate Kura device for sending messages,
202: * while subscribedClient is meant to receive messages from Kura device side.
203: */
204: try {
205: mqttClient = new MqttClient(BROKER_URI, CLIENT_ID_RPIONE3,
206: new MemoryPersistence());
207: subscribedClient = new MqttClient(BROKER_URI, MqttClient.generateClientId(),
208: new MemoryPersistence());
209: } catch (MqttException e) {
210: e.printStackTrace();
211: }
212:
213: subscribedClient.setCallback(this);
214: }
215:
216: private void mqttClientSetupForMoreDevices() {
217: /*
218: * mqttClient is meant to simulate Kura device for sending messages,
219: * while subscribedClient is meant to receive messages from Kura device side.
220: */
221: try {
222: deployPackages = $EDC_KAPUA_SYS + clientId + "/DEPLOY-V2/GET/packages";
223: deployV2ExecDownloadPackage = $EDC_KAPUA_SYS + clientId + "/DEPLOY-V2/EXEC/download";
224: uninstallPackage = $EDC_KAPUA_SYS + clientId + "/DEPLOY-V2/EXEC/uninstall";
225: deployBundles = $EDC_KAPUA_SYS + clientId + "/DEPLOY-V2/GET/bundles";
226: deployConf = $EDC_KAPUA_SYS + clientId + "/CONF-V1/GET/configurations";
227: putConf = $EDC_KAPUA_SYS + clientId + "/CONF-V1/PUT/configurations";
228: cmdExec = $EDC_KAPUA_SYS + clientId + "/CMD-V1/EXEC/command";
229: deployV2ExecStart34 = $EDC_KAPUA_SYS + clientId + "/DEPLOY-V2/EXEC/start/34";
230: deployV2ExecStart95 = $EDC_KAPUA_SYS + clientId + "/DEPLOY-V2/EXEC/start/95";
231: deployV2ExecStop77 = $EDC_KAPUA_SYS + clientId + "/DEPLOY-V2/EXEC/stop/77";
232:
233: readAssets = $EDC_KAPUA_SYS + clientId + "/ASSET-V1/EXEC/read";
234:
235: writeAsset = $EDC_KAPUA_SYS + clientId + "/ASSET-V1/EXEC/write";
236:
237: mqttClient = new MqttClient(BROKER_URI, clientId,
238: new MemoryPersistence());
239: subscribedClient = new MqttClient(BROKER_URI, MqttClient.generateClientId(),
240: new MemoryPersistence());
241: } catch (MqttException e) {
242: e.printStackTrace();
243: }
244: subscribedClient.setCallback(this);
245: }
246:
247: /**
248: * Sending data to mqtt broker. Data is read form file containing pre-recorded response.
249: *
250: * @param topic
251: * mqtt broker topic
252: * @param qos
253: * mqtt QOS
254: * @param retained
255: * is message retained (mqtt specific)
256: * @param fileName
257: * name of file and path with pre-recorded response
258: * @throws MqttException
259: * @throws IOException
260: */
261: public void sendMessageFromFile(String topic, int qos, boolean retained, String fileName) throws MqttException, IOException, URISyntaxException {
262: byte[] payload = Files.readAllBytes(Paths.get(getClass().getResource(fileName).toURI()));
263:
264: mqttClient.publish(topic, payload, qos, retained);
265: }
266:
267: /**
268: * Extraction of metrics form Kapua message payload.
269: *
270: * @param payload
271: * payload received from Kapua
272: * @param metricKey
273: * string representing key of metric
274: * @return string representation of metric value
275: */
276: private String getMetric(byte[] payload, String metricKey) {
277:
278: String value = null;
279: KuraPayloadProto.KuraPayload kuraPayload = null;
280: try {
281: kuraPayload = KuraPayloadProto.KuraPayload.parseFrom(payload);
282: } catch (InvalidProtocolBufferException e) {
283: value = null;
284: }
285: if (kuraPayload == null) {
286: return value;
287: }
288:
289: List<KuraPayloadProto.KuraPayload.KuraMetric> metrics = kuraPayload.getMetricList();
290: for (KuraPayloadProto.KuraPayload.KuraMetric metric : metrics) {
291: String name = metric.getName();
292: if (name.equals(metricKey)) {
293: value = metric.getStringValue();
294: }
295: }
296:
297: return value;
298: }
299:
300: /**
301: * Ectraction of callback parameters form Kapua generated message stored as Metrics.
302: *
303: * @param payload
304: * Kapua message
305: * @return tuple with client and request id
306: */
307: private CallbackParam extractCallback(byte[] payload) {
308:
309: CallbackParam callbackParam = new CallbackParam();
310: String clientId = getMetric(payload, "requester.client.id");
311: String requestId = getMetric(payload, "request.id");
312: callbackParam.setClientId(clientId);
313: callbackParam.setRequestId(requestId);
314:
315: return callbackParam;
316: }
317:
318: @Override
319: public void connectionLost(Throwable throwable) {
320: logger.info("Kapua Mock Device connection to broker lost.");
321: }
322:
323: @Override
324: public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
325: logger.info("Message arrived in Kapua Mock Device with topic: " + topic);
326:
327: CallbackParam callbackParam = null;
328: String responseTopic = null;
329: byte[] responsePayload = null;
330: byte[] payload = mqttMessage.getPayload();
331:
332: if (topic.equals(deployPackages)) {
333: callbackParam = extractCallback(payload);
334:
335: responseTopic = $EDC + CLIENT_ACCOUNT + "/" + callbackParam.getClientId() + DEPLOY_V2_REPLY + callbackParam.getRequestId();
336: responsePayload = Files.readAllBytes(Paths.get(getClass().getResource(packageListChanged == true ? "/mqtt/KapuaPool-client-id_DEPLOY-V2_REPLY_req-id_packages_updated_list.mqtt" : (packageListChangedAfterUninstall == true ? "/mqtt/KapuaPoolClient-id_DEPLOY_V2_REPLY_package_list_after_uninstall.mqtt" : "/mqtt/KapuaPool-client-id_DEPLOY-V2_REPLY_req-id_packages_initial_list.mqtt")).toURI()));
337:
338: mqttClient.publish(responseTopic, responsePayload, 0, false);
339: } else if (topic.equals(deployV2ExecDownloadPackage)) {
340: callbackParam = extractCallback(payload);
341: KuraPayload kuraPayloadInitial = new KuraPayload();
342: kuraPayloadInitial.readFromByteArray(payload);
343:
344: responseTopic = $EDC + CLIENT_ACCOUNT + "/" + callbackParam.getClientId() + DEPLOY_V2_REPLY + callbackParam.getRequestId();
345: KuraPayload customKuraPayload1 = new KuraPayload();
346: customKuraPayload1.setTimestamp(new Date());
347: customKuraPayload1.getMetrics().put("response.code", 200);
348: responsePayload = customKuraPayload1.toByteArray();
349: mqttClient.publish(responseTopic, responsePayload, 0, false);
350: Thread.sleep(100);
351:
352: responseTopic = $EDC + CLIENT_ACCOUNT + "/" + callbackParam.getClientId() + DEPLOY_V2_NOTIFY + clientId + "/download";
353: KuraPayload customKuraPayload2 = new KuraPayload();
354: customKuraPayload2.setTimestamp(new Date());
355: customKuraPayload2.getMetrics().put(JOB_ID, kuraPayloadInitial.getMetrics().get(JOB_ID));
356: customKuraPayload2.getMetrics().put(CLIENT_ID, clientId);
357: customKuraPayload2.getMetrics().put("dp.download.progress", 50);
358: customKuraPayload2.getMetrics().put("dp.download.size", 20409);
359: customKuraPayload2.getMetrics().put("dp.download.status", "IN_PROGRESS");
360: customKuraPayload2.getMetrics().put("dp.download.index", 0);
361: responsePayload = customKuraPayload2.toByteArray();
362: mqttClient.publish(responseTopic, responsePayload , 0, false);
363: Thread.sleep(100);
364:
365: responseTopic = $EDC + CLIENT_ACCOUNT + "/" + callbackParam.getClientId() + DEPLOY_V2_NOTIFY + clientId + "/download";
366: KuraPayload customKuraPayload3 = new KuraPayload();
367: customKuraPayload3.setTimestamp(new Date());
368: customKuraPayload3.getMetrics().put(JOB_ID, kuraPayloadInitial.getMetrics().get(JOB_ID));
369: customKuraPayload3.getMetrics().put(CLIENT_ID, clientId);
370: customKuraPayload3.getMetrics().put("dp.download.progress", 100);
371: customKuraPayload3.getMetrics().put("dp.download.size", 20409);
372: customKuraPayload3.getMetrics().put("dp.download.status", COMPLETED);
373: customKuraPayload3.getMetrics().put("dp.download.index", 0);
374: responsePayload = customKuraPayload3.toByteArray();
375: mqttClient.publish(responseTopic, responsePayload, 0, false);
376: Thread.sleep(100);
377:
378: responseTopic = $EDC + CLIENT_ACCOUNT + "/" + callbackParam.getClientId() + DEPLOY_V2_NOTIFY + clientId + "/install";
379: KuraPayload customKuraPayload4 = new KuraPayload();
380: customKuraPayload4.setTimestamp(new Date());
381: customKuraPayload4.getMetrics().put("dp.name", "Example Publisher-1.0.300.dp");
382: customKuraPayload4.getMetrics().put(JOB_ID, kuraPayloadInitial.getMetrics().get(JOB_ID));
383: customKuraPayload4.getMetrics().put("dp.install.progress", 100);
384: customKuraPayload4.getMetrics().put("dp.install.status", COMPLETED);
385: customKuraPayload4.getMetrics().put(CLIENT_ID, clientId);
386: responsePayload = customKuraPayload4.toByteArray();
387: mqttClient.publish(responseTopic, responsePayload, 0, false);
388:
389: packageListChanged = true;
390: } else if (topic.equals(uninstallPackage)) {
391: callbackParam = extractCallback(payload);
392: KuraPayload kuraPayloadInitial = new KuraPayload();
393: kuraPayloadInitial.readFromByteArray(payload);
394:
395: responseTopic = $EDC + CLIENT_ACCOUNT + "/" + callbackParam.getClientId() + DEPLOY_V2_REPLY + callbackParam.getRequestId();
396: KuraPayload customKuraPayload = new KuraPayload();
397:
398: customKuraPayload.setTimestamp(new Date());
399: customKuraPayload.getMetrics().put("response.code", 200);
400: responsePayload = customKuraPayload.toByteArray();
401: mqttClient.publish(responseTopic, responsePayload, 0, false);
402: Thread.sleep(5000);
403:
404: responseTopic = $EDC + CLIENT_ACCOUNT + "/" + callbackParam.getClientId() + DEPLOY_V2_NOTIFY + clientId + "/uninstall";
405: KuraPayload customKuraPayload2 = new KuraPayload();
406:
407: customKuraPayload2.setTimestamp(new Date());
408: customKuraPayload2.getMetrics().put(JOB_ID, kuraPayloadInitial.getMetrics().get(JOB_ID));
409: customKuraPayload2.getMetrics().put("dp.name", "org.eclipse.kura.example.beacon");
410: customKuraPayload2.getMetrics().put("dp.uninstall.progress", 100);
411: customKuraPayload2.getMetrics().put("dp.uninstall.status", COMPLETED);
412: customKuraPayload2.getMetrics().put(CLIENT_ID, clientId);
413: responsePayload = customKuraPayload2.toByteArray();
414: mqttClient.publish(responseTopic, responsePayload , 0, false);
415:
416: packageListChangedAfterUninstall = true;
417: } else if (topic.equals(deployBundles)) {
418: callbackParam = extractCallback(payload);
419:
420: responseTopic = $EDC + CLIENT_ACCOUNT + "/" + callbackParam.getClientId() + DEPLOY_V2_REPLY + callbackParam.getRequestId();
421: responsePayload = Files.readAllBytes(Paths.get(getClass().getResource(bundleStateChanged == true ? "/mqtt/KapuaPool-client-id_DEPLOY-V2_REPLY_req-id_bundles_updated_state.mqtt" : "/mqtt/KapuaPool-client-id_DEPLOY-V2_REPLY_req-id_bundles_inital_state.mqtt").toURI()));
422:
423: mqttClient.publish(responseTopic, responsePayload, 0, false);
424: } else if (topic.equals(deployConf)) {
425: callbackParam = extractCallback(payload);
426:
427: responseTopic = $EDC + CLIENT_ACCOUNT + "/" + callbackParam.getClientId() + "/CONF-V1/REPLY/" + callbackParam.getRequestId();
428: responsePayload = Files.readAllBytes(Paths.get(getClass().getResource( configurationChanged == true ? "/mqtt/KapuaPool-client-id_CONF-V1_REPLY_req-id_updated_configurations.mqtt" : "/mqtt/KapuaPool-client-id_CONF-V1_REPLY_req-id_inital_configurations.mqtt").toURI()));
429:
430: mqttClient.publish(responseTopic, responsePayload, 0, false);
431: }
432: else if (topic.equals(putConf)) {
433: callbackParam = extractCallback(payload);
434:
435: responseTopic = $EDC + CLIENT_ACCOUNT + "/" + callbackParam.getClientId() + "/CONF-V1/REPLY/" + callbackParam.getRequestId();
436: responsePayload = Files.readAllBytes(Paths.get(getClass().getResource("/mqtt/KapuaPool-client-id_CONF-V1_PUT_configurations.mqtt").toURI()));
437:
438: configurationChanged = true;
439: mqttClient.publish(responseTopic, responsePayload, 0, false);
440: } else if (topic.equals(cmdExec)) {
441: callbackParam = extractCallback(payload);
442:
443: responseTopic = $EDC + CLIENT_ACCOUNT + "/" + callbackParam.getClientId() + "/CMD-V1/REPLY/" + callbackParam.getRequestId();
444: responsePayload = Files.readAllBytes(Paths.get(getClass().getResource("/mqtt/KapuaPool-client-id_CMD-V1_REPLY_req-id_command.mqtt").toURI()));
445:
446: mqttClient.publish(responseTopic, responsePayload, 0, false);
447: } else if (topic.equals(deployV2ExecStart34) || topic.equals(deployV2ExecStart95)) {
448: callbackParam = extractCallback(payload);
449:
450: responseTopic = $EDC + CLIENT_ACCOUNT + "/" + callbackParam.getClientId() + DEPLOY_V2_REPLY + callbackParam.getRequestId();
451: responsePayload = Files.readAllBytes(Paths.get(getClass().getResource("/mqtt/KapuaPool-client-id_DEPLOY-V2_EXEC_START_bundle_id.mqtt").toURI()));
452:
453: bundleStateChanged = true;
454: mqttClient.publish(responseTopic, responsePayload, 0, false);
455: } else if (topic.equals(deployV2ExecStop77)) {
456: callbackParam = extractCallback(payload);
457:
458: responseTopic = $EDC + CLIENT_ACCOUNT + "/" + callbackParam.getClientId() + DEPLOY_V2_REPLY + callbackParam.getRequestId();
459: responsePayload = Files.readAllBytes(Paths.get(getClass().getResource("/mqtt/KapuaPool-client-id_DEPLOY-V2_EXEC_STOP_bundle_id.mqtt").toURI()));
460:
461: bundleStateChanged = true;
462: mqttClient.publish(responseTopic, responsePayload, 0, false);
463: } else if (topic.equals(readAssets)) {
464: callbackParam = extractCallback(payload);
465:
466: responseTopic = $EDC + CLIENT_ACCOUNT + "/" + callbackParam.getClientId() + "/ASSET-V1/REPLY/" + callbackParam.getRequestId();
467: responsePayload = Files.readAllBytes(Paths.get(getClass().getResource(assetStateChanged == true ? "/mqtt/KapuaPool-client-id_ASSET-V1_READ_req-id_assets_updated_assets.mqtt" : "/mqtt/KapuaPool-client-id_ASSET-V1_READ_req-id_assets.mqtt").toURI()));
468:
469: mqttClient.publish(responseTopic, responsePayload, 0, false);
470: } else if (topic.equals(writeAsset)) {
471: callbackParam = extractCallback(payload);
472:
473: responseTopic = $EDC + CLIENT_ACCOUNT + "/" + callbackParam.getClientId() + "/ASSET-V1/REPLY/" + callbackParam.getRequestId();
474: responsePayload = Files.readAllBytes(Paths.get(getClass().getResource("/mqtt/KapuaPool-client-id_ASSET-V1_WRITE_req-id_assets.mqtt").toURI()));
475:
476: assetStateChanged = true;
477: mqttClient.publish(responseTopic, responsePayload, 0, false);
478: } else {
479: logger.error("Kapua Mock Device unhandled topic: " + topic);
480: }
481: }
482:
483: @Override
484: public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
485: logger.info("Kapua Mock Device message delivery complete.");
486: }
487:
488: /**
489: * Simple tuple for callback parameters.
490: */
491: private class CallbackParam {
492:
493: private String clientId;
494:
495: private String requestId;
496:
497: public String getClientId() {
498: return clientId;
499: }
500:
501: public void setClientId(String clientId) {
502: this.clientId = clientId;
503: }
504:
505: public String getRequestId() {
506: return requestId;
507: }
508:
509: public void setRequestId(String requestId) {
510: this.requestId = requestId;
511: }
512: }
513: }