From d4e6dd32c57014c71520563efc70e9447c8b3c30 Mon Sep 17 00:00:00 2001 From: Michael Kleinhenz Date: Sat, 12 Mar 2022 15:08:57 +0100 Subject: [PATCH] JSON MQTT Integration (#1283) * Added downstream JSON MQTT handling. * Added uplink JSON messaging * Fixed shadow variable. * Added missing dependency. * Fixes Environment -> Telemetry. * Fixed native issue. * Added json11 pio reg dependency. * Fixed json11 library dependency. Co-authored-by: Sacha Weatherstone --- platformio.ini | 3 +- src/mqtt/MQTT.cpp | 181 ++++++++++++++++++++++++++++++++++++++++++---- src/mqtt/MQTT.h | 3 + 3 files changed, 173 insertions(+), 14 deletions(-) diff --git a/platformio.ini b/platformio.ini index f6b6fd62b..284d0492a 100644 --- a/platformio.ini +++ b/platformio.ini @@ -91,6 +91,7 @@ lib_deps = https://github.com/geeksville/ArduinoThread.git#72921ac222eed6f526ba1682023cee290d9aa1b3 PubSubClient nanopb/Nanopb@^0.4.6 + meshtastic/json11@^1.0.2 ; Used for the code analysis in PIO Home / Inspect check_tool = cppcheck @@ -119,7 +120,7 @@ lib_deps = adafruit/Adafruit BME280 Library@^2.2.2 adafruit/Adafruit BME680 Library@^2.0.1 adafruit/Adafruit MCP9808 Library@^2.0.0 - + ; Common settings for ESP targes, mixin with extends = esp32_base [esp32_base] extends = arduino_base diff --git a/src/mqtt/MQTT.cpp b/src/mqtt/MQTT.cpp index ffea9198b..6df41e829 100644 --- a/src/mqtt/MQTT.cpp +++ b/src/mqtt/MQTT.cpp @@ -1,19 +1,22 @@ #include "MQTT.h" +#include "MeshService.h" #include "NodeDB.h" #include "PowerFSM.h" #include "main.h" #include "mesh/Channels.h" #include "mesh/Router.h" #include "mesh/generated/mqtt.pb.h" +#include "mesh/generated/telemetry.pb.h" #include "sleep.h" #include #include +#include MQTT *mqtt; String statusTopic = "msh/1/stat/"; -String cryptTopic = "msh/1/c/"; // msh/1/c/CHANNELID/NODEID -String txtTopic = "msh/1/txt/"; // msh/1/txt/CHANNELID/NODEID +String cryptTopic = "msh/1/c/"; // msh/1/c/CHANNELID/NODEID +String jsonTopic = "msh/1/json/"; // msh/1/json/CHANNELID/NODEID void MQTT::mqttCallback(char *topic, byte *payload, unsigned int length) { @@ -25,7 +28,43 @@ void MQTT::onPublish(char *topic, byte *payload, unsigned int length) // parsing ServiceEnvelope ServiceEnvelope e = ServiceEnvelope_init_default; if (!pb_decode_from_bytes(payload, length, ServiceEnvelope_fields, &e)) { - DEBUG_MSG("Invalid MQTT service envelope, topic %s, len %u!\n", topic, length); + + // check if this is a json payload message + using namespace json11; + char payloadStr[length + 1]; + memcpy(payloadStr, payload, length); + payloadStr[length] = 0; // null terminated string + std::string err; + auto json = Json::parse(payloadStr, err); + if (err.empty()) { + DEBUG_MSG("Received json payload on MQTT, parsing..\n"); + // check if it is a valid envelope + if (json.object_items().count("sender") != 0 && json.object_items().count("payload") != 0) { + // this is a valid envelope + if (json["sender"].string_value().compare(owner.id) != 0) { + std::string jsonPayloadStr = json["payload"].dump(); + DEBUG_MSG("Received json payload %s, length %u\n", jsonPayloadStr.c_str(), jsonPayloadStr.length()); + // construct protobuf data packet using TEXT_MESSAGE, send it to the mesh + MeshPacket *p = router->allocForSending(); + p->decoded.portnum = PortNum_TEXT_MESSAGE_APP; + if (jsonPayloadStr.length() <= sizeof(p->decoded.payload.bytes)) { + memcpy(p->decoded.payload.bytes, jsonPayloadStr.c_str(), jsonPayloadStr.length()); + p->decoded.payload.size = jsonPayloadStr.length(); + MeshPacket *packet = packetPool.allocCopy(*p); + service.sendToMesh(packet, RX_SRC_LOCAL); + } else { + DEBUG_MSG("Received MQTT json payload too long, dropping\n"); + } + } else { + DEBUG_MSG("Ignoring downlink message we originally sent.\n"); + } + } else { + DEBUG_MSG("Received json payload on MQTT but not a valid envelope\n"); + } + } else { + // no json, this is an invalid payload + DEBUG_MSG("Invalid MQTT service envelope, topic %s, len %u!\n", topic, length); + } } else { if (strcmp(e.gateway_id, owner.id) == 0) DEBUG_MSG("Ignoring downlink message we originally sent.\n"); @@ -73,11 +112,11 @@ void MQTT::reconnect() const char *mqttPassword = "large4cats"; if (*radioConfig.preferences.mqtt_server) { - serverAddr = radioConfig.preferences.mqtt_server; // Override the default - mqttUsername = radioConfig.preferences.mqtt_username; //do not use the hardcoded credentials for a custom mqtt server + serverAddr = radioConfig.preferences.mqtt_server; // Override the default + mqttUsername = radioConfig.preferences.mqtt_username; // do not use the hardcoded credentials for a custom mqtt server mqttPassword = radioConfig.preferences.mqtt_password; } else { - //we are using the default server. Use the hardcoded credentials by default, but allow overriding + // we are using the default server. Use the hardcoded credentials by default, but allow overriding if (*radioConfig.preferences.mqtt_username && radioConfig.preferences.mqtt_username[0] != '\0') { mqttUsername = radioConfig.preferences.mqtt_username; } @@ -96,7 +135,8 @@ void MQTT::reconnect() } pubSub.setServer(serverAddr, serverPort); - DEBUG_MSG("Connecting to MQTT server %s, port: %d, username: %s, password: %s\n", serverAddr, serverPort, mqttUsername, mqttPassword); + DEBUG_MSG("Connecting to MQTT server %s, port: %d, username: %s, password: %s\n", serverAddr, serverPort, mqttUsername, + mqttPassword); auto myStatus = (statusTopic + owner.id); bool connected = pubSub.connect(owner.id, mqttUsername, mqttPassword, myStatus.c_str(), 1, true, "offline"); if (connected) { @@ -123,6 +163,9 @@ void MQTT::sendSubscriptions() String topic = cryptTopic + channels.getGlobalId(i) + "/#"; DEBUG_MSG("Subscribing to %s\n", topic.c_str()); pubSub.subscribe(topic.c_str(), 1); // FIXME, is QOS 1 right? + String topicDecoded = jsonTopic + channels.getGlobalId(i) + "/#"; + DEBUG_MSG("Subscribing to %s\n", topicDecoded.c_str()); + pubSub.subscribe(topicDecoded.c_str(), 1); // FIXME, is QOS 1 right? } } } @@ -195,12 +238,124 @@ void MQTT::onSend(const MeshPacket &mp, ChannelIndex chIndex) pubSub.publish(topic.c_str(), bytes, numBytes, false); - // publish to txt topic for messages of type TEXT_MESSAGE_APP - if (mp.decoded.portnum == PortNum_TEXT_MESSAGE_APP) { - String plaintextTopic = txtTopic + channelId + "/" + owner.id; - DEBUG_MSG("publish txt %s, %u bytes\n", topic.c_str(), numBytes); - auto &p = mp.decoded; - pubSub.publish(plaintextTopic.c_str(), p.payload.bytes, p.payload.size, false); + // handle json topic + using namespace json11; + auto jsonString = this->downstreamPacketToJson((MeshPacket *)&mp); + if (jsonString.length() != 0) { + String topicJson = jsonTopic + channelId + "/" + owner.id; + DEBUG_MSG("publish json message to %s, %u bytes: %s\n", topicJson.c_str(), jsonString.length(), jsonString.c_str()); + pubSub.publish(topicJson.c_str(), jsonString.c_str(), false); } } } + +// converts a downstream packet into a json message +String MQTT::downstreamPacketToJson(MeshPacket *mp) +{ + using namespace json11; + + // the created jsonObj is immutable after creation, so + // we need to do the heavy lifting before assembling it. + String msgType; + Json msgPayload; + + switch (mp->decoded.portnum) { + case PortNum_TEXT_MESSAGE_APP: { + msgType = "text"; + // convert bytes to string + DEBUG_MSG("got text message of size %u\n", mp->decoded.payload.size); + char payloadStr[(mp->decoded.payload.size) + 1]; + memcpy(payloadStr, mp->decoded.payload.bytes, mp->decoded.payload.size); + payloadStr[mp->decoded.payload.size] = 0; // null terminated string + // check if this is a JSON payload + std::string err; + auto json = Json::parse(payloadStr, err); + if (err.empty()) { + DEBUG_MSG("text message payload is of type json\n"); + // if it is, then we can just use the json object + msgPayload = json; + } else { + // if it isn't, then we need to create a json object + // with the string as the value + DEBUG_MSG("text message payload is of type plaintext\n"); + msgPayload = Json::object({{"text", payloadStr}}); + } + break; + } + case PortNum_TELEMETRY_APP: { + msgType = "telemetry"; + Telemetry scratch; + Telemetry *decoded = NULL; + if (mp->which_payloadVariant == MeshPacket_decoded_tag) { + memset(&scratch, 0, sizeof(scratch)); + if (pb_decode_from_bytes(mp->decoded.payload.bytes, mp->decoded.payload.size, &Telemetry_msg, + &scratch)) { + decoded = &scratch; + msgPayload = Json::object{ + {"temperature", decoded->temperature}, + {"relative_humidity", decoded->relative_humidity}, + {"barometric_pressure", decoded->barometric_pressure}, + {"gas_resistance", decoded->gas_resistance}, + {"voltage", decoded->voltage}, + {"current", decoded->current}, + }; + } else + DEBUG_MSG("Error decoding protobuf for telemetry message!\n"); + }; + break; + } + case PortNum_NODEINFO_APP: { + msgType = "nodeinfo"; + User scratch; + User *decoded = NULL; + if (mp->which_payloadVariant == MeshPacket_decoded_tag) { + memset(&scratch, 0, sizeof(scratch)); + if (pb_decode_from_bytes(mp->decoded.payload.bytes, mp->decoded.payload.size, &User_msg, &scratch)) { + decoded = &scratch; + msgPayload = Json::object{{"id", decoded->id}, + {"longname", decoded->long_name}, + {"shortname", decoded->short_name}, + {"hardware", decoded->hw_model}}; + + } else + DEBUG_MSG("Error decoding protobuf for nodeinfo message!\n"); + }; + break; + } + case PortNum_POSITION_APP: { + msgType = "position"; + Position scratch; + Position *decoded = NULL; + if (mp->which_payloadVariant == MeshPacket_decoded_tag) { + memset(&scratch, 0, sizeof(scratch)); + if (pb_decode_from_bytes(mp->decoded.payload.bytes, mp->decoded.payload.size, &Position_msg, &scratch)) { + decoded = &scratch; + msgPayload = Json::object{ + {"latitude_i", decoded->latitude_i}, {"longitude_i", decoded->longitude_i}, {"altitude", decoded->altitude}}; + } else { + DEBUG_MSG("Error decoding protobuf for position message!\n"); + } + }; + break; + } + // add more packet types here if needed + default: + break; + } + + // assemble the final jsonObj + Json jsonObj = Json::object{{"id", Json((int)mp->id)}, + {"timestamp", Json((int)mp->rx_time)}, + {"to", Json((int)mp->to)}, + {"from", Json((int)mp->from)}, + {"channel", Json((int)mp->channel)}, + {"type", msgType.c_str()}, + {"sender", owner.id}, + {"payload", msgPayload}}; + + // serialize and return it + std::string jsonStr = jsonObj.dump(); + DEBUG_MSG("serialized json message: %s\n", jsonStr.c_str()); + + return jsonStr.c_str(); +} \ No newline at end of file diff --git a/src/mqtt/MQTT.h b/src/mqtt/MQTT.h index e03ad67b3..cfb68354e 100644 --- a/src/mqtt/MQTT.h +++ b/src/mqtt/MQTT.h @@ -57,6 +57,9 @@ class MQTT : private concurrency::OSThread /// Called when a new publish arrives from the MQTT server void onPublish(char *topic, byte *payload, unsigned int length); + /// Called when a new publish arrives from the MQTT server + String downstreamPacketToJson(MeshPacket *mp); + /// Return 0 if sleep is okay, veto sleep if we are connected to pubsub server // int preflightSleepCb(void *unused = NULL) { return pubSub.connected() ? 1 : 0; } };