From bfc6a1940d85e17ea42c9509a0e26151850710ea Mon Sep 17 00:00:00 2001 From: GUVWAF Date: Sat, 16 Nov 2024 16:32:24 +0100 Subject: [PATCH] Handle repeated/fallback to flooding packets properly First check if it's not still in the TxQueue --- src/mesh/FloodingRouter.cpp | 37 +++++++++++++------------ src/mesh/FloodingRouter.h | 8 ++++-- src/mesh/MeshPacketQueue.cpp | 13 +++++++++ src/mesh/MeshPacketQueue.h | 5 +++- src/mesh/NextHopRouter.cpp | 43 +++++++++++++++++++++-------- src/mesh/NextHopRouter.h | 4 +++ src/mesh/PacketHistory.cpp | 26 ++++++++--------- src/mesh/PacketHistory.h | 3 +- src/mesh/RadioInterface.h | 3 ++ src/mesh/RadioLibInterface.cpp | 6 ++++ src/mesh/RadioLibInterface.h | 3 ++ src/mesh/Router.cpp | 6 ++++ src/mesh/Router.h | 3 ++ src/platform/portduino/SimRadio.cpp | 8 +++++- src/platform/portduino/SimRadio.h | 3 ++ 15 files changed, 122 insertions(+), 49 deletions(-) diff --git a/src/mesh/FloodingRouter.cpp b/src/mesh/FloodingRouter.cpp index dfc2ded22..e177d3883 100644 --- a/src/mesh/FloodingRouter.cpp +++ b/src/mesh/FloodingRouter.cpp @@ -22,23 +22,16 @@ ErrorCode FloodingRouter::send(meshtastic_MeshPacket *p) bool FloodingRouter::shouldFilterReceived(const meshtastic_MeshPacket *p) { if (wasSeenRecently(p)) { // Note: this will also add a recent packet record - printPacket("Ignore dupe incoming msg", p); - rxDupe++; - if (config.device.role != meshtastic_Config_DeviceConfig_Role_ROUTER && - config.device.role != meshtastic_Config_DeviceConfig_Role_REPEATER) { - // cancel rebroadcast of this message *if* there was already one, unless we're a router/repeater! - if (Router::cancelSending(p->from, p->id)) - txRelayCanceled++; - } - /* If the original transmitter is doing retransmissions (hopStart equals hopLimit) for a reliable transmission, e.g., when - the ACK got lost, we will handle the packet again to make sure it gets an ACK to its packet. */ + the ACK got lost, we will handle the packet again to make sure it gets an implicit ACK. */ bool isRepeated = p->hop_start > 0 && p->hop_start == p->hop_limit; if (isRepeated) { LOG_DEBUG("Repeated reliable tx"); - if (!perhapsRebroadcast(p) && isToUs(p) && p->want_ack) { - sendAckNak(meshtastic_Routing_Error_NONE, getFrom(p), p->id, p->channel, 0); - } + // Check if it's still in the Tx queue, if not, we have to relay it again + if (!findInTxQueue(p->from, p->id)) + perhapsRebroadcast(p); + } else { + perhapsCancelDupe(p); } return true; @@ -47,13 +40,25 @@ bool FloodingRouter::shouldFilterReceived(const meshtastic_MeshPacket *p) return Router::shouldFilterReceived(p); } +void FloodingRouter::perhapsCancelDupe(const meshtastic_MeshPacket *p) +{ + printPacket("Ignore dupe incoming msg", p); + rxDupe++; + if (config.device.role != meshtastic_Config_DeviceConfig_Role_ROUTER && + config.device.role != meshtastic_Config_DeviceConfig_Role_REPEATER) { + // cancel rebroadcast of this message *if* there was already one, unless we're a router/repeater! + if (Router::cancelSending(p->from, p->id)) + txRelayCanceled++; + } +} + bool FloodingRouter::isRebroadcaster() { return config.device.role != meshtastic_Config_DeviceConfig_Role_CLIENT_MUTE && config.device.rebroadcast_mode != meshtastic_Config_DeviceConfig_RebroadcastMode_NONE; } -bool FloodingRouter::perhapsRebroadcast(const meshtastic_MeshPacket *p) +void FloodingRouter::perhapsRebroadcast(const meshtastic_MeshPacket *p) { if (!isToUs(p) && (p->hop_limit > 0) && !isFromUs(p)) { if (p->id != 0) { @@ -73,8 +78,6 @@ bool FloodingRouter::perhapsRebroadcast(const meshtastic_MeshPacket *p) // Note: we are careful to resend using the original senders node id // We are careful not to call our hooked version of send() - because we don't want to check this again Router::send(tosend); - - return true; } else { LOG_DEBUG("No rebroadcast: Role = CLIENT_MUTE or Rebroadcast Mode = NONE"); } @@ -82,8 +85,6 @@ bool FloodingRouter::perhapsRebroadcast(const meshtastic_MeshPacket *p) LOG_DEBUG("Ignore 0 id broadcast"); } } - - return false; } void FloodingRouter::sniffReceived(const meshtastic_MeshPacket *p, const meshtastic_Routing *c) diff --git a/src/mesh/FloodingRouter.h b/src/mesh/FloodingRouter.h index 1b5c78c0f..36c6ad8aa 100644 --- a/src/mesh/FloodingRouter.h +++ b/src/mesh/FloodingRouter.h @@ -28,9 +28,8 @@ class FloodingRouter : public Router { private: - /** Check if we should rebroadcast this packet, and do so if needed - * @return true if rebroadcasted */ - bool perhapsRebroadcast(const meshtastic_MeshPacket *p); + /* Check if we should rebroadcast this packet, and do so if needed */ + void perhapsRebroadcast(const meshtastic_MeshPacket *p); public: /** @@ -60,6 +59,9 @@ class FloodingRouter : public Router */ virtual void sniffReceived(const meshtastic_MeshPacket *p, const meshtastic_Routing *c) override; + /* Call when receiving a duplicate packet to check whether we should cancel a packet in the Tx queue */ + void perhapsCancelDupe(const meshtastic_MeshPacket *p); + // Return true if we are a rebroadcaster bool isRebroadcaster(); }; \ No newline at end of file diff --git a/src/mesh/MeshPacketQueue.cpp b/src/mesh/MeshPacketQueue.cpp index 99ef41c1e..58aa8f128 100644 --- a/src/mesh/MeshPacketQueue.cpp +++ b/src/mesh/MeshPacketQueue.cpp @@ -107,6 +107,19 @@ meshtastic_MeshPacket *MeshPacketQueue::remove(NodeNum from, PacketId id) return NULL; } +/* Attempt to find a packet from this queue. Return true if it was found. */ +bool MeshPacketQueue::find(NodeNum from, PacketId id) +{ + for (auto it = queue.begin(); it != queue.end(); it++) { + auto p = (*it); + if (getFrom(p) == from && p->id == id) { + return true; + } + } + + return false; +} + /** Attempt to find and remove a packet from this queue. Returns the packet which was removed from the queue */ bool MeshPacketQueue::replaceLowerPriorityPacket(meshtastic_MeshPacket *p) { diff --git a/src/mesh/MeshPacketQueue.h b/src/mesh/MeshPacketQueue.h index 3c28fc5ce..00c15e493 100644 --- a/src/mesh/MeshPacketQueue.h +++ b/src/mesh/MeshPacketQueue.h @@ -37,4 +37,7 @@ class MeshPacketQueue /** Attempt to find and remove a packet from this queue. Returns the packet which was removed from the queue */ meshtastic_MeshPacket *remove(NodeNum from, PacketId id); -}; + + /* Attempt to find a packet from this queue. Return true if it was found. */ + bool find(NodeNum from, PacketId id); +}; \ No newline at end of file diff --git a/src/mesh/NextHopRouter.cpp b/src/mesh/NextHopRouter.cpp index aaa38c359..a7dff63f6 100644 --- a/src/mesh/NextHopRouter.cpp +++ b/src/mesh/NextHopRouter.cpp @@ -30,15 +30,25 @@ ErrorCode NextHopRouter::send(meshtastic_MeshPacket *p) bool NextHopRouter::shouldFilterReceived(const meshtastic_MeshPacket *p) { - if (wasSeenRecently(p)) { // Note: this will return false for a fallback to flooding - printPacket("Already seen, try stop re-Tx and cancel sending", p); - rxDupe++; + bool wasFallback = false; + if (wasSeenRecently(p, true, &wasFallback)) { // Note: this will also add a recent packet record stopRetransmission(p->from, p->id); - if (config.device.role != meshtastic_Config_DeviceConfig_Role_ROUTER && - config.device.role != meshtastic_Config_DeviceConfig_Role_REPEATER) { - // cancel rebroadcast of this message *if* there was already one, unless we're a router/repeater! - if (Router::cancelSending(p->from, p->id)) - txRelayCanceled++; + + // If it was a fallback to flooding, try to relay again + if (wasFallback) { + LOG_INFO("Fallback to flooding from relay_node=0x%x", p->relay_node); + // Check if it's still in the Tx queue, if not, we have to relay it again + if (!findInTxQueue(p->from, p->id)) + perhapsRelay(p); + } else { + bool isRepeated = p->hop_start > 0 && p->hop_start == p->hop_limit; + // If repeated and not in Tx queue anymore, try relaying again, or if we are the destination, send the ACK again + if (isRepeated) { + if (!findInTxQueue(p->from, p->id) && !perhapsRelay(p) && isToUs(p) && p->want_ack) + sendAckNak(meshtastic_Routing_Error_NONE, getFrom(p), p->id, p->channel, 0); + } else { + perhapsCancelDupe(p); // If it's a dupe, cancel relay + } } return true; } @@ -75,21 +85,32 @@ void NextHopRouter::sniffReceived(const meshtastic_MeshPacket *p, const meshtast } } + perhapsRelay(p); + + // handle the packet as normal + Router::sniffReceived(p, c); +} + +/* Check if we should be relaying this packet if so, do so. */ +bool NextHopRouter::perhapsRelay(const meshtastic_MeshPacket *p) +{ if (!isToUs(p) && !isFromUs(p) && p->hop_limit > 0) { - if (p->next_hop == NO_NEXT_HOP_PREFERENCE || p->next_hop == ourRelayID) { + if (p->next_hop == NO_NEXT_HOP_PREFERENCE || p->next_hop == nodeDB->getLastByteOfNodeNum(getNodeNum())) { if (isRebroadcaster()) { meshtastic_MeshPacket *tosend = packetPool.allocCopy(*p); // keep a copy because we will be sending it LOG_INFO("Relaying received message coming from %x", p->relay_node); tosend->hop_limit--; // bump down the hop count NextHopRouter::send(tosend); + + return true; } else { LOG_DEBUG("Not rebroadcasting: Role = CLIENT_MUTE or Rebroadcast Mode = NONE"); } } } - // handle the packet as normal - Router::sniffReceived(p, c); + + return false; } /** diff --git a/src/mesh/NextHopRouter.h b/src/mesh/NextHopRouter.h index 2ac0b7fa3..6c2764aff 100644 --- a/src/mesh/NextHopRouter.h +++ b/src/mesh/NextHopRouter.h @@ -144,4 +144,8 @@ class NextHopRouter : public FloodingRouter * @return the node number of the next hop, 0 if no preference (fallback to FloodingRouter) */ uint8_t getNextHop(NodeNum to, uint8_t relay_node); + + /** Check if we should be relaying this packet if so, do so. + * @return true if we did relay */ + bool perhapsRelay(const meshtastic_MeshPacket *p); }; \ No newline at end of file diff --git a/src/mesh/PacketHistory.cpp b/src/mesh/PacketHistory.cpp index c663584c2..73c3d56f1 100644 --- a/src/mesh/PacketHistory.cpp +++ b/src/mesh/PacketHistory.cpp @@ -16,7 +16,7 @@ PacketHistory::PacketHistory() /** * Update recentBroadcasts and return true if we have already seen this packet */ -bool PacketHistory::wasSeenRecently(const meshtastic_MeshPacket *p, bool withUpdate) +bool PacketHistory::wasSeenRecently(const meshtastic_MeshPacket *p, bool withUpdate, bool *wasFallback) { if (p->id == 0) { LOG_DEBUG("Ignore message with zero id"); @@ -41,21 +41,19 @@ bool PacketHistory::wasSeenRecently(const meshtastic_MeshPacket *p, bool withUpd seenRecently = false; } - if (seenRecently) { - // If it was seen with a next-hop not set to us and now it's NO_NEXT_HOP_PREFERENCE, and the relayer relayed already - // before, it's a fallback to flooding. If we didn't already relay and the next-hop neither, consider it unseen because we - // might need to handle it now - uint8_t ourRelayID = nodeDB->getLastByteOfNodeNum(nodeDB->getNodeNum()); - if (found->sender != nodeDB->getNodeNum() && found->next_hop != NO_NEXT_HOP_PREFERENCE && found->next_hop != ourRelayID && - p->next_hop == NO_NEXT_HOP_PREFERENCE && wasRelayer(p->relay_node, found) && !wasRelayer(ourRelayID, found) && - !wasRelayer(found->next_hop, found)) { - LOG_INFO("Fallback to flooding, consider unseen relay_node=0x%x", p->relay_node); - seenRecently = false; - } - } - if (seenRecently) { LOG_DEBUG("Found existing packet record for fr=0x%x,to=0x%x,id=0x%x", p->from, p->to, p->id); + if (wasFallback) { + // If it was seen with a next-hop not set to us and now it's NO_NEXT_HOP_PREFERENCE, and the relayer relayed already + // before, it's a fallback to flooding. If we didn't already relay and the next-hop neither, we might need to handle + // it now. + uint8_t ourRelayID = nodeDB->getLastByteOfNodeNum(nodeDB->getNodeNum()); + if (found->sender != nodeDB->getNodeNum() && found->next_hop != NO_NEXT_HOP_PREFERENCE && + found->next_hop != ourRelayID && p->next_hop == NO_NEXT_HOP_PREFERENCE && wasRelayer(p->relay_node, found) && + !wasRelayer(ourRelayID, found) && !wasRelayer(found->next_hop, found)) { + *wasFallback = true; + } + } } if (withUpdate) { diff --git a/src/mesh/PacketHistory.h b/src/mesh/PacketHistory.h index aae5ebff1..15c071da7 100644 --- a/src/mesh/PacketHistory.h +++ b/src/mesh/PacketHistory.h @@ -45,8 +45,9 @@ class PacketHistory * Update recentBroadcasts and return true if we have already seen this packet * * @param withUpdate if true and not found we add an entry to recentPackets + * @param wasFallback if not nullptr, packet will be checked for fallback to flooding and value will be set to true if so */ - bool wasSeenRecently(const meshtastic_MeshPacket *p, bool withUpdate = true); + bool wasSeenRecently(const meshtastic_MeshPacket *p, bool withUpdate = true, bool *wasFallback = nullptr); /* Check if a certain node was a relayer of a packet in the history given an ID and sender * @return true if node was indeed a relayer, false if not */ diff --git a/src/mesh/RadioInterface.h b/src/mesh/RadioInterface.h index 8d3c19c42..1e0adb7a7 100644 --- a/src/mesh/RadioInterface.h +++ b/src/mesh/RadioInterface.h @@ -155,6 +155,9 @@ class RadioInterface /** Attempt to cancel a previously sent packet. Returns true if a packet was found we could cancel */ virtual bool cancelSending(NodeNum from, PacketId id) { return false; } + /** Attempt to find a packet in the TxQueue. Returns true if the packet was found. */ + virtual bool findInTxQueue(NodeNum from, PacketId id) { return false; } + // methods from radiohead /// Initialise the Driver transport hardware and software. diff --git a/src/mesh/RadioLibInterface.cpp b/src/mesh/RadioLibInterface.cpp index 2f6ffc2a0..eece61ca5 100644 --- a/src/mesh/RadioLibInterface.cpp +++ b/src/mesh/RadioLibInterface.cpp @@ -246,6 +246,12 @@ bool RadioLibInterface::cancelSending(NodeNum from, PacketId id) return result; } +/** Attempt to find a packet in the TxQueue. Returns true if the packet was found. */ +bool RadioLibInterface::findInTxQueue(NodeNum from, PacketId id) +{ + return txQueue.find(from, id); +} + /** radio helper thread callback. We never immediately transmit after any operation (either Rx or Tx). Instead we should wait a random multiple of 'slotTimes' (see definition in RadioInterface.h) taken from a contention window (CW) to lower the chance of collision. diff --git a/src/mesh/RadioLibInterface.h b/src/mesh/RadioLibInterface.h index a5c2e30dd..f9dfbfd70 100644 --- a/src/mesh/RadioLibInterface.h +++ b/src/mesh/RadioLibInterface.h @@ -142,6 +142,9 @@ class RadioLibInterface : public RadioInterface, protected concurrency::Notified /** Attempt to cancel a previously sent packet. Returns true if a packet was found we could cancel */ virtual bool cancelSending(NodeNum from, PacketId id) override; + /** Attempt to find a packet in the TxQueue. Returns true if the packet was found. */ + virtual bool findInTxQueue(NodeNum from, PacketId id) override; + private: /** if we have something waiting to send, start a short (random) timer so we can come check for collision before actually * doing the transmit */ diff --git a/src/mesh/Router.cpp b/src/mesh/Router.cpp index cc33bd0a2..93913bda7 100644 --- a/src/mesh/Router.cpp +++ b/src/mesh/Router.cpp @@ -299,6 +299,12 @@ bool Router::cancelSending(NodeNum from, PacketId id) return false; } +/** Attempt to find a packet in the TxQueue. Returns true if the packet was found. */ +bool Router::findInTxQueue(NodeNum from, PacketId id) +{ + return iface->findInTxQueue(from, id); +} + /** * Every (non duplicate) packet this node receives will be passed through this method. This allows subclasses to * update routing tables etc... based on what we overhear (even for messages not destined to our node) diff --git a/src/mesh/Router.h b/src/mesh/Router.h index 14ce4810b..e74f7c2fd 100644 --- a/src/mesh/Router.h +++ b/src/mesh/Router.h @@ -51,6 +51,9 @@ class Router : protected concurrency::OSThread, protected PacketHistory /** Attempt to cancel a previously sent packet. Returns true if a packet was found we could cancel */ bool cancelSending(NodeNum from, PacketId id); + /** Attempt to find a packet in the TxQueue. Returns true if the packet was found. */ + bool findInTxQueue(NodeNum from, PacketId id); + /** Allocate and return a meshpacket which defaults as send to broadcast from the current node. * The returned packet is guaranteed to have a unique packet ID already assigned */ diff --git a/src/platform/portduino/SimRadio.cpp b/src/platform/portduino/SimRadio.cpp index 0a77b6088..845fce7fe 100644 --- a/src/platform/portduino/SimRadio.cpp +++ b/src/platform/portduino/SimRadio.cpp @@ -133,6 +133,12 @@ bool SimRadio::cancelSending(NodeNum from, PacketId id) return result; } +/** Attempt to find a packet in the TxQueue. Returns true if the packet was found. */ +bool SimRadio::findInTxQueue(NodeNum from, PacketId id) +{ + return txQueue.find(from, id); +} + void SimRadio::onNotify(uint32_t notification) { switch (notification) { @@ -265,4 +271,4 @@ int16_t SimRadio::readData(uint8_t *data, size_t len) } return state; -} +} \ No newline at end of file diff --git a/src/platform/portduino/SimRadio.h b/src/platform/portduino/SimRadio.h index 1edb4963b..886705a7e 100644 --- a/src/platform/portduino/SimRadio.h +++ b/src/platform/portduino/SimRadio.h @@ -38,6 +38,9 @@ class SimRadio : public RadioInterface, protected concurrency::NotifiedWorkerThr /** Attempt to cancel a previously sent packet. Returns true if a packet was found we could cancel */ virtual bool cancelSending(NodeNum from, PacketId id) override; + /** Attempt to find a packet in the TxQueue. Returns true if the packet was found. */ + virtual bool findInTxQueue(NodeNum from, PacketId id) override; + /** * Start waiting to receive a message *