Merge pull request #2133 from meshtastic/master

Master downstream merge
This commit is contained in:
Ben Meadors 2023-01-12 10:22:56 -06:00 committed by GitHub
commit a3636ae8a1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 28 additions and 11 deletions

View File

@ -26,7 +26,7 @@ build_flags =
-DCONFIG_NIMBLE_CPP_LOG_LEVEL=2 -DCONFIG_NIMBLE_CPP_LOG_LEVEL=2
-DCONFIG_BT_NIMBLE_MAX_CCCDS=20 -DCONFIG_BT_NIMBLE_MAX_CCCDS=20
-DESP_OPENSSL_SUPPRESS_LEGACY_WARNING -DESP_OPENSSL_SUPPRESS_LEGACY_WARNING
-DDDEBUG_HEAP -DDEBUG_HEAP
lib_deps = lib_deps =
${arduino_base.lib_deps} ${arduino_base.lib_deps}

View File

@ -27,7 +27,7 @@ build_flags =
-DCONFIG_BT_NIMBLE_MAX_CCCDS=20 -DCONFIG_BT_NIMBLE_MAX_CCCDS=20
-DESP_OPENSSL_SUPPRESS_LEGACY_WARNING -DESP_OPENSSL_SUPPRESS_LEGACY_WARNING
-DHAS_BLUETOOTH=0 -DHAS_BLUETOOTH=0
-DDDEBUG_HEAP -DDEBUG_HEAP
lib_deps = lib_deps =
${arduino_base.lib_deps} ${arduino_base.lib_deps}

View File

@ -26,7 +26,7 @@ build_flags =
-DCONFIG_NIMBLE_CPP_LOG_LEVEL=2 -DCONFIG_NIMBLE_CPP_LOG_LEVEL=2
-DCONFIG_BT_NIMBLE_MAX_CCCDS=20 -DCONFIG_BT_NIMBLE_MAX_CCCDS=20
-DESP_OPENSSL_SUPPRESS_LEGACY_WARNING -DESP_OPENSSL_SUPPRESS_LEGACY_WARNING
-DDDEBUG_HEAP -DDEBUG_HEAP
lib_deps = lib_deps =
${arduino_base.lib_deps} ${arduino_base.lib_deps}

View File

@ -21,6 +21,10 @@ String statusTopic = "msh/2/stat/";
String cryptTopic = "msh/2/c/"; // msh/2/c/CHANNELID/NODEID String cryptTopic = "msh/2/c/"; // msh/2/c/CHANNELID/NODEID
String jsonTopic = "msh/2/json/"; // msh/2/json/CHANNELID/NODEID String jsonTopic = "msh/2/json/"; // msh/2/json/CHANNELID/NODEID
static MemoryDynamic<ServiceEnvelope> staticMqttPool;
Allocator<ServiceEnvelope> &mqttPool = staticMqttPool;
void MQTT::mqttCallback(char *topic, byte *payload, unsigned int length) void MQTT::mqttCallback(char *topic, byte *payload, unsigned int length)
{ {
mqtt->onPublish(topic, payload, length); mqtt->onPublish(topic, payload, length);
@ -142,7 +146,7 @@ void mqttInit()
new MQTT(); new MQTT();
} }
MQTT::MQTT() : concurrency::OSThread("mqtt"), pubSub(mqttClient) MQTT::MQTT() : concurrency::OSThread("mqtt"), pubSub(mqttClient), mqttQueue(MAX_MQTT_QUEUE)
{ {
if(moduleConfig.mqtt.enabled) { if(moduleConfig.mqtt.enabled) {
@ -297,7 +301,7 @@ int32_t MQTT::runOnce()
} }
mqttPool.release(env); mqttPool.release(env);
} }
return 20; return 200;
} else { } else {
return 30000; return 30000;
} }
@ -322,17 +326,17 @@ void MQTT::onSend(const MeshPacket &mp, ChannelIndex chIndex)
if (ch.settings.uplink_enabled) { if (ch.settings.uplink_enabled) {
const char *channelId = channels.getGlobalId(chIndex); // FIXME, for now we just use the human name for the channel const char *channelId = channels.getGlobalId(chIndex); // FIXME, for now we just use the human name for the channel
ServiceEnvelope env = ServiceEnvelope_init_default; ServiceEnvelope *env = mqttPool.allocZeroed();
env.channel_id = (char *)channelId; env->channel_id = (char *)channelId;
env.gateway_id = owner.id; env->gateway_id = owner.id;
env.packet = (MeshPacket *)&mp; env->packet = (MeshPacket *)&mp;
// don't bother sending if not connected... // don't bother sending if not connected...
if (pubSub.connected()) { if (pubSub.connected()) {
// FIXME - this size calculation is super sloppy, but it will go away once we dynamically alloc meshpackets // FIXME - this size calculation is super sloppy, but it will go away once we dynamically alloc meshpackets
static uint8_t bytes[MeshPacket_size + 64]; static uint8_t bytes[MeshPacket_size + 64];
size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), &ServiceEnvelope_msg, &env); size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), &ServiceEnvelope_msg, env);
String topic = cryptTopic + channelId + "/" + owner.id; String topic = cryptTopic + channelId + "/" + owner.id;
LOG_DEBUG("publish %s, %u bytes\n", topic.c_str(), numBytes); LOG_DEBUG("publish %s, %u bytes\n", topic.c_str(), numBytes);
@ -348,7 +352,19 @@ void MQTT::onSend(const MeshPacket &mp, ChannelIndex chIndex)
pubSub.publish(topicJson.c_str(), jsonString.c_str(), false); pubSub.publish(topicJson.c_str(), jsonString.c_str(), false);
} }
} }
} else {
LOG_INFO("MQTT not connected, queueing packet\n");
if (mqttQueue.numFree() == 0) {
LOG_WARN("NOTE: MQTT queue is full, discarding oldest\n");
ServiceEnvelope *d = mqttQueue.dequeuePtr(0);
if (d)
mqttPool.release(d);
} }
// make a copy of serviceEnvelope and queue it
ServiceEnvelope *copied = mqttPool.allocCopy(*env);
assert(mqttQueue.enqueue(copied, 0));
}
mqttPool.release(env);
} }
} }

View File

@ -55,6 +55,7 @@ class MQTT : private concurrency::OSThread
bool connected(); bool connected();
protected: protected:
PointerQueue<ServiceEnvelope> mqttQueue;
int reconnectCount = 0; int reconnectCount = 0;