diff --git a/src/mesh/MemoryPool.h b/src/mesh/MemoryPool.h index d30404b9f..c4af3c4ac 100644 --- a/src/mesh/MemoryPool.h +++ b/src/mesh/MemoryPool.h @@ -2,6 +2,8 @@ #include #include +#include +#include #include "PointerQueue.h" @@ -9,6 +11,7 @@ template class Allocator { public: + Allocator() : deleter([this](T *p) { this->release(p); }) {} virtual ~Allocator() {} /// Return a queable object which has been prefilled with zeros. Panic if no buffer is available @@ -43,12 +46,32 @@ template class Allocator return p; } + /// Variations of the above methods that return std::unique_ptr instead of raw pointers. + using UniqueAllocation = std::unique_ptr &>; + /// Return a queable object which has been prefilled with zeros. + /// std::unique_ptr wrapped variant of allocZeroed(). + UniqueAllocation allocUniqueZeroed() { return UniqueAllocation(allocZeroed(), deleter); } + /// Return a queable object which has been prefilled with zeros - allow timeout to wait for available buffers (you probably + /// don't want this version). + /// std::unique_ptr wrapped variant of allocZeroed(TickType_t maxWait). + UniqueAllocation allocUniqueZeroed(TickType_t maxWait) { return UniqueAllocation(allocZeroed(maxWait), deleter); } + /// Return a queable object which is a copy of some other object + /// std::unique_ptr wrapped variant of allocCopy(const T &src, TickType_t maxWait). + UniqueAllocation allocUniqueCopy(const T &src, TickType_t maxWait = portMAX_DELAY) + { + return UniqueAllocation(allocCopy(src, maxWait), deleter); + } + /// Return a buffer for use by others virtual void release(T *p) = 0; protected: // Alloc some storage virtual T *alloc(TickType_t maxWait) = 0; + + private: + // std::unique_ptr Deleter function; calls release(). + const std::function deleter; }; /** diff --git a/src/mesh/MeshTypes.h b/src/mesh/MeshTypes.h index cf1b54c78..1d6bd342d 100644 --- a/src/mesh/MeshTypes.h +++ b/src/mesh/MeshTypes.h @@ -44,6 +44,7 @@ typedef int ErrorCode; /// Alloc and free packets to our global, ISR safe pool extern Allocator &packetPool; +using UniquePacketPoolPacket = Allocator::UniqueAllocation; /** * Most (but not always) of the time we want to treat packets 'from' the local phone (where from == 0), as if they originated on diff --git a/src/mqtt/MQTT.cpp b/src/mqtt/MQTT.cpp index 967db04d6..1f7a06787 100644 --- a/src/mqtt/MQTT.cpp +++ b/src/mqtt/MQTT.cpp @@ -23,11 +23,14 @@ #include "serialization/MeshPacketSerializer.h" #include #include - -const int reconnectMax = 5; +#include MQTT *mqtt; +namespace +{ +constexpr int reconnectMax = 5; + static MemoryDynamic staticMqttPool; Allocator &mqttPool = staticMqttPool; @@ -37,6 +40,167 @@ static uint8_t bytes[meshtastic_MqttClientProxyMessage_size + 30]; // 12 for cha static bool isMqttServerAddressPrivate = false; +// meshtastic_ServiceEnvelope that automatically releases dynamically allocated memory when it goes out of scope. +struct DecodedServiceEnvelope : public meshtastic_ServiceEnvelope { + DecodedServiceEnvelope() = delete; + DecodedServiceEnvelope(const uint8_t *payload, size_t length) + : meshtastic_ServiceEnvelope(meshtastic_ServiceEnvelope_init_default), + validDecode(pb_decode_from_bytes(payload, length, &meshtastic_ServiceEnvelope_msg, this)) + { + } + ~DecodedServiceEnvelope() + { + if (validDecode) + pb_release(&meshtastic_ServiceEnvelope_msg, this); + } + // Clients must check that this is true before using. + const bool validDecode; +}; + +inline void onReceiveProto(char *topic, byte *payload, size_t length) +{ + const DecodedServiceEnvelope e(payload, length); + if (!e.validDecode || e.channel_id == NULL || e.gateway_id == NULL || e.packet == NULL) { + LOG_ERROR("Invalid MQTT service envelope, topic %s, len %u!", topic, length); + return; + } + const meshtastic_Channel &ch = channels.getByName(e.channel_id); + if (strcmp(e.gateway_id, owner.id) == 0) { + // Generate an implicit ACK towards ourselves (handled and processed only locally!) for this message. + // We do this because packets are not rebroadcasted back into MQTT anymore and we assume that at least one node + // receives it when we get our own packet back. Then we'll stop our retransmissions. + if (isFromUs(e.packet)) + routingModule->sendAckNak(meshtastic_Routing_Error_NONE, getFrom(e.packet), e.packet->id, ch.index); + else + LOG_INFO("Ignore downlink message we originally sent"); + return; + } + if (isFromUs(e.packet)) { + LOG_INFO("Ignore downlink message we originally sent"); + return; + } + + // Find channel by channel_id and check downlink_enabled + if (!(strcmp(e.channel_id, "PKI") == 0 || + (strcmp(e.channel_id, channels.getGlobalId(ch.index)) == 0 && ch.settings.downlink_enabled))) { + return; + } + LOG_INFO("Received MQTT topic %s, len=%u", topic, length); + + UniquePacketPoolPacket p = packetPool.allocUniqueCopy(*e.packet); + p->via_mqtt = true; // Mark that the packet was received via MQTT + + if (p->which_payload_variant == meshtastic_MeshPacket_decoded_tag) { + if (moduleConfig.mqtt.encryption_enabled) { + LOG_INFO("Ignore decoded message on MQTT, encryption is enabled"); + return; + } + if (p->decoded.portnum == meshtastic_PortNum_ADMIN_APP) { + LOG_INFO("Ignore decoded admin packet"); + return; + } + p->channel = ch.index; + } + + // PKI messages get accepted even if we can't decrypt + if (router && p->which_payload_variant == meshtastic_MeshPacket_encrypted_tag && strcmp(e.channel_id, "PKI") == 0) { + const meshtastic_NodeInfoLite *tx = nodeDB->getMeshNode(getFrom(p.get())); + const meshtastic_NodeInfoLite *rx = nodeDB->getMeshNode(p->to); + // Only accept PKI messages to us, or if we have both the sender and receiver in our nodeDB, as then it's + // likely they discovered each other via a channel we have downlink enabled for + if (isToUs(p.get()) || (tx && tx->has_user && rx && rx->has_user)) + router->enqueueReceivedMessage(p.release()); + } else if (router && perhapsDecode(p.get())) // ignore messages if we don't have the channel key + router->enqueueReceivedMessage(p.release()); +} + +// returns true if this is a valid JSON envelope which we accept on downlink +inline bool isValidJsonEnvelope(JSONObject &json) +{ + // if "sender" is provided, avoid processing packets we uplinked + return (json.find("sender") != json.end() ? (json["sender"]->AsString().compare(owner.id) != 0) : true) && + (json.find("hopLimit") != json.end() ? json["hopLimit"]->IsNumber() : true) && // hop limit should be a number + (json.find("from") != json.end()) && json["from"]->IsNumber() && + (json["from"]->AsNumber() == nodeDB->getNodeNum()) && // only accept message if the "from" is us + (json.find("type") != json.end()) && json["type"]->IsString() && // should specify a type + (json.find("payload") != json.end()); // should have a payload +} + +inline void onReceiveJson(byte *payload, size_t length) +{ + char payloadStr[length + 1]; + memcpy(payloadStr, payload, length); + payloadStr[length] = 0; // null terminated string + std::unique_ptr json_value(JSON::Parse(payloadStr)); + if (json_value == nullptr) { + LOG_ERROR("JSON received payload on MQTT but not a valid JSON"); + return; + } + + JSONObject json; + json = json_value->AsObject(); + + if (!isValidJsonEnvelope(json)) { + LOG_ERROR("JSON received payload on MQTT but not a valid envelope"); + return; + } + + // this is a valid envelope + if (json["type"]->AsString().compare("sendtext") == 0 && json["payload"]->IsString()) { + std::string jsonPayloadStr = json["payload"]->AsString(); + LOG_INFO("JSON payload %s, length %u", jsonPayloadStr.c_str(), jsonPayloadStr.length()); + + // construct protobuf data packet using TEXT_MESSAGE, send it to the mesh + meshtastic_MeshPacket *p = router->allocForSending(); + p->decoded.portnum = meshtastic_PortNum_TEXT_MESSAGE_APP; + if (json.find("channel") != json.end() && json["channel"]->IsNumber() && + (json["channel"]->AsNumber() < channels.getNumChannels())) + p->channel = json["channel"]->AsNumber(); + if (json.find("to") != json.end() && json["to"]->IsNumber()) + p->to = json["to"]->AsNumber(); + if (json.find("hopLimit") != json.end() && json["hopLimit"]->IsNumber()) + p->hop_limit = json["hopLimit"]->AsNumber(); + if (jsonPayloadStr.length() <= sizeof(p->decoded.payload.bytes)) { + memcpy(p->decoded.payload.bytes, jsonPayloadStr.c_str(), jsonPayloadStr.length()); + p->decoded.payload.size = jsonPayloadStr.length(); + service->sendToMesh(p, RX_SRC_LOCAL); + } else { + LOG_WARN("Received MQTT json payload too long, drop"); + } + } else if (json["type"]->AsString().compare("sendposition") == 0 && json["payload"]->IsObject()) { + // invent the "sendposition" type for a valid envelope + JSONObject posit; + posit = json["payload"]->AsObject(); // get nested JSON Position + meshtastic_Position pos = meshtastic_Position_init_default; + if (posit.find("latitude_i") != posit.end() && posit["latitude_i"]->IsNumber()) + pos.latitude_i = posit["latitude_i"]->AsNumber(); + if (posit.find("longitude_i") != posit.end() && posit["longitude_i"]->IsNumber()) + pos.longitude_i = posit["longitude_i"]->AsNumber(); + if (posit.find("altitude") != posit.end() && posit["altitude"]->IsNumber()) + pos.altitude = posit["altitude"]->AsNumber(); + if (posit.find("time") != posit.end() && posit["time"]->IsNumber()) + pos.time = posit["time"]->AsNumber(); + + // construct protobuf data packet using POSITION, send it to the mesh + meshtastic_MeshPacket *p = router->allocForSending(); + p->decoded.portnum = meshtastic_PortNum_POSITION_APP; + if (json.find("channel") != json.end() && json["channel"]->IsNumber() && + (json["channel"]->AsNumber() < channels.getNumChannels())) + p->channel = json["channel"]->AsNumber(); + if (json.find("to") != json.end() && json["to"]->IsNumber()) + p->to = json["to"]->AsNumber(); + if (json.find("hopLimit") != json.end() && json["hopLimit"]->IsNumber()) + p->hop_limit = json["hopLimit"]->AsNumber(); + p->decoded.payload.size = + pb_encode_to_bytes(p->decoded.payload.bytes, sizeof(p->decoded.payload.bytes), &meshtastic_Position_msg, + &pos); // make the Data protobuf from position + service->sendToMesh(p, RX_SRC_LOCAL); + } else { + LOG_DEBUG("JSON ignore downlink message with unsupported type"); + } +} +} // namespace + void MQTT::mqttCallback(char *topic, byte *payload, unsigned int length) { mqtt->onReceive(topic, payload, length); @@ -49,170 +213,30 @@ void MQTT::onClientProxyReceive(meshtastic_MqttClientProxyMessage msg) void MQTT::onReceive(char *topic, byte *payload, size_t length) { - meshtastic_ServiceEnvelope e = meshtastic_ServiceEnvelope_init_default; - - if (moduleConfig.mqtt.json_enabled && (strncmp(topic, jsonTopic.c_str(), jsonTopic.length()) == 0)) { - // check if this is a json payload message by comparing the topic start - char payloadStr[length + 1]; - memcpy(payloadStr, payload, length); - payloadStr[length] = 0; // null terminated string - JSONValue *json_value = JSON::Parse(payloadStr); - if (json_value != NULL) { - // check if it is a valid envelope - JSONObject json; - json = json_value->AsObject(); - - // parse the channel name from the topic string - // the topic has been checked above for having jsonTopic prefix, so just move past it - char *ptr = topic + jsonTopic.length(); - ptr = strtok(ptr, "/") ? strtok(ptr, "/") : ptr; // if another "/" was added, parse string up to that character - meshtastic_Channel sendChannel = channels.getByName(ptr); - // We allow downlink JSON packets only on a channel named "mqtt" - if (strncasecmp(channels.getGlobalId(sendChannel.index), Channels::mqttChannel, strlen(Channels::mqttChannel)) == 0 && - sendChannel.settings.downlink_enabled) { - if (isValidJsonEnvelope(json)) { - // this is a valid envelope - if (json["type"]->AsString().compare("sendtext") == 0 && json["payload"]->IsString()) { - std::string jsonPayloadStr = json["payload"]->AsString(); - LOG_INFO("JSON payload %s, length %u", jsonPayloadStr.c_str(), jsonPayloadStr.length()); - - // construct protobuf data packet using TEXT_MESSAGE, send it to the mesh - meshtastic_MeshPacket *p = router->allocForSending(); - p->decoded.portnum = meshtastic_PortNum_TEXT_MESSAGE_APP; - if (json.find("channel") != json.end() && json["channel"]->IsNumber() && - (json["channel"]->AsNumber() < channels.getNumChannels())) - p->channel = json["channel"]->AsNumber(); - if (json.find("to") != json.end() && json["to"]->IsNumber()) - p->to = json["to"]->AsNumber(); - if (json.find("hopLimit") != json.end() && json["hopLimit"]->IsNumber()) - p->hop_limit = json["hopLimit"]->AsNumber(); - if (jsonPayloadStr.length() <= sizeof(p->decoded.payload.bytes)) { - memcpy(p->decoded.payload.bytes, jsonPayloadStr.c_str(), jsonPayloadStr.length()); - p->decoded.payload.size = jsonPayloadStr.length(); - service->sendToMesh(p, RX_SRC_LOCAL); - } else { - LOG_WARN("Received MQTT json payload too long, drop"); - } - } else if (json["type"]->AsString().compare("sendposition") == 0 && json["payload"]->IsObject()) { - // invent the "sendposition" type for a valid envelope - JSONObject posit; - posit = json["payload"]->AsObject(); // get nested JSON Position - meshtastic_Position pos = meshtastic_Position_init_default; - if (posit.find("latitude_i") != posit.end() && posit["latitude_i"]->IsNumber()) - pos.latitude_i = posit["latitude_i"]->AsNumber(); - if (posit.find("longitude_i") != posit.end() && posit["longitude_i"]->IsNumber()) - pos.longitude_i = posit["longitude_i"]->AsNumber(); - if (posit.find("altitude") != posit.end() && posit["altitude"]->IsNumber()) - pos.altitude = posit["altitude"]->AsNumber(); - if (posit.find("time") != posit.end() && posit["time"]->IsNumber()) - pos.time = posit["time"]->AsNumber(); - - // construct protobuf data packet using POSITION, send it to the mesh - meshtastic_MeshPacket *p = router->allocForSending(); - p->decoded.portnum = meshtastic_PortNum_POSITION_APP; - if (json.find("channel") != json.end() && json["channel"]->IsNumber() && - (json["channel"]->AsNumber() < channels.getNumChannels())) - p->channel = json["channel"]->AsNumber(); - if (json.find("to") != json.end() && json["to"]->IsNumber()) - p->to = json["to"]->AsNumber(); - if (json.find("hopLimit") != json.end() && json["hopLimit"]->IsNumber()) - p->hop_limit = json["hopLimit"]->AsNumber(); - p->decoded.payload.size = - pb_encode_to_bytes(p->decoded.payload.bytes, sizeof(p->decoded.payload.bytes), - &meshtastic_Position_msg, &pos); // make the Data protobuf from position - service->sendToMesh(p, RX_SRC_LOCAL); - } else { - LOG_DEBUG("JSON ignore downlink message with unsupported type"); - } - } else { - LOG_ERROR("JSON received payload on MQTT but not a valid envelope"); - } - } else { - LOG_WARN("JSON downlink received on channel not called 'mqtt' or without downlink enabled"); - } - } else { - // no json, this is an invalid payload - LOG_ERROR("JSON received payload on MQTT but not a valid JSON"); - } - delete json_value; - } else { - if (length == 0) { - LOG_WARN("Empty MQTT payload received, topic %s!", topic); - return; - } else if (!pb_decode_from_bytes(payload, length, &meshtastic_ServiceEnvelope_msg, &e)) { - LOG_ERROR("Invalid MQTT service envelope, topic %s, len %u!", topic, length); - return; - } else { - if (e.channel_id == NULL || e.gateway_id == NULL) { - LOG_ERROR("Invalid MQTT service envelope, topic %s, len %u!", topic, length); - return; - } - meshtastic_Channel ch = channels.getByName(e.channel_id); - if (strcmp(e.gateway_id, owner.id) == 0) { - // Generate an implicit ACK towards ourselves (handled and processed only locally!) for this message. - // We do this because packets are not rebroadcasted back into MQTT anymore and we assume that at least one node - // receives it when we get our own packet back. Then we'll stop our retransmissions. - if (e.packet && isFromUs(e.packet)) - routingModule->sendAckNak(meshtastic_Routing_Error_NONE, getFrom(e.packet), e.packet->id, ch.index); - else - LOG_INFO("Ignore downlink message we originally sent"); - } else { - // Find channel by channel_id and check downlink_enabled - if ((strcmp(e.channel_id, "PKI") == 0 && e.packet) || - (strcmp(e.channel_id, channels.getGlobalId(ch.index)) == 0 && e.packet && ch.settings.downlink_enabled)) { - LOG_INFO("Received MQTT topic %s, len=%u", topic, length); - meshtastic_MeshPacket *p = packetPool.allocCopy(*e.packet); - p->via_mqtt = true; // Mark that the packet was received via MQTT - - if (isFromUs(p)) { - LOG_INFO("Ignore downlink message we originally sent"); - packetPool.release(p); - free(e.channel_id); - free(e.gateway_id); - free(e.packet); - return; - } - if (p->which_payload_variant == meshtastic_MeshPacket_decoded_tag) { - if (moduleConfig.mqtt.encryption_enabled) { - LOG_INFO("Ignore decoded message on MQTT, encryption is enabled"); - packetPool.release(p); - free(e.channel_id); - free(e.gateway_id); - free(e.packet); - return; - } - if (p->decoded.portnum == meshtastic_PortNum_ADMIN_APP) { - LOG_INFO("Ignore decoded admin packet"); - packetPool.release(p); - free(e.channel_id); - free(e.gateway_id); - free(e.packet); - return; - } - p->channel = ch.index; - } - - // PKI messages get accepted even if we can't decrypt - if (router && p->which_payload_variant == meshtastic_MeshPacket_encrypted_tag && - strcmp(e.channel_id, "PKI") == 0) { - const meshtastic_NodeInfoLite *tx = nodeDB->getMeshNode(getFrom(p)); - const meshtastic_NodeInfoLite *rx = nodeDB->getMeshNode(p->to); - // Only accept PKI messages to us, or if we have both the sender and receiver in our nodeDB, as then it's - // likely they discovered each other via a channel we have downlink enabled for - if (isToUs(p) || (tx && tx->has_user && rx && rx->has_user)) - router->enqueueReceivedMessage(p); - } else if (router && perhapsDecode(p)) // ignore messages if we don't have the channel key - router->enqueueReceivedMessage(p); - else - packetPool.release(p); - } - } - } - // make sure to free both strings and the MeshPacket (passing in NULL is acceptable) - free(e.channel_id); - free(e.gateway_id); - free(e.packet); + if (length == 0) { + LOG_WARN("Empty MQTT payload received, topic %s!", topic); + return; } + + // check if this is a json payload message by comparing the topic start + if (moduleConfig.mqtt.json_enabled && (strncmp(topic, jsonTopic.c_str(), jsonTopic.length()) == 0)) { + // parse the channel name from the topic string + // the topic has been checked above for having jsonTopic prefix, so just move past it + char *channelName = topic + jsonTopic.length(); + // if another "/" was added, parse string up to that character + channelName = strtok(channelName, "/") ? strtok(channelName, "/") : channelName; + // We allow downlink JSON packets only on a channel named "mqtt" + meshtastic_Channel &sendChannel = channels.getByName(channelName); + if (!(strncasecmp(channels.getGlobalId(sendChannel.index), Channels::mqttChannel, strlen(Channels::mqttChannel)) == 0 && + sendChannel.settings.downlink_enabled)) { + LOG_WARN("JSON downlink received on channel not called 'mqtt' or without downlink enabled"); + return; + } + onReceiveJson(payload, length); + return; + } + + onReceiveProto(topic, payload, length); } void mqttInit() @@ -705,17 +729,6 @@ void MQTT::perhapsReportToMap() } } -bool MQTT::isValidJsonEnvelope(JSONObject &json) -{ - // if "sender" is provided, avoid processing packets we uplinked - return (json.find("sender") != json.end() ? (json["sender"]->AsString().compare(owner.id) != 0) : true) && - (json.find("hopLimit") != json.end() ? json["hopLimit"]->IsNumber() : true) && // hop limit should be a number - (json.find("from") != json.end()) && json["from"]->IsNumber() && - (json["from"]->AsNumber() == nodeDB->getNodeNum()) && // only accept message if the "from" is us - (json.find("type") != json.end()) && json["type"]->IsString() && // should specify a type - (json.find("payload") != json.end()); // should have a payload -} - bool MQTT::isPrivateIpAddress(const char address[]) { // Min. length like 10.0.0.0 (8), max like 192.168.255.255:65535 (21) diff --git a/src/mqtt/MQTT.h b/src/mqtt/MQTT.h index 7e0378238..dc82c1a74 100644 --- a/src/mqtt/MQTT.h +++ b/src/mqtt/MQTT.h @@ -117,9 +117,6 @@ class MQTT : private concurrency::OSThread // Check if we should report unencrypted information about our node for consumption by a map void perhapsReportToMap(); - // returns true if this is a valid JSON envelope which we accept on downlink - bool isValidJsonEnvelope(JSONObject &json); - /// Determines if the given address is a private IPv4 address, i.e. not routable on the public internet. /// These are the ranges: 127.0.0.1, 10.0.0.0-10.255.255.255, 172.16.0.0-172.31.255.255, 192.168.0.0-192.168.255.255. bool isPrivateIpAddress(const char address[]);