Merge pull request #2113 from meshtastic/mqtt-fix

Find the downlink channel to post packet to.
This commit is contained in:
Thomas Göttgens 2023-01-10 20:49:18 +01:00 committed by GitHub
commit 6b0f18e1e4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 47 additions and 22 deletions

View File

@ -210,7 +210,7 @@ Channel &Channels::getByIndex(ChannelIndex chIndex)
Channel &Channels::getByName(const char* chName)
{
for (ChannelIndex i = 0; i < getNumChannels(); i++) {
if (strcasecmp(channelFile.channels[i].settings.name, chName) == 0) {
if (strcasecmp(getGlobalId(i), chName) == 0) {
return channelFile.channels[i];
}
}

View File

@ -55,11 +55,13 @@ static int32_t reconnectWiFi()
// Make sure we clear old connection credentials
WiFi.disconnect(false, true);
LOG_INFO("Reconnecting to WiFi access point %s\n",wifiName);
WiFi.mode(WIFI_MODE_STA);
WiFi.begin(wifiName, wifiPsw);
delay(5000);
if (!WiFi.isConnected()) {
WiFi.begin(wifiName, wifiPsw);
}
}
#ifndef DISABLE_NTP
@ -167,7 +169,7 @@ bool initWifi()
WiFi.mode(WIFI_MODE_STA);
WiFi.setHostname(ourHost);
WiFi.onEvent(WiFiEvent);
WiFi.setAutoReconnect(false);
WiFi.setAutoReconnect(true);
WiFi.setSleep(false);
if (config.network.address_mode == Config_NetworkConfig_AddressMode_STATIC && config.network.ipv4_config.ip != 0) {
WiFi.config(config.network.ipv4_config.ip,
@ -182,7 +184,8 @@ bool initWifi()
WiFi.onEvent(
[](WiFiEvent_t event, WiFiEventInfo_t info) {
LOG_WARN("WiFi lost connection. Reason: %d", info.wifi_sta_disconnected.reason);
LOG_WARN("WiFi lost connection. Reason: %d\n", info.wifi_sta_disconnected.reason);
/*
If we are disconnected from the AP for some reason,

View File

@ -43,9 +43,20 @@ void MQTT::onPublish(char *topic, byte *payload, unsigned int length)
JSONValue *json_value = JSON::Parse(payloadStr);
if (json_value != NULL) {
LOG_INFO("JSON Received on MQTT, parsing..\n");
// check if it is a valid envelope
JSONObject json;
json = json_value->AsObject();
// parse the channel name from the topic string
char *ptr = strtok(topic, "/");
for (int i = 0; i < 3; i++) {
ptr = strtok(NULL, "/");
}
LOG_DEBUG("Looking for Channel name: %s\n", ptr);
Channel sendChannel = channels.getByName(ptr);
LOG_DEBUG("Found Channel name: %s (Index %d)\n", channels.getGlobalId(sendChannel.settings.channel_num), sendChannel.settings.channel_num);
if ((json.find("sender") != json.end()) && (json.find("payload") != json.end()) && (json.find("type") != json.end()) && json["type"]->IsString() && (json["type"]->AsString().compare("sendtext") == 0)) {
// this is a valid envelope
if (json["payload"]->IsString() && json["type"]->IsString() && (json["sender"]->AsString().compare(owner.id) != 0)) {
@ -55,13 +66,18 @@ void MQTT::onPublish(char *topic, byte *payload, unsigned int length)
// construct protobuf data packet using TEXT_MESSAGE, send it to the mesh
MeshPacket *p = router->allocForSending();
p->decoded.portnum = PortNum_TEXT_MESSAGE_APP;
if (jsonPayloadStr.length() <= sizeof(p->decoded.payload.bytes)) {
memcpy(p->decoded.payload.bytes, jsonPayloadStr.c_str(), jsonPayloadStr.length());
p->decoded.payload.size = jsonPayloadStr.length();
MeshPacket *packet = packetPool.allocCopy(*p);
service.sendToMesh(packet, RX_SRC_LOCAL);
p->channel = sendChannel.settings.channel_num;
if (sendChannel.settings.downlink_enabled) {
if (jsonPayloadStr.length() <= sizeof(p->decoded.payload.bytes)) {
memcpy(p->decoded.payload.bytes, jsonPayloadStr.c_str(), jsonPayloadStr.length());
p->decoded.payload.size = jsonPayloadStr.length();
MeshPacket *packet = packetPool.allocCopy(*p);
service.sendToMesh(packet, RX_SRC_LOCAL);
} else {
LOG_WARN("Received MQTT json payload too long, dropping\n");
}
} else {
LOG_WARN("Received MQTT json payload too long, dropping\n");
LOG_WARN("Received MQTT json payload on channel %s, but downlink is disabled, dropping\n", sendChannel.settings.name);
}
} else {
LOG_DEBUG("JSON Ignoring downlink message we originally sent.\n");
@ -80,9 +96,13 @@ void MQTT::onPublish(char *topic, byte *payload, unsigned int length)
// construct protobuf data packet using POSITION, send it to the mesh
MeshPacket *p = router->allocForSending();
p->decoded.portnum = PortNum_POSITION_APP;
p->decoded.payload.size = pb_encode_to_bytes(p->decoded.payload.bytes, sizeof(p->decoded.payload.bytes), &Position_msg, &pos); //make the Data protobuf from position
service.sendToMesh(p, RX_SRC_LOCAL);
p->channel = sendChannel.settings.channel_num;
if (sendChannel.settings.downlink_enabled) {
p->decoded.payload.size = pb_encode_to_bytes(p->decoded.payload.bytes, sizeof(p->decoded.payload.bytes), &Position_msg, &pos); //make the Data protobuf from position
service.sendToMesh(p, RX_SRC_LOCAL);
} else {
LOG_WARN("Received MQTT json payload on channel %s, but downlink is disabled, dropping\n", sendChannel.settings.name);
}
} else {
LOG_DEBUG("JSON Ignoring downlink message we originally sent.\n");
}
@ -187,14 +207,17 @@ void MQTT::reconnect()
sendSubscriptions();
} else {
LOG_ERROR("Failed to contact MQTT server (%d/10)...\n",reconnectCount);
#if HAS_WIFI && !defined(ARCH_PORTDUINO)
if (reconnectCount > 9) {
LOG_ERROR("Failed to contact MQTT server (%d/5)...\n",reconnectCount + 1);
if (reconnectCount >= 4) {
needReconnect = true;
wifiReconnect->setIntervalFromNow(1000);
wifiReconnect->setIntervalFromNow(0);
reconnectCount = 0;
} else {
reconnectCount++;
}
#endif
reconnectCount++;
}
}
}
@ -264,7 +287,6 @@ int32_t MQTT::runOnce()
String topic = cryptTopic + env->channel_id + "/" + owner.id;
LOG_INFO("publish %s, %u bytes from queue\n", topic.c_str(), numBytes);
pubSub.publish(topic.c_str(), bytes, numBytes, false);
@ -279,7 +301,7 @@ int32_t MQTT::runOnce()
}
mqttPool.release(env);
}
return 20;
return 200;
} else {
return 30000;
}

View File

@ -13,7 +13,7 @@
#include <EthernetClient.h>
#endif
#define MAX_MQTT_QUEUE 32
#define MAX_MQTT_QUEUE 16
/**
* Our wrapper/singleton for sending/receiving MQTT "udp" packets. This object isolates the MQTT protocol implementation from