From 30ae4c2a38abb376a2bf61acc5184ec5cff5176f Mon Sep 17 00:00:00 2001 From: pavelb-techspark <107929268+pavelb-techspark@users.noreply.github.com> Date: Wed, 4 Jan 2023 14:56:52 +0200 Subject: [PATCH] Add QueueStatus sending to the firmware (#1820) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Yank mqtt service envelope queue * trybuildfix mqtt system * removed too much * no excessive heap debugging on release builds * send QueueStatus messages The QueueStatus message is sent as a response to the attempt to queue an outgoing MeshPacket and contains statuses of the last queue attempt, TX Queue space and capacity and MeshPacket.id that was queued. When TX Queue changes status from completely full to at least a single slot free a QueueStatus message is also sent to notify that user can queue more messages. Signed-off-by: Pavel Boldin * WIP: update protobufs Signed-off-by: Pavel Boldin * update protobufs * regen protos Signed-off-by: Pavel Boldin Co-authored-by: Ben Meadors Co-authored-by: Thomas Göttgens Co-authored-by: Sacha Weatherstone --- arch/esp32/esp32.ini | 1 - arch/esp32/esp32s2.ini | 1 - arch/esp32/esp32s3.ini | 1 - protobufs | 2 +- src/mesh/MeshPacketQueue.h | 6 +++ src/mesh/MeshService.cpp | 45 +++++++++++++++++++- src/mesh/MeshService.h | 16 +++++++ src/mesh/PhoneAPI.cpp | 26 ++++++++++-- src/mesh/PhoneAPI.h | 5 +++ src/mesh/RadioInterface.h | 7 +++ src/mesh/RadioLibInterface.cpp | 11 +++++ src/mesh/RadioLibInterface.h | 2 + src/mesh/Router.cpp | 5 +++ src/mesh/Router.h | 3 ++ src/mesh/generated/mesh.pb.c | 3 ++ src/mesh/generated/mesh.pb.h | 36 +++++++++++++++- src/mesh/generated/module_config.pb.h | 1 + src/mqtt/MQTT.cpp | 61 ++++----------------------- src/mqtt/MQTT.h | 5 +-- src/platform/portduino/SimRadio.cpp | 10 +++++ src/platform/portduino/SimRadio.h | 3 ++ 21 files changed, 184 insertions(+), 66 deletions(-) diff --git a/arch/esp32/esp32.ini b/arch/esp32/esp32.ini index 0011cc39f..70654e8ec 100644 --- a/arch/esp32/esp32.ini +++ b/arch/esp32/esp32.ini @@ -26,7 +26,6 @@ build_flags = -DCONFIG_NIMBLE_CPP_LOG_LEVEL=2 -DCONFIG_BT_NIMBLE_MAX_CCCDS=20 -DESP_OPENSSL_SUPPRESS_LEGACY_WARNING - -DDEBUG_HEAP lib_deps = ${arduino_base.lib_deps} diff --git a/arch/esp32/esp32s2.ini b/arch/esp32/esp32s2.ini index ca4f576d6..cd47c4ca1 100644 --- a/arch/esp32/esp32s2.ini +++ b/arch/esp32/esp32s2.ini @@ -27,7 +27,6 @@ build_flags = -DCONFIG_BT_NIMBLE_MAX_CCCDS=20 -DESP_OPENSSL_SUPPRESS_LEGACY_WARNING -DHAS_BLUETOOTH=0 - -DDEBUG_HEAP lib_deps = ${arduino_base.lib_deps} diff --git a/arch/esp32/esp32s3.ini b/arch/esp32/esp32s3.ini index b276ceff9..ce64fdbe2 100644 --- a/arch/esp32/esp32s3.ini +++ b/arch/esp32/esp32s3.ini @@ -26,7 +26,6 @@ build_flags = -DCONFIG_NIMBLE_CPP_LOG_LEVEL=2 -DCONFIG_BT_NIMBLE_MAX_CCCDS=20 -DESP_OPENSSL_SUPPRESS_LEGACY_WARNING - -DDEBUG_HEAP lib_deps = ${arduino_base.lib_deps} diff --git a/protobufs b/protobufs index 6048ecd2f..3b0d871ca 160000 --- a/protobufs +++ b/protobufs @@ -1 +1 @@ -Subproject commit 6048ecd2f8433005bb73ab4505386a7bbdb78c86 +Subproject commit 3b0d871ca1e0f8a2ed823f0696e2d7cf31ed2ebd diff --git a/src/mesh/MeshPacketQueue.h b/src/mesh/MeshPacketQueue.h index 8c93b452e..ee6c6954a 100644 --- a/src/mesh/MeshPacketQueue.h +++ b/src/mesh/MeshPacketQueue.h @@ -26,6 +26,12 @@ class MeshPacketQueue /** return true if the queue is empty */ bool empty(); + /** return amount of free packets in Queue */ + size_t getFree() { return maxLen - queue.size(); } + + /** return total size of the Queue */ + size_t getMaxLen() { return maxLen; } + MeshPacket *dequeue(); MeshPacket *getFront(); diff --git a/src/mesh/MeshService.cpp b/src/mesh/MeshService.cpp index c300104a4..208b1b9ea 100644 --- a/src/mesh/MeshService.cpp +++ b/src/mesh/MeshService.cpp @@ -52,10 +52,15 @@ FIXME in the initial proof of concept we just skip the entire want/deny flow and MeshService service; +static MemoryDynamic staticQueueStatusPool; + +Allocator &queueStatusPool = staticQueueStatusPool; + #include "Router.h" -MeshService::MeshService() : toPhoneQueue(MAX_RX_TOPHONE) +MeshService::MeshService() : toPhoneQueue(MAX_RX_TOPHONE), toPhoneQueueStatusQueue(MAX_RX_TOPHONE) { + lastQueueStatus = { 0, 0, 16, 0 }; // assert(MAX_RX_TOPHONE == 32); // FIXME, delete this, just checking my clever macro } @@ -83,6 +88,11 @@ int MeshService::handleFromRadio(const MeshPacket *mp) /// Do idle processing (mostly processing messages which have been queued from the radio) void MeshService::loop() { + if (lastQueueStatus.free == 0) { // check if there is now free space in TX queue + QueueStatus qs = router->getQueueStatus(); + if (qs.free != lastQueueStatus.free) + (void)sendQueueStatusToPhone(qs, 0, 0); + } if (oldFromNum != fromNum) { // We don't want to generate extra notifies for multiple new packets fromNumChanged.notifyObservers(fromNum); oldFromNum = fromNum; @@ -179,12 +189,43 @@ bool MeshService::cancelSending(PacketId id) return router->cancelSending(nodeDB.getNodeNum(), id); } +ErrorCode MeshService::sendQueueStatusToPhone(const QueueStatus &qs, ErrorCode res, uint32_t mesh_packet_id) +{ + QueueStatus *copied = queueStatusPool.allocCopy(qs); + + copied->res = res; + copied->mesh_packet_id = mesh_packet_id; + + if (toPhoneQueueStatusQueue.numFree() == 0) { + LOG_DEBUG("NOTE: tophone queue status queue is full, discarding oldest\n"); + QueueStatus *d = toPhoneQueueStatusQueue.dequeuePtr(0); + if (d) + releaseQueueStatusToPool(d); + } + + lastQueueStatus = *copied; + + res = toPhoneQueueStatusQueue.enqueue(copied, 0); + fromNum++; + + return res ? ERRNO_OK : ERRNO_UNKNOWN; +} + void MeshService::sendToMesh(MeshPacket *p, RxSource src, bool ccToPhone) { + uint32_t mesh_packet_id = p->id; nodeDB.updateFrom(*p); // update our local DB for this packet (because phone might have sent position packets etc...) // Note: We might return !OK if our fifo was full, at that point the only option we have is to drop it - router->sendLocal(p, src); + ErrorCode res = router->sendLocal(p, src); + + /* NOTE(pboldin): Prepare and send QueueStatus message to the phone as a + * high-priority message. */ + QueueStatus qs = router->getQueueStatus(); + ErrorCode r = sendQueueStatusToPhone(qs, res, mesh_packet_id); + if (r != ERRNO_OK) { + LOG_DEBUG("Can't send status to phone"); + } if (ccToPhone) { sendToPhone(p); diff --git a/src/mesh/MeshService.h b/src/mesh/MeshService.h index c3b3c95d9..31b63bd3f 100644 --- a/src/mesh/MeshService.h +++ b/src/mesh/MeshService.h @@ -14,6 +14,8 @@ #include "../platform/portduino/SimRadio.h" #endif +extern Allocator &queueStatusPool; + /** * Top level app for this service. keeps the mesh, the radio config and the queue of received packets. * @@ -29,6 +31,12 @@ class MeshService /// FIXME - save this to flash on deep sleep PointerQueue toPhoneQueue; + // keep list of QueueStatus packets to be send to the phone + PointerQueue toPhoneQueueStatusQueue; + + // This holds the last QueueStatus send + QueueStatus lastQueueStatus; + /// The current nonce for the newest packet which has been queued for the phone uint32_t fromNum = 0; @@ -56,6 +64,12 @@ class MeshService /// Allows the bluetooth handler to free packets after they have been sent void releaseToPool(MeshPacket *p) { packetPool.release(p); } + /// Return the next QueueStatus packet destined to the phone. + QueueStatus *getQueueStatusForPhone() { return toPhoneQueueStatusQueue.dequeuePtr(0); } + + // Release QueueStatus packet to pool + void releaseQueueStatusToPool(QueueStatus *p) { queueStatusPool.release(p); } + /** * Given a ToRadio buffer parse it and properly handle it (setup radio, owner or send packet into the mesh) * Called by PhoneAPI.handleToRadio. Note: p is a scratch buffer, this function is allowed to write to it but it can not keep @@ -100,6 +114,8 @@ class MeshService /// needs to keep the packet around it makes a copy int handleFromRadio(const MeshPacket *p); friend class RoutingModule; + + ErrorCode sendQueueStatusToPhone(const QueueStatus &qs, ErrorCode res, uint32_t mesh_packet_id); }; extern MeshService service; diff --git a/src/mesh/PhoneAPI.cpp b/src/mesh/PhoneAPI.cpp index 897ddfe8a..c1d56b7ec 100644 --- a/src/mesh/PhoneAPI.cpp +++ b/src/mesh/PhoneAPI.cpp @@ -50,6 +50,7 @@ void PhoneAPI::close() unobserve(&service.fromNumChanged); releasePhonePacket(); // Don't leak phone packets on shutdown + releaseQueueStatusPhonePacket(); onConnectionChanged(false); } @@ -282,14 +283,19 @@ size_t PhoneAPI::getFromRadio(uint8_t *buf) case STATE_SEND_PACKETS: // Do we have a message from the mesh? LOG_INFO("getFromRadio=STATE_SEND_PACKETS\n"); - if (packetForPhone) { + if (queueStatusPacketForPhone) { + + fromRadioScratch.which_payload_variant = FromRadio_queueStatus_tag; + fromRadioScratch.queueStatus = *queueStatusPacketForPhone; + releaseQueueStatusPhonePacket(); + } else if (packetForPhone) { printPacket("phone downloaded packet", packetForPhone); // Encapsulate as a FromRadio packet fromRadioScratch.which_payload_variant = FromRadio_packet_tag; fromRadioScratch.packet = *packetForPhone; + releasePhonePacket(); } - releasePhonePacket(); break; default: @@ -322,6 +328,14 @@ void PhoneAPI::releasePhonePacket() } } +void PhoneAPI::releaseQueueStatusPhonePacket() +{ + if (queueStatusPacketForPhone) { + service.releaseQueueStatusToPool(queueStatusPacketForPhone); + queueStatusPacketForPhone = NULL; + } +} + /** * Return true if we have data available to send to the phone */ @@ -342,9 +356,15 @@ bool PhoneAPI::available() return true; // Always say we have something, because we might need to advance our state machine case STATE_SEND_PACKETS: { + if (!queueStatusPacketForPhone) + queueStatusPacketForPhone = service.getQueueStatusForPhone(); + bool hasPacket = !!queueStatusPacketForPhone; + if (hasPacket) + return true; + if (!packetForPhone) packetForPhone = service.getForPhone(); - bool hasPacket = !!packetForPhone; + hasPacket = !!packetForPhone; // LOG_DEBUG("available hasPacket=%d\n", hasPacket); return hasPacket; } diff --git a/src/mesh/PhoneAPI.h b/src/mesh/PhoneAPI.h index cbac5f688..2f2695807 100644 --- a/src/mesh/PhoneAPI.h +++ b/src/mesh/PhoneAPI.h @@ -42,6 +42,9 @@ class PhoneAPI : public Observer // FIXME, we shouldn't be inheriting /// downloads it MeshPacket *packetForPhone = NULL; + // Keep QueueStatus packet just as packetForPhone + QueueStatus *queueStatusPacketForPhone = NULL; + /// We temporarily keep the nodeInfo here between the call to available and getFromRadio const NodeInfo *nodeInfoForPhone = NULL; @@ -115,6 +118,8 @@ class PhoneAPI : public Observer // FIXME, we shouldn't be inheriting private: void releasePhonePacket(); + void releaseQueueStatusPhonePacket(); + /// begin a new connection void handleStartConfig(); diff --git a/src/mesh/RadioInterface.h b/src/mesh/RadioInterface.h index f50c0ae77..59c63add2 100644 --- a/src/mesh/RadioInterface.h +++ b/src/mesh/RadioInterface.h @@ -115,6 +115,13 @@ class RadioInterface */ virtual ErrorCode send(MeshPacket *p) = 0; + /** Return TX queue status */ + virtual QueueStatus getQueueStatus() { + QueueStatus qs; + qs.res = qs.mesh_packet_id = qs.free = qs.maxlen = 0; + return qs; + } + /** Attempt to cancel a previously sent packet. Returns true if a packet was found we could cancel */ virtual bool cancelSending(NodeNum from, PacketId id) { return false; } diff --git a/src/mesh/RadioLibInterface.cpp b/src/mesh/RadioLibInterface.cpp index 53eaeca60..81a4d803e 100644 --- a/src/mesh/RadioLibInterface.cpp +++ b/src/mesh/RadioLibInterface.cpp @@ -158,6 +158,17 @@ ErrorCode RadioLibInterface::send(MeshPacket *p) #endif } +QueueStatus RadioLibInterface::getQueueStatus() +{ + QueueStatus qs; + + qs.res = qs.mesh_packet_id = 0; + qs.free = txQueue.getFree(); + qs.maxlen = txQueue.getMaxLen(); + + return qs; +} + bool RadioLibInterface::canSleep() { bool res = txQueue.empty(); diff --git a/src/mesh/RadioLibInterface.h b/src/mesh/RadioLibInterface.h index 16495c2f4..628ea863f 100644 --- a/src/mesh/RadioLibInterface.h +++ b/src/mesh/RadioLibInterface.h @@ -153,6 +153,8 @@ class RadioLibInterface : public RadioInterface, protected concurrency::Notified */ virtual void startSend(MeshPacket *txp); + QueueStatus getQueueStatus(); + protected: /** Do any hardware setup needed on entry into send configuration for the radio. Subclasses can customize */ diff --git a/src/mesh/Router.cpp b/src/mesh/Router.cpp index 0a8497463..0f34010ec 100644 --- a/src/mesh/Router.cpp +++ b/src/mesh/Router.cpp @@ -148,6 +148,11 @@ void Router::setReceivedMessage() runASAP = true; } +QueueStatus Router::getQueueStatus() +{ + return iface->getQueueStatus(); +} + ErrorCode Router::sendLocal(MeshPacket *p, RxSource src) { // No need to deliver externally if the destination is the local node diff --git a/src/mesh/Router.h b/src/mesh/Router.h index f7748bb2b..5a9cc0702 100644 --- a/src/mesh/Router.h +++ b/src/mesh/Router.h @@ -55,6 +55,9 @@ class Router : protected concurrency::OSThread */ MeshPacket *allocForSending(); + /** Return Underlying interface's TX queue status */ + QueueStatus getQueueStatus(); + /** * @return our local nodenum */ NodeNum getNodeNum(); diff --git a/src/mesh/generated/mesh.pb.c b/src/mesh/generated/mesh.pb.c index bd749b6cb..29594c612 100644 --- a/src/mesh/generated/mesh.pb.c +++ b/src/mesh/generated/mesh.pb.c @@ -36,6 +36,9 @@ PB_BIND(MyNodeInfo, MyNodeInfo, AUTO) PB_BIND(LogRecord, LogRecord, AUTO) +PB_BIND(QueueStatus, QueueStatus, AUTO) + + PB_BIND(FromRadio, FromRadio, 2) diff --git a/src/mesh/generated/mesh.pb.h b/src/mesh/generated/mesh.pb.h index 31e7817a0..d1f3e5bf5 100644 --- a/src/mesh/generated/mesh.pb.h +++ b/src/mesh/generated/mesh.pb.h @@ -621,6 +621,17 @@ typedef struct _LogRecord { LogRecord_Level level; } LogRecord; +typedef struct _QueueStatus { + /* Last attempt to queue status, ErrorCode */ + int8_t res; + /* Free entries in the outgoing queue */ + uint8_t free; + /* Maximum entries in the outgoing queue */ + uint8_t maxlen; + /* What was mesh packet id that generated this response? */ + uint32_t mesh_packet_id; +} QueueStatus; + /* Packets from the radio to the phone will appear on the fromRadio characteristic. It will support READ and NOTIFY. When a new packet arrives the device will BLE notify? It will sit in that descriptor until consumed by the phone, @@ -657,6 +668,8 @@ typedef struct _FromRadio { ModuleConfig moduleConfig; /* One packet is sent for each channel */ Channel channel; + /* Queue status info */ + QueueStatus queueStatus; }; } FromRadio; @@ -755,6 +768,7 @@ extern "C" { + #define Compressed_portnum_ENUMTYPE PortNum @@ -769,6 +783,7 @@ extern "C" { #define NodeInfo_init_default {0, false, User_init_default, false, Position_init_default, 0, 0, false, DeviceMetrics_init_default} #define MyNodeInfo_init_default {0, 0, 0, "", _CriticalErrorCode_MIN, 0, 0, 0, 0, 0, 0, 0, {0, 0, 0, 0, 0, 0, 0, 0}, 0, {0, 0, 0, 0, 0, 0, 0, 0}, 0, 0, 0} #define LogRecord_init_default {"", 0, "", _LogRecord_Level_MIN} +#define QueueStatus_init_default {0, 0, 0, 0} #define FromRadio_init_default {0, 0, {MeshPacket_init_default}} #define ToRadio_init_default {0, {MeshPacket_init_default}} #define Compressed_init_default {_PortNum_MIN, {0, {0}}} @@ -782,6 +797,7 @@ extern "C" { #define NodeInfo_init_zero {0, false, User_init_zero, false, Position_init_zero, 0, 0, false, DeviceMetrics_init_zero} #define MyNodeInfo_init_zero {0, 0, 0, "", _CriticalErrorCode_MIN, 0, 0, 0, 0, 0, 0, 0, {0, 0, 0, 0, 0, 0, 0, 0}, 0, {0, 0, 0, 0, 0, 0, 0, 0}, 0, 0, 0} #define LogRecord_init_zero {"", 0, "", _LogRecord_Level_MIN} +#define QueueStatus_init_zero {0, 0, 0, 0} #define FromRadio_init_zero {0, 0, {MeshPacket_init_zero}} #define ToRadio_init_zero {0, {MeshPacket_init_zero}} #define Compressed_init_zero {_PortNum_MIN, {0, {0}}} @@ -873,6 +889,10 @@ extern "C" { #define LogRecord_time_tag 2 #define LogRecord_source_tag 3 #define LogRecord_level_tag 4 +#define QueueStatus_res_tag 1 +#define QueueStatus_free_tag 2 +#define QueueStatus_maxlen_tag 3 +#define QueueStatus_mesh_packet_id_tag 4 #define FromRadio_id_tag 1 #define FromRadio_packet_tag 2 #define FromRadio_my_info_tag 3 @@ -883,6 +903,7 @@ extern "C" { #define FromRadio_rebooted_tag 8 #define FromRadio_moduleConfig_tag 9 #define FromRadio_channel_tag 10 +#define FromRadio_queueStatus_tag 11 #define ToRadio_packet_tag 1 #define ToRadio_want_config_id_tag 3 #define ToRadio_disconnect_tag 4 @@ -1022,6 +1043,14 @@ X(a, STATIC, SINGULAR, UENUM, level, 4) #define LogRecord_CALLBACK NULL #define LogRecord_DEFAULT NULL +#define QueueStatus_FIELDLIST(X, a) \ +X(a, STATIC, SINGULAR, INT32, res, 1) \ +X(a, STATIC, SINGULAR, UINT32, free, 2) \ +X(a, STATIC, SINGULAR, UINT32, maxlen, 3) \ +X(a, STATIC, SINGULAR, UINT32, mesh_packet_id, 4) +#define QueueStatus_CALLBACK NULL +#define QueueStatus_DEFAULT NULL + #define FromRadio_FIELDLIST(X, a) \ X(a, STATIC, SINGULAR, UINT32, id, 1) \ X(a, STATIC, ONEOF, MESSAGE, (payload_variant,packet,packet), 2) \ @@ -1032,7 +1061,8 @@ X(a, STATIC, ONEOF, MESSAGE, (payload_variant,log_record,log_record), 6) X(a, STATIC, ONEOF, UINT32, (payload_variant,config_complete_id,config_complete_id), 7) \ X(a, STATIC, ONEOF, BOOL, (payload_variant,rebooted,rebooted), 8) \ X(a, STATIC, ONEOF, MESSAGE, (payload_variant,moduleConfig,moduleConfig), 9) \ -X(a, STATIC, ONEOF, MESSAGE, (payload_variant,channel,channel), 10) +X(a, STATIC, ONEOF, MESSAGE, (payload_variant,channel,channel), 10) \ +X(a, STATIC, ONEOF, MESSAGE, (payload_variant,queueStatus,queueStatus), 11) #define FromRadio_CALLBACK NULL #define FromRadio_DEFAULT NULL #define FromRadio_payload_variant_packet_MSGTYPE MeshPacket @@ -1042,6 +1072,7 @@ X(a, STATIC, ONEOF, MESSAGE, (payload_variant,channel,channel), 10) #define FromRadio_payload_variant_log_record_MSGTYPE LogRecord #define FromRadio_payload_variant_moduleConfig_MSGTYPE ModuleConfig #define FromRadio_payload_variant_channel_MSGTYPE Channel +#define FromRadio_payload_variant_queueStatus_MSGTYPE QueueStatus #define ToRadio_FIELDLIST(X, a) \ X(a, STATIC, ONEOF, MESSAGE, (payload_variant,packet,packet), 1) \ @@ -1067,6 +1098,7 @@ extern const pb_msgdesc_t MeshPacket_msg; extern const pb_msgdesc_t NodeInfo_msg; extern const pb_msgdesc_t MyNodeInfo_msg; extern const pb_msgdesc_t LogRecord_msg; +extern const pb_msgdesc_t QueueStatus_msg; extern const pb_msgdesc_t FromRadio_msg; extern const pb_msgdesc_t ToRadio_msg; extern const pb_msgdesc_t Compressed_msg; @@ -1082,6 +1114,7 @@ extern const pb_msgdesc_t Compressed_msg; #define NodeInfo_fields &NodeInfo_msg #define MyNodeInfo_fields &MyNodeInfo_msg #define LogRecord_fields &LogRecord_msg +#define QueueStatus_fields &QueueStatus_msg #define FromRadio_fields &FromRadio_msg #define ToRadio_fields &ToRadio_msg #define Compressed_fields &Compressed_msg @@ -1095,6 +1128,7 @@ extern const pb_msgdesc_t Compressed_msg; #define MyNodeInfo_size 179 #define NodeInfo_size 258 #define Position_size 137 +#define QueueStatus_size 23 #define RouteDiscovery_size 40 #define Routing_size 42 #define ToRadio_size 324 diff --git a/src/mesh/generated/module_config.pb.h b/src/mesh/generated/module_config.pb.h index 52b551590..da13ac706 100644 --- a/src/mesh/generated/module_config.pb.h +++ b/src/mesh/generated/module_config.pb.h @@ -301,6 +301,7 @@ extern "C" { + #define ModuleConfig_AudioConfig_bitrate_ENUMTYPE ModuleConfig_AudioConfig_Audio_Baud #define ModuleConfig_SerialConfig_baud_ENUMTYPE ModuleConfig_SerialConfig_Serial_Baud diff --git a/src/mqtt/MQTT.cpp b/src/mqtt/MQTT.cpp index f4004139a..49ad586dd 100644 --- a/src/mqtt/MQTT.cpp +++ b/src/mqtt/MQTT.cpp @@ -21,10 +21,6 @@ String statusTopic = "msh/2/stat/"; String cryptTopic = "msh/2/c/"; // msh/2/c/CHANNELID/NODEID String jsonTopic = "msh/2/json/"; // msh/2/json/CHANNELID/NODEID -static MemoryDynamic staticMqttPool; - -Allocator &mqttPool = staticMqttPool; - void MQTT::mqttCallback(char *topic, byte *payload, unsigned int length) { mqtt->onPublish(topic, payload, length); @@ -126,7 +122,7 @@ void mqttInit() new MQTT(); } -MQTT::MQTT() : concurrency::OSThread("mqtt"), pubSub(mqttClient), mqttQueue(MAX_MQTT_QUEUE) +MQTT::MQTT() : concurrency::OSThread("mqtt"), pubSub(mqttClient) { if(moduleConfig.mqtt.enabled) { @@ -253,36 +249,9 @@ int32_t MQTT::runOnce() if (!pubSub.loop()) { if (wantConnection) { reconnect(); - - // If we succeeded, empty the queue one by one and start reading rapidly, else try again in 30 seconds (TCP connections are EXPENSIVE so try rarely) - if (pubSub.connected()) { - if (!mqttQueue.isEmpty()) { - // FIXME - this size calculation is super sloppy, but it will go away once we dynamically alloc meshpackets - ServiceEnvelope *env = mqttQueue.dequeuePtr(0); - static uint8_t bytes[MeshPacket_size + 64]; - size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), &ServiceEnvelope_msg, env); - - String topic = cryptTopic + env->channel_id + "/" + owner.id; - LOG_INFO("publish %s, %u bytes from queue\n", topic.c_str(), numBytes); - - - pubSub.publish(topic.c_str(), bytes, numBytes, false); - - if (moduleConfig.mqtt.json_enabled) { - // handle json topic - auto jsonString = this->downstreamPacketToJson(env->packet); - if (jsonString.length() != 0) { - String topicJson = jsonTopic + env->channel_id + "/" + owner.id; - LOG_INFO("JSON publish message to %s, %u bytes: %s\n", topicJson.c_str(), jsonString.length(), jsonString.c_str()); - pubSub.publish(topicJson.c_str(), jsonString.c_str(), false); - } - } - mqttPool.release(env); - } - return 20; - } else { - return 30000; - } + + // If we succeeded, start reading rapidly, else try again in 30 seconds (TCP connections are EXPENSIVE so try rarely) + return pubSub.connected() ? 20 : 30000; } else return 5000; // If we don't want connection now, check again in 5 secs } else { @@ -304,17 +273,17 @@ void MQTT::onSend(const MeshPacket &mp, ChannelIndex chIndex) if (ch.settings.uplink_enabled) { const char *channelId = channels.getGlobalId(chIndex); // FIXME, for now we just use the human name for the channel - ServiceEnvelope *env = mqttPool.allocZeroed(); - env->channel_id = (char *)channelId; - env->gateway_id = owner.id; - env->packet = (MeshPacket *)∓ + ServiceEnvelope env = ServiceEnvelope_init_default; + env.channel_id = (char *)channelId; + env.gateway_id = owner.id; + env.packet = (MeshPacket *)∓ // don't bother sending if not connected... if (pubSub.connected()) { // FIXME - this size calculation is super sloppy, but it will go away once we dynamically alloc meshpackets static uint8_t bytes[MeshPacket_size + 64]; - size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), &ServiceEnvelope_msg, env); + size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), &ServiceEnvelope_msg, &env); String topic = cryptTopic + channelId + "/" + owner.id; LOG_DEBUG("publish %s, %u bytes\n", topic.c_str(), numBytes); @@ -330,19 +299,7 @@ void MQTT::onSend(const MeshPacket &mp, ChannelIndex chIndex) pubSub.publish(topicJson.c_str(), jsonString.c_str(), false); } } - } else { - LOG_INFO("MQTT not connected, queueing packet\n"); - if (mqttQueue.numFree() == 0) { - LOG_WARN("NOTE: MQTT queue is full, discarding oldest\n"); - ServiceEnvelope *d = mqttQueue.dequeuePtr(0); - if (d) - mqttPool.release(d); - } - // make a copy of serviceEnvelope and queue it - ServiceEnvelope *copied = mqttPool.allocCopy(*env); - assert(mqttQueue.enqueue(copied, 0)); } - mqttPool.release(env); } } diff --git a/src/mqtt/MQTT.h b/src/mqtt/MQTT.h index ddbacbcc4..33fbbb8eb 100644 --- a/src/mqtt/MQTT.h +++ b/src/mqtt/MQTT.h @@ -13,8 +13,6 @@ #include #endif -#define MAX_MQTT_QUEUE 32 - /** * Our wrapper/singleton for sending/receiving MQTT "udp" packets. This object isolates the MQTT protocol implementation from * the two components that use it: MQTTPlugin and MQTTSimInterface. @@ -55,10 +53,9 @@ class MQTT : private concurrency::OSThread bool connected(); protected: - PointerQueue mqttQueue; int reconnectCount = 0; - + virtual int32_t runOnce() override; private: diff --git a/src/platform/portduino/SimRadio.cpp b/src/platform/portduino/SimRadio.cpp index 87800de22..a2611a464 100644 --- a/src/platform/portduino/SimRadio.cpp +++ b/src/platform/portduino/SimRadio.cpp @@ -215,6 +215,16 @@ void SimRadio::startReceive(MeshPacket *p) { handleReceiveInterrupt(p); } +QueueStatus SimRadio::getQueueStatus() +{ + QueueStatus qs; + + qs.res = qs.mesh_packet_id = 0; + qs.free = txQueue.getFree(); + qs.maxlen = txQueue.getMaxLen(); + + return qs; +} void SimRadio::handleReceiveInterrupt(MeshPacket *p) { diff --git a/src/platform/portduino/SimRadio.h b/src/platform/portduino/SimRadio.h index a71cf22f8..d2a36c81e 100644 --- a/src/platform/portduino/SimRadio.h +++ b/src/platform/portduino/SimRadio.h @@ -45,6 +45,9 @@ class SimRadio : public RadioInterface */ virtual void startReceive(MeshPacket *p); + QueueStatus getQueueStatus() override; + + protected: /// are _trying_ to receive a packet currently (note - we might just be waiting for one) bool isReceiving = false;