From d9f0dc7ea41766842cfd8204b1fdedd139390c74 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Thomas=20G=C3=B6ttgens?= Date: Wed, 4 Jan 2023 14:41:58 +0100 Subject: [PATCH] add MQTT outqueue back in (still broken) --- src/mqtt/MQTT.cpp | 61 ++++++++++++++++++++++++++++++++++++++++------- src/mqtt/MQTT.h | 5 +++- 2 files changed, 56 insertions(+), 10 deletions(-) diff --git a/src/mqtt/MQTT.cpp b/src/mqtt/MQTT.cpp index 49ad586dd..f4004139a 100644 --- a/src/mqtt/MQTT.cpp +++ b/src/mqtt/MQTT.cpp @@ -21,6 +21,10 @@ String statusTopic = "msh/2/stat/"; String cryptTopic = "msh/2/c/"; // msh/2/c/CHANNELID/NODEID String jsonTopic = "msh/2/json/"; // msh/2/json/CHANNELID/NODEID +static MemoryDynamic staticMqttPool; + +Allocator &mqttPool = staticMqttPool; + void MQTT::mqttCallback(char *topic, byte *payload, unsigned int length) { mqtt->onPublish(topic, payload, length); @@ -122,7 +126,7 @@ void mqttInit() new MQTT(); } -MQTT::MQTT() : concurrency::OSThread("mqtt"), pubSub(mqttClient) +MQTT::MQTT() : concurrency::OSThread("mqtt"), pubSub(mqttClient), mqttQueue(MAX_MQTT_QUEUE) { if(moduleConfig.mqtt.enabled) { @@ -249,9 +253,36 @@ int32_t MQTT::runOnce() if (!pubSub.loop()) { if (wantConnection) { reconnect(); - - // If we succeeded, start reading rapidly, else try again in 30 seconds (TCP connections are EXPENSIVE so try rarely) - return pubSub.connected() ? 20 : 30000; + + // If we succeeded, empty the queue one by one and start reading rapidly, else try again in 30 seconds (TCP connections are EXPENSIVE so try rarely) + if (pubSub.connected()) { + if (!mqttQueue.isEmpty()) { + // FIXME - this size calculation is super sloppy, but it will go away once we dynamically alloc meshpackets + ServiceEnvelope *env = mqttQueue.dequeuePtr(0); + static uint8_t bytes[MeshPacket_size + 64]; + size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), &ServiceEnvelope_msg, env); + + String topic = cryptTopic + env->channel_id + "/" + owner.id; + LOG_INFO("publish %s, %u bytes from queue\n", topic.c_str(), numBytes); + + + pubSub.publish(topic.c_str(), bytes, numBytes, false); + + if (moduleConfig.mqtt.json_enabled) { + // handle json topic + auto jsonString = this->downstreamPacketToJson(env->packet); + if (jsonString.length() != 0) { + String topicJson = jsonTopic + env->channel_id + "/" + owner.id; + LOG_INFO("JSON publish message to %s, %u bytes: %s\n", topicJson.c_str(), jsonString.length(), jsonString.c_str()); + pubSub.publish(topicJson.c_str(), jsonString.c_str(), false); + } + } + mqttPool.release(env); + } + return 20; + } else { + return 30000; + } } else return 5000; // If we don't want connection now, check again in 5 secs } else { @@ -273,17 +304,17 @@ void MQTT::onSend(const MeshPacket &mp, ChannelIndex chIndex) if (ch.settings.uplink_enabled) { const char *channelId = channels.getGlobalId(chIndex); // FIXME, for now we just use the human name for the channel - ServiceEnvelope env = ServiceEnvelope_init_default; - env.channel_id = (char *)channelId; - env.gateway_id = owner.id; - env.packet = (MeshPacket *)∓ + ServiceEnvelope *env = mqttPool.allocZeroed(); + env->channel_id = (char *)channelId; + env->gateway_id = owner.id; + env->packet = (MeshPacket *)∓ // don't bother sending if not connected... if (pubSub.connected()) { // FIXME - this size calculation is super sloppy, but it will go away once we dynamically alloc meshpackets static uint8_t bytes[MeshPacket_size + 64]; - size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), &ServiceEnvelope_msg, &env); + size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), &ServiceEnvelope_msg, env); String topic = cryptTopic + channelId + "/" + owner.id; LOG_DEBUG("publish %s, %u bytes\n", topic.c_str(), numBytes); @@ -299,7 +330,19 @@ void MQTT::onSend(const MeshPacket &mp, ChannelIndex chIndex) pubSub.publish(topicJson.c_str(), jsonString.c_str(), false); } } + } else { + LOG_INFO("MQTT not connected, queueing packet\n"); + if (mqttQueue.numFree() == 0) { + LOG_WARN("NOTE: MQTT queue is full, discarding oldest\n"); + ServiceEnvelope *d = mqttQueue.dequeuePtr(0); + if (d) + mqttPool.release(d); + } + // make a copy of serviceEnvelope and queue it + ServiceEnvelope *copied = mqttPool.allocCopy(*env); + assert(mqttQueue.enqueue(copied, 0)); } + mqttPool.release(env); } } diff --git a/src/mqtt/MQTT.h b/src/mqtt/MQTT.h index 33fbbb8eb..ddbacbcc4 100644 --- a/src/mqtt/MQTT.h +++ b/src/mqtt/MQTT.h @@ -13,6 +13,8 @@ #include #endif +#define MAX_MQTT_QUEUE 32 + /** * Our wrapper/singleton for sending/receiving MQTT "udp" packets. This object isolates the MQTT protocol implementation from * the two components that use it: MQTTPlugin and MQTTSimInterface. @@ -53,9 +55,10 @@ class MQTT : private concurrency::OSThread bool connected(); protected: + PointerQueue mqttQueue; int reconnectCount = 0; - + virtual int32_t runOnce() override; private: