Refactor MQTT: only publish on LoRa Tx if packet is from us and on Rx if not (#3245)

Such that direct message to MQTT node gets published and we get rid of always rebroadcasting when MQTT is enabled

Co-authored-by: Ben Meadors <benmmeadors@gmail.com>
This commit is contained in:
GUVWAF 2024-02-21 20:00:14 +01:00 committed by GitHub
parent 9784758c7b
commit eb8a12e5a2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 18 additions and 9 deletions

View File

@ -21,7 +21,7 @@ bool FloodingRouter::shouldFilterReceived(const meshtastic_MeshPacket *p)
{ {
if (wasSeenRecently(p)) { // Note: this will also add a recent packet record if (wasSeenRecently(p)) { // Note: this will also add a recent packet record
printPacket("Ignoring incoming msg, because we've already seen it", p); printPacket("Ignoring incoming msg, because we've already seen it", p);
if (!moduleConfig.mqtt.enabled && config.device.role != meshtastic_Config_DeviceConfig_Role_ROUTER && if (config.device.role != meshtastic_Config_DeviceConfig_Role_ROUTER &&
config.device.role != meshtastic_Config_DeviceConfig_Role_ROUTER_CLIENT && config.device.role != meshtastic_Config_DeviceConfig_Role_ROUTER_CLIENT &&
config.device.role != meshtastic_Config_DeviceConfig_Role_REPEATER) { config.device.role != meshtastic_Config_DeviceConfig_Role_REPEATER) {
// cancel rebroadcast of this message *if* there was already one, unless we're a router/repeater! // cancel rebroadcast of this message *if* there was already one, unless we're a router/repeater!

View File

@ -257,10 +257,9 @@ ErrorCode Router::send(meshtastic_MeshPacket *p)
return encodeResult; // FIXME - this isn't a valid ErrorCode return encodeResult; // FIXME - this isn't a valid ErrorCode
} }
if (moduleConfig.mqtt.enabled) { // Only publish to MQTT if we're the original transmitter of the packet
LOG_INFO("Should encrypt MQTT?: %d\n", moduleConfig.mqtt.encryption_enabled); if (moduleConfig.mqtt.enabled && p->from == nodeDB.getNodeNum() && mqtt) {
if (mqtt) mqtt->onSend(*p, *p_decoded, chIndex);
mqtt->onSend(*p, *p_decoded, chIndex);
} }
packetPool.release(p_decoded); packetPool.release(p_decoded);
} }
@ -438,6 +437,8 @@ void Router::handleReceived(meshtastic_MeshPacket *p, RxSource src)
{ {
// Also, we should set the time from the ISR and it should have msec level resolution // Also, we should set the time from the ISR and it should have msec level resolution
p->rx_time = getValidTime(RTCQualityFromNet); // store the arrival timestamp for the phone p->rx_time = getValidTime(RTCQualityFromNet); // store the arrival timestamp for the phone
// Store a copy of encrypted packet for MQTT
meshtastic_MeshPacket *p_encrypted = packetPool.allocCopy(*p);
// Take those raw bytes and convert them back into a well structured protobuf we can understand // Take those raw bytes and convert them back into a well structured protobuf we can understand
bool decoded = perhapsDecode(p); bool decoded = perhapsDecode(p);
@ -449,10 +450,16 @@ void Router::handleReceived(meshtastic_MeshPacket *p, RxSource src)
printPacket("handleReceived(USER)", p); printPacket("handleReceived(USER)", p);
else else
printPacket("handleReceived(REMOTE)", p); printPacket("handleReceived(REMOTE)", p);
// Publish received message to MQTT if we're not the original transmitter of the packet
if (moduleConfig.mqtt.enabled && getFrom(p) != nodeDB.getNodeNum() && mqtt)
mqtt->onSend(*p_encrypted, *p, p->channel);
} else { } else {
printPacket("packet decoding failed or skipped (no PSK?)", p); printPacket("packet decoding failed or skipped (no PSK?)", p);
} }
packetPool.release(p_encrypted); // Release the encrypted packet
// call modules here // call modules here
MeshModule::callPlugins(*p, src); MeshModule::callPlugins(*p, src);
} }
@ -474,4 +481,4 @@ void Router::perhapsHandleReceived(meshtastic_MeshPacket *p)
handleReceived(p); handleReceived(p);
packetPool.release(p); packetPool.release(p);
} }

View File

@ -485,14 +485,15 @@ void MQTT::onSend(const meshtastic_MeshPacket &mp, const meshtastic_MeshPacket &
env->channel_id = (char *)channelId; env->channel_id = (char *)channelId;
env->gateway_id = owner.id; env->gateway_id = owner.id;
LOG_DEBUG("MQTT onSend - Publishing ");
if (moduleConfig.mqtt.encryption_enabled) { if (moduleConfig.mqtt.encryption_enabled) {
env->packet = (meshtastic_MeshPacket *)&mp; env->packet = (meshtastic_MeshPacket *)&mp;
LOG_DEBUG("encrypted message\n");
} else { } else {
env->packet = (meshtastic_MeshPacket *)&mp_decoded; env->packet = (meshtastic_MeshPacket *)&mp_decoded;
LOG_DEBUG("portnum %i message\n", env->packet->decoded.portnum);
} }
LOG_DEBUG("MQTT onSend - Publishing portnum %i message\n", env->packet->decoded.portnum);
if (moduleConfig.mqtt.proxy_to_client_enabled || this->isConnectedDirectly()) { if (moduleConfig.mqtt.proxy_to_client_enabled || this->isConnectedDirectly()) {
// FIXME - this size calculation is super sloppy, but it will go away once we dynamically alloc meshpackets // FIXME - this size calculation is super sloppy, but it will go away once we dynamically alloc meshpackets
static uint8_t bytes[meshtastic_MeshPacket_size + 64]; static uint8_t bytes[meshtastic_MeshPacket_size + 64];

View File

@ -50,7 +50,8 @@ class MQTT : private concurrency::OSThread
/** /**
* Publish a packet on the global MQTT server. * Publish a packet on the global MQTT server.
* This hook must be called **after** the packet is encrypted (including the channel being changed to a hash). * @param mp the encrypted packet to publish
* @param mp_decoded the decrypted packet to publish
* @param chIndex the index of the channel for this message * @param chIndex the index of the channel for this message
* *
* Note: for messages we are forwarding on the mesh that we can't find the channel for (because we don't have the keys), we * Note: for messages we are forwarding on the mesh that we can't find the channel for (because we don't have the keys), we