diff --git a/src/mqtt/MQTT.cpp b/src/mqtt/MQTT.cpp index f4004139a..f72271324 100644 --- a/src/mqtt/MQTT.cpp +++ b/src/mqtt/MQTT.cpp @@ -21,10 +21,6 @@ String statusTopic = "msh/2/stat/"; String cryptTopic = "msh/2/c/"; // msh/2/c/CHANNELID/NODEID String jsonTopic = "msh/2/json/"; // msh/2/json/CHANNELID/NODEID -static MemoryDynamic staticMqttPool; - -Allocator &mqttPool = staticMqttPool; - void MQTT::mqttCallback(char *topic, byte *payload, unsigned int length) { mqtt->onPublish(topic, payload, length); @@ -126,7 +122,7 @@ void mqttInit() new MQTT(); } -MQTT::MQTT() : concurrency::OSThread("mqtt"), pubSub(mqttClient), mqttQueue(MAX_MQTT_QUEUE) +MQTT::MQTT() : concurrency::OSThread("mqtt"), pubSub(mqttClient) { if(moduleConfig.mqtt.enabled) { @@ -253,36 +249,9 @@ int32_t MQTT::runOnce() if (!pubSub.loop()) { if (wantConnection) { reconnect(); - - // If we succeeded, empty the queue one by one and start reading rapidly, else try again in 30 seconds (TCP connections are EXPENSIVE so try rarely) - if (pubSub.connected()) { - if (!mqttQueue.isEmpty()) { - // FIXME - this size calculation is super sloppy, but it will go away once we dynamically alloc meshpackets - ServiceEnvelope *env = mqttQueue.dequeuePtr(0); - static uint8_t bytes[MeshPacket_size + 64]; - size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), &ServiceEnvelope_msg, env); - - String topic = cryptTopic + env->channel_id + "/" + owner.id; - LOG_INFO("publish %s, %u bytes from queue\n", topic.c_str(), numBytes); - - - pubSub.publish(topic.c_str(), bytes, numBytes, false); - - if (moduleConfig.mqtt.json_enabled) { - // handle json topic - auto jsonString = this->downstreamPacketToJson(env->packet); - if (jsonString.length() != 0) { - String topicJson = jsonTopic + env->channel_id + "/" + owner.id; - LOG_INFO("JSON publish message to %s, %u bytes: %s\n", topicJson.c_str(), jsonString.length(), jsonString.c_str()); - pubSub.publish(topicJson.c_str(), jsonString.c_str(), false); - } - } - mqttPool.release(env); - } - return 20; - } else { - return 30000; - } + + // If we succeeded, start reading rapidly, else try again in 30 seconds (TCP connections are EXPENSIVE so try rarely) + return pubSub.connected() ? 20 : 30000; } else return 5000; // If we don't want connection now, check again in 5 secs } else { @@ -304,7 +273,7 @@ void MQTT::onSend(const MeshPacket &mp, ChannelIndex chIndex) if (ch.settings.uplink_enabled) { const char *channelId = channels.getGlobalId(chIndex); // FIXME, for now we just use the human name for the channel - ServiceEnvelope *env = mqttPool.allocZeroed(); + ServiceEnvelope env = ServiceEnvelope_init_default; env->channel_id = (char *)channelId; env->gateway_id = owner.id; env->packet = (MeshPacket *)∓ @@ -314,7 +283,7 @@ void MQTT::onSend(const MeshPacket &mp, ChannelIndex chIndex) // FIXME - this size calculation is super sloppy, but it will go away once we dynamically alloc meshpackets static uint8_t bytes[MeshPacket_size + 64]; - size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), &ServiceEnvelope_msg, env); + size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), &ServiceEnvelope_msg, &env); String topic = cryptTopic + channelId + "/" + owner.id; LOG_DEBUG("publish %s, %u bytes\n", topic.c_str(), numBytes); @@ -330,19 +299,7 @@ void MQTT::onSend(const MeshPacket &mp, ChannelIndex chIndex) pubSub.publish(topicJson.c_str(), jsonString.c_str(), false); } } - } else { - LOG_INFO("MQTT not connected, queueing packet\n"); - if (mqttQueue.numFree() == 0) { - LOG_WARN("NOTE: MQTT queue is full, discarding oldest\n"); - ServiceEnvelope *d = mqttQueue.dequeuePtr(0); - if (d) - mqttPool.release(d); - } - // make a copy of serviceEnvelope and queue it - ServiceEnvelope *copied = mqttPool.allocCopy(*env); - assert(mqttQueue.enqueue(copied, 0)); } - mqttPool.release(env); } }