mirror of
https://github.com/meshtastic/firmware.git
synced 2025-08-14 01:05:16 +00:00
More comprehensive client proxy queue guards (#3414)
* More comprehensive MQTT thread and queue guards * Consolidate logic * Remove channel check * Check for map_reporting_enabled as well * Update message * Remove channel check from here as well * One liner * Start the mqtt thread back up when channels change and we want mqtt
This commit is contained in:
parent
4d0d82f7e7
commit
52cfec29fc
@ -7,6 +7,8 @@
|
|||||||
|
|
||||||
#include <assert.h>
|
#include <assert.h>
|
||||||
|
|
||||||
|
#include "mqtt/MQTT.h"
|
||||||
|
|
||||||
/// 16 bytes of random PSK for our _public_ default channel that all devices power up on (AES128)
|
/// 16 bytes of random PSK for our _public_ default channel that all devices power up on (AES128)
|
||||||
static const uint8_t defaultpsk[] = {0xd4, 0xf1, 0xbb, 0x3a, 0x20, 0x29, 0x07, 0x59,
|
static const uint8_t defaultpsk[] = {0xd4, 0xf1, 0xbb, 0x3a, 0x20, 0x29, 0x07, 0x59,
|
||||||
0xf0, 0xbc, 0xff, 0xab, 0xcf, 0x4e, 0x69, 0x01};
|
0xf0, 0xbc, 0xff, 0xab, 0xcf, 0x4e, 0x69, 0x01};
|
||||||
@ -193,6 +195,10 @@ void Channels::onConfigChanged()
|
|||||||
if (ch.role == meshtastic_Channel_Role_PRIMARY)
|
if (ch.role == meshtastic_Channel_Role_PRIMARY)
|
||||||
primaryIndex = i;
|
primaryIndex = i;
|
||||||
}
|
}
|
||||||
|
if (channels.anyMqttEnabled() && mqtt && !mqtt->isEnabled()) {
|
||||||
|
LOG_DEBUG("MQTT is enabled on at least one channel, so set MQTT thread to run immediately\n");
|
||||||
|
mqtt->start();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
meshtastic_Channel &Channels::getByIndex(ChannelIndex chIndex)
|
meshtastic_Channel &Channels::getByIndex(ChannelIndex chIndex)
|
||||||
@ -237,6 +243,16 @@ void Channels::setChannel(const meshtastic_Channel &c)
|
|||||||
old = c; // slam in the new settings/role
|
old = c; // slam in the new settings/role
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool Channels::anyMqttEnabled()
|
||||||
|
{
|
||||||
|
for (int i = 0; i < getNumChannels(); i++)
|
||||||
|
if (channelFile.channels[i].role != meshtastic_Channel_Role_DISABLED && channelFile.channels[i].has_settings &&
|
||||||
|
(channelFile.channels[i].settings.downlink_enabled || channelFile.channels[i].settings.uplink_enabled))
|
||||||
|
return true;
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
const char *Channels::getName(size_t chIndex)
|
const char *Channels::getName(size_t chIndex)
|
||||||
{
|
{
|
||||||
// Convert the short "" representation for Default into a usable string
|
// Convert the short "" representation for Default into a usable string
|
||||||
|
@ -105,6 +105,9 @@ class Channels
|
|||||||
// Returns true if we can be reached via a channel with the default settings given a region and modem preset
|
// Returns true if we can be reached via a channel with the default settings given a region and modem preset
|
||||||
bool hasDefaultChannel();
|
bool hasDefaultChannel();
|
||||||
|
|
||||||
|
// Returns true if any of our channels have enabled MQTT uplink or downlink
|
||||||
|
bool anyMqttEnabled();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
/** Given a channel index, change to use the crypto key specified by that index
|
/** Given a channel index, change to use the crypto key specified by that index
|
||||||
*
|
*
|
||||||
|
@ -105,8 +105,12 @@ bool PhoneAPI::handleToRadio(const uint8_t *buf, size_t bufLength)
|
|||||||
break;
|
break;
|
||||||
case meshtastic_ToRadio_mqttClientProxyMessage_tag:
|
case meshtastic_ToRadio_mqttClientProxyMessage_tag:
|
||||||
LOG_INFO("Got MqttClientProxy message\n");
|
LOG_INFO("Got MqttClientProxy message\n");
|
||||||
if (mqtt && moduleConfig.mqtt.proxy_to_client_enabled) {
|
if (mqtt && moduleConfig.mqtt.proxy_to_client_enabled && moduleConfig.mqtt.enabled &&
|
||||||
|
(channels.anyMqttEnabled() || moduleConfig.mqtt.map_reporting_enabled)) {
|
||||||
mqtt->onClientProxyReceive(toRadioScratch.mqttClientProxyMessage);
|
mqtt->onClientProxyReceive(toRadioScratch.mqttClientProxyMessage);
|
||||||
|
} else {
|
||||||
|
LOG_WARN("MqttClientProxy received but proxy is not enabled, no channels have up/downlink, or map reporting "
|
||||||
|
"not enabled\n");
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
|
@ -371,22 +371,9 @@ void MQTT::sendSubscriptions()
|
|||||||
|
|
||||||
bool MQTT::wantsLink() const
|
bool MQTT::wantsLink() const
|
||||||
{
|
{
|
||||||
bool hasChannelorMapReport = false;
|
bool hasChannelorMapReport =
|
||||||
|
moduleConfig.mqtt.enabled && (moduleConfig.mqtt.map_reporting_enabled || channels.anyMqttEnabled());
|
||||||
|
|
||||||
if (moduleConfig.mqtt.enabled) {
|
|
||||||
hasChannelorMapReport = moduleConfig.mqtt.map_reporting_enabled;
|
|
||||||
if (!hasChannelorMapReport) {
|
|
||||||
// No need for link if no channel needed it
|
|
||||||
size_t numChan = channels.getNumChannels();
|
|
||||||
for (size_t i = 0; i < numChan; i++) {
|
|
||||||
const auto &ch = channels.getByIndex(i);
|
|
||||||
if (ch.settings.uplink_enabled || ch.settings.downlink_enabled) {
|
|
||||||
hasChannelorMapReport = true;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (hasChannelorMapReport && moduleConfig.mqtt.proxy_to_client_enabled)
|
if (hasChannelorMapReport && moduleConfig.mqtt.proxy_to_client_enabled)
|
||||||
return true;
|
return true;
|
||||||
|
|
||||||
@ -401,7 +388,7 @@ bool MQTT::wantsLink() const
|
|||||||
|
|
||||||
int32_t MQTT::runOnce()
|
int32_t MQTT::runOnce()
|
||||||
{
|
{
|
||||||
if (!moduleConfig.mqtt.enabled)
|
if (!moduleConfig.mqtt.enabled || !(moduleConfig.mqtt.map_reporting_enabled || channels.anyMqttEnabled()))
|
||||||
return disable();
|
return disable();
|
||||||
|
|
||||||
bool wantConnection = wantsLink();
|
bool wantConnection = wantsLink();
|
||||||
|
@ -71,6 +71,10 @@ class MQTT : private concurrency::OSThread
|
|||||||
|
|
||||||
void onClientProxyReceive(meshtastic_MqttClientProxyMessage msg);
|
void onClientProxyReceive(meshtastic_MqttClientProxyMessage msg);
|
||||||
|
|
||||||
|
bool isEnabled() { return this->enabled; };
|
||||||
|
|
||||||
|
void start() { setIntervalFromNow(0); };
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
PointerQueue<meshtastic_ServiceEnvelope> mqttQueue;
|
PointerQueue<meshtastic_ServiceEnvelope> mqttQueue;
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user