diff --git a/src/mesh/Channels.cpp b/src/mesh/Channels.cpp index f3c692e34..80bcc10c6 100644 --- a/src/mesh/Channels.cpp +++ b/src/mesh/Channels.cpp @@ -15,6 +15,7 @@ Channels channels; const char *Channels::adminChannel = "admin"; const char *Channels::gpioChannel = "gpio"; const char *Channels::serialChannel = "serial"; +const char *Channels::mqttChannel = "mqtt"; uint8_t xorHash(const uint8_t *p, size_t len) { @@ -313,4 +314,4 @@ bool Channels::decryptForHash(ChannelIndex chIndex, ChannelHash channelHash) int16_t Channels::setActiveByIndex(ChannelIndex channelIndex) { return setCrypto(channelIndex); -} +} \ No newline at end of file diff --git a/src/mesh/Channels.h b/src/mesh/Channels.h index b4bdcbd5c..87a72e07b 100644 --- a/src/mesh/Channels.h +++ b/src/mesh/Channels.h @@ -32,7 +32,7 @@ class Channels Channels() {} /// Well known channel names - static const char *adminChannel, *gpioChannel, *serialChannel; + static const char *adminChannel, *gpioChannel, *serialChannel, *mqttChannel; const meshtastic_ChannelSettings &getPrimary() { return getByIndex(getPrimaryIndex()).settings; } @@ -139,4 +139,4 @@ class Channels }; /// Singleton channel table -extern Channels channels; +extern Channels channels; \ No newline at end of file diff --git a/src/mqtt/MQTT.cpp b/src/mqtt/MQTT.cpp index 87dacde7a..70b2d753c 100644 --- a/src/mqtt/MQTT.cpp +++ b/src/mqtt/MQTT.cpp @@ -17,7 +17,6 @@ #include "mesh/wifi/WiFiAPClient.h" #include #endif -#include "mqtt/JSON.h" #include const int reconnectMax = 5; @@ -49,8 +48,6 @@ void MQTT::onReceive(char *topic, byte *payload, size_t length) payloadStr[length] = 0; // null terminated string JSONValue *json_value = JSON::Parse(payloadStr); if (json_value != NULL) { - LOG_INFO("JSON Received on MQTT, parsing..\n"); - // check if it is a valid envelope JSONObject json; json = json_value->AsObject(); @@ -61,22 +58,21 @@ void MQTT::onReceive(char *topic, byte *payload, size_t length) ptr = strtok(NULL, "/"); } meshtastic_Channel sendChannel = channels.getByName(ptr); - LOG_DEBUG("Found Channel name: %s (Index %d)\n", channels.getGlobalId(sendChannel.index), sendChannel.index); + // We allow downlink JSON packets only on a channel named "mqtt" + if (strncasecmp(channels.getGlobalId(sendChannel.index), Channels::mqttChannel, strlen(Channels::mqttChannel)) == 0 && + sendChannel.settings.downlink_enabled) { + if (isValidJsonEnvelope(json)) { + // this is a valid envelope + if (json["type"]->AsString().compare("sendtext") == 0 && json["payload"]->IsString()) { + std::string jsonPayloadStr = json["payload"]->AsString(); + LOG_INFO("JSON payload %s, length %u\n", jsonPayloadStr.c_str(), jsonPayloadStr.length()); - if ((json.find("sender") != json.end()) && (json.find("payload") != json.end()) && - (json.find("type") != json.end()) && json["type"]->IsString() && - (json["type"]->AsString().compare("sendtext") == 0)) { - // this is a valid envelope - if (json["payload"]->IsString() && json["type"]->IsString() && - (json["sender"]->AsString().compare(owner.id) != 0)) { - std::string jsonPayloadStr = json["payload"]->AsString(); - LOG_INFO("JSON payload %s, length %u\n", jsonPayloadStr.c_str(), jsonPayloadStr.length()); - - // construct protobuf data packet using TEXT_MESSAGE, send it to the mesh - meshtastic_MeshPacket *p = router->allocForSending(); - p->decoded.portnum = meshtastic_PortNum_TEXT_MESSAGE_APP; - p->channel = sendChannel.index; - if (sendChannel.settings.downlink_enabled) { + // construct protobuf data packet using TEXT_MESSAGE, send it to the mesh + meshtastic_MeshPacket *p = router->allocForSending(); + p->decoded.portnum = meshtastic_PortNum_TEXT_MESSAGE_APP; + p->channel = sendChannel.index; + if (json.find("to") != json.end() && json["to"]->IsNumber()) + p->to = json["to"]->AsNumber(); if (jsonPayloadStr.length() <= sizeof(p->decoded.payload.bytes)) { memcpy(p->decoded.payload.bytes, jsonPayloadStr.c_str(), jsonPayloadStr.length()); p->decoded.payload.size = jsonPayloadStr.length(); @@ -85,49 +81,42 @@ void MQTT::onReceive(char *topic, byte *payload, size_t length) } else { LOG_WARN("Received MQTT json payload too long, dropping\n"); } - } else { - LOG_WARN("Received MQTT json payload on channel %s, but downlink is disabled, dropping\n", - sendChannel.settings.name); - } - } else { - LOG_DEBUG("JSON Ignoring downlink message we originally sent.\n"); - } - } else if ((json.find("sender") != json.end()) && (json.find("payload") != json.end()) && - (json.find("type") != json.end()) && json["type"]->IsString() && - (json["type"]->AsString().compare("sendposition") == 0)) { - // invent the "sendposition" type for a valid envelope - if (json["payload"]->IsObject() && json["type"]->IsString() && - (json["sender"]->AsString().compare(owner.id) != 0)) { - JSONObject posit; - posit = json["payload"]->AsObject(); // get nested JSON Position - meshtastic_Position pos = meshtastic_Position_init_default; - pos.latitude_i = posit["latitude_i"]->AsNumber(); - pos.longitude_i = posit["longitude_i"]->AsNumber(); - pos.altitude = posit["altitude"]->AsNumber(); - pos.time = posit["time"]->AsNumber(); + } else if (json["type"]->AsString().compare("sendposition") == 0 && json["payload"]->IsObject()) { + // invent the "sendposition" type for a valid envelope + JSONObject posit; + posit = json["payload"]->AsObject(); // get nested JSON Position + meshtastic_Position pos = meshtastic_Position_init_default; + if (posit.find("latitude_i") != posit.end() && posit["latitude_i"]->IsNumber()) + pos.latitude_i = posit["latitude_i"]->AsNumber(); + if (posit.find("longitude_i") != posit.end() && posit["longitude_i"]->IsNumber()) + pos.longitude_i = posit["longitude_i"]->AsNumber(); + if (posit.find("altitude") != posit.end() && posit["altitude"]->IsNumber()) + pos.altitude = posit["altitude"]->AsNumber(); + if (posit.find("time") != posit.end() && posit["time"]->IsNumber()) + pos.time = posit["time"]->AsNumber(); - // construct protobuf data packet using POSITION, send it to the mesh - meshtastic_MeshPacket *p = router->allocForSending(); - p->decoded.portnum = meshtastic_PortNum_POSITION_APP; - p->channel = sendChannel.index; - if (sendChannel.settings.downlink_enabled) { + // construct protobuf data packet using POSITION, send it to the mesh + meshtastic_MeshPacket *p = router->allocForSending(); + p->decoded.portnum = meshtastic_PortNum_POSITION_APP; + p->channel = sendChannel.index; + if (json.find("to") != json.end() && json["to"]->IsNumber()) + p->to = json["to"]->AsNumber(); p->decoded.payload.size = pb_encode_to_bytes(p->decoded.payload.bytes, sizeof(p->decoded.payload.bytes), &meshtastic_Position_msg, &pos); // make the Data protobuf from position service.sendToMesh(p, RX_SRC_LOCAL); } else { - LOG_WARN("Received MQTT json payload on channel %s, but downlink is disabled, dropping\n", - sendChannel.settings.name); + LOG_DEBUG("JSON Ignoring downlink message with unsupported type.\n"); } } else { - LOG_DEBUG("JSON Ignoring downlink message we originally sent.\n"); + LOG_ERROR("JSON Received payload on MQTT but not a valid envelope.\n"); } } else { - LOG_ERROR("JSON Received payload on MQTT but not a valid envelope\n"); + LOG_WARN("JSON downlink received on channel not called 'mqtt' or without downlink enabled.\n"); } } else { // no json, this is an invalid payload - LOG_ERROR("Invalid MQTT service envelope, topic %s, len %u!\n", topic, length); + LOG_ERROR("JSON Received payload on MQTT but not a valid JSON\n"); } delete json_value; } else { @@ -818,4 +807,14 @@ std::string MQTT::meshPacketToJson(meshtastic_MeshPacket *mp) delete value; return jsonStr; +} + +bool MQTT::isValidJsonEnvelope(JSONObject &json) +{ + // if "sender" is provided, avoid processing packets we uplinked + return (json.find("sender") != json.end() ? (json["sender"]->AsString().compare(owner.id) != 0) : true) && + (json.find("from") != json.end()) && json["from"]->IsNumber() && + (json["from"]->AsNumber() == nodeDB.getNodeNum()) && // only accept message if the "from" is us + (json.find("type") != json.end()) && json["type"]->IsString() && // should specify a type + (json.find("payload") != json.end()); // should have a payload } \ No newline at end of file diff --git a/src/mqtt/MQTT.h b/src/mqtt/MQTT.h index fc9f9d454..dfcb75b7d 100644 --- a/src/mqtt/MQTT.h +++ b/src/mqtt/MQTT.h @@ -5,6 +5,7 @@ #include "concurrency/OSThread.h" #include "mesh/Channels.h" #include "mesh/generated/meshtastic/mqtt.pb.h" +#include "mqtt/JSON.h" #if HAS_WIFI #include #define HAS_NETWORKING 1 @@ -100,6 +101,9 @@ class MQTT : private concurrency::OSThread void publishStatus(); void publishQueuedMessages(); + // returns true if this is a valid JSON envelope which we accept on downlink + bool isValidJsonEnvelope(JSONObject &json); + /// Return 0 if sleep is okay, veto sleep if we are connected to pubsub server // int preflightSleepCb(void *unused = NULL) { return pubSub.connected() ? 1 : 0; } };