Handle repeated/fallback to flooding packets properly

First check if it's not still in the TxQueue
This commit is contained in:
GUVWAF 2024-11-16 16:32:24 +01:00
parent 360637c25d
commit bfc6a1940d
15 changed files with 122 additions and 49 deletions

View File

@ -22,23 +22,16 @@ ErrorCode FloodingRouter::send(meshtastic_MeshPacket *p)
bool FloodingRouter::shouldFilterReceived(const meshtastic_MeshPacket *p) bool FloodingRouter::shouldFilterReceived(const meshtastic_MeshPacket *p)
{ {
if (wasSeenRecently(p)) { // Note: this will also add a recent packet record if (wasSeenRecently(p)) { // Note: this will also add a recent packet record
printPacket("Ignore dupe incoming msg", p);
rxDupe++;
if (config.device.role != meshtastic_Config_DeviceConfig_Role_ROUTER &&
config.device.role != meshtastic_Config_DeviceConfig_Role_REPEATER) {
// cancel rebroadcast of this message *if* there was already one, unless we're a router/repeater!
if (Router::cancelSending(p->from, p->id))
txRelayCanceled++;
}
/* If the original transmitter is doing retransmissions (hopStart equals hopLimit) for a reliable transmission, e.g., when /* If the original transmitter is doing retransmissions (hopStart equals hopLimit) for a reliable transmission, e.g., when
the ACK got lost, we will handle the packet again to make sure it gets an ACK to its packet. */ the ACK got lost, we will handle the packet again to make sure it gets an implicit ACK. */
bool isRepeated = p->hop_start > 0 && p->hop_start == p->hop_limit; bool isRepeated = p->hop_start > 0 && p->hop_start == p->hop_limit;
if (isRepeated) { if (isRepeated) {
LOG_DEBUG("Repeated reliable tx"); LOG_DEBUG("Repeated reliable tx");
if (!perhapsRebroadcast(p) && isToUs(p) && p->want_ack) { // Check if it's still in the Tx queue, if not, we have to relay it again
sendAckNak(meshtastic_Routing_Error_NONE, getFrom(p), p->id, p->channel, 0); if (!findInTxQueue(p->from, p->id))
} perhapsRebroadcast(p);
} else {
perhapsCancelDupe(p);
} }
return true; return true;
@ -47,13 +40,25 @@ bool FloodingRouter::shouldFilterReceived(const meshtastic_MeshPacket *p)
return Router::shouldFilterReceived(p); return Router::shouldFilterReceived(p);
} }
void FloodingRouter::perhapsCancelDupe(const meshtastic_MeshPacket *p)
{
printPacket("Ignore dupe incoming msg", p);
rxDupe++;
if (config.device.role != meshtastic_Config_DeviceConfig_Role_ROUTER &&
config.device.role != meshtastic_Config_DeviceConfig_Role_REPEATER) {
// cancel rebroadcast of this message *if* there was already one, unless we're a router/repeater!
if (Router::cancelSending(p->from, p->id))
txRelayCanceled++;
}
}
bool FloodingRouter::isRebroadcaster() bool FloodingRouter::isRebroadcaster()
{ {
return config.device.role != meshtastic_Config_DeviceConfig_Role_CLIENT_MUTE && return config.device.role != meshtastic_Config_DeviceConfig_Role_CLIENT_MUTE &&
config.device.rebroadcast_mode != meshtastic_Config_DeviceConfig_RebroadcastMode_NONE; config.device.rebroadcast_mode != meshtastic_Config_DeviceConfig_RebroadcastMode_NONE;
} }
bool FloodingRouter::perhapsRebroadcast(const meshtastic_MeshPacket *p) void FloodingRouter::perhapsRebroadcast(const meshtastic_MeshPacket *p)
{ {
if (!isToUs(p) && (p->hop_limit > 0) && !isFromUs(p)) { if (!isToUs(p) && (p->hop_limit > 0) && !isFromUs(p)) {
if (p->id != 0) { if (p->id != 0) {
@ -73,8 +78,6 @@ bool FloodingRouter::perhapsRebroadcast(const meshtastic_MeshPacket *p)
// Note: we are careful to resend using the original senders node id // Note: we are careful to resend using the original senders node id
// We are careful not to call our hooked version of send() - because we don't want to check this again // We are careful not to call our hooked version of send() - because we don't want to check this again
Router::send(tosend); Router::send(tosend);
return true;
} else { } else {
LOG_DEBUG("No rebroadcast: Role = CLIENT_MUTE or Rebroadcast Mode = NONE"); LOG_DEBUG("No rebroadcast: Role = CLIENT_MUTE or Rebroadcast Mode = NONE");
} }
@ -82,8 +85,6 @@ bool FloodingRouter::perhapsRebroadcast(const meshtastic_MeshPacket *p)
LOG_DEBUG("Ignore 0 id broadcast"); LOG_DEBUG("Ignore 0 id broadcast");
} }
} }
return false;
} }
void FloodingRouter::sniffReceived(const meshtastic_MeshPacket *p, const meshtastic_Routing *c) void FloodingRouter::sniffReceived(const meshtastic_MeshPacket *p, const meshtastic_Routing *c)

View File

@ -28,9 +28,8 @@
class FloodingRouter : public Router class FloodingRouter : public Router
{ {
private: private:
/** Check if we should rebroadcast this packet, and do so if needed /* Check if we should rebroadcast this packet, and do so if needed */
* @return true if rebroadcasted */ void perhapsRebroadcast(const meshtastic_MeshPacket *p);
bool perhapsRebroadcast(const meshtastic_MeshPacket *p);
public: public:
/** /**
@ -60,6 +59,9 @@ class FloodingRouter : public Router
*/ */
virtual void sniffReceived(const meshtastic_MeshPacket *p, const meshtastic_Routing *c) override; virtual void sniffReceived(const meshtastic_MeshPacket *p, const meshtastic_Routing *c) override;
/* Call when receiving a duplicate packet to check whether we should cancel a packet in the Tx queue */
void perhapsCancelDupe(const meshtastic_MeshPacket *p);
// Return true if we are a rebroadcaster // Return true if we are a rebroadcaster
bool isRebroadcaster(); bool isRebroadcaster();
}; };

View File

@ -107,6 +107,19 @@ meshtastic_MeshPacket *MeshPacketQueue::remove(NodeNum from, PacketId id)
return NULL; return NULL;
} }
/* Attempt to find a packet from this queue. Return true if it was found. */
bool MeshPacketQueue::find(NodeNum from, PacketId id)
{
for (auto it = queue.begin(); it != queue.end(); it++) {
auto p = (*it);
if (getFrom(p) == from && p->id == id) {
return true;
}
}
return false;
}
/** Attempt to find and remove a packet from this queue. Returns the packet which was removed from the queue */ /** Attempt to find and remove a packet from this queue. Returns the packet which was removed from the queue */
bool MeshPacketQueue::replaceLowerPriorityPacket(meshtastic_MeshPacket *p) bool MeshPacketQueue::replaceLowerPriorityPacket(meshtastic_MeshPacket *p)
{ {

View File

@ -37,4 +37,7 @@ class MeshPacketQueue
/** Attempt to find and remove a packet from this queue. Returns the packet which was removed from the queue */ /** Attempt to find and remove a packet from this queue. Returns the packet which was removed from the queue */
meshtastic_MeshPacket *remove(NodeNum from, PacketId id); meshtastic_MeshPacket *remove(NodeNum from, PacketId id);
/* Attempt to find a packet from this queue. Return true if it was found. */
bool find(NodeNum from, PacketId id);
}; };

View File

@ -30,15 +30,25 @@ ErrorCode NextHopRouter::send(meshtastic_MeshPacket *p)
bool NextHopRouter::shouldFilterReceived(const meshtastic_MeshPacket *p) bool NextHopRouter::shouldFilterReceived(const meshtastic_MeshPacket *p)
{ {
if (wasSeenRecently(p)) { // Note: this will return false for a fallback to flooding bool wasFallback = false;
printPacket("Already seen, try stop re-Tx and cancel sending", p); if (wasSeenRecently(p, true, &wasFallback)) { // Note: this will also add a recent packet record
rxDupe++;
stopRetransmission(p->from, p->id); stopRetransmission(p->from, p->id);
if (config.device.role != meshtastic_Config_DeviceConfig_Role_ROUTER &&
config.device.role != meshtastic_Config_DeviceConfig_Role_REPEATER) { // If it was a fallback to flooding, try to relay again
// cancel rebroadcast of this message *if* there was already one, unless we're a router/repeater! if (wasFallback) {
if (Router::cancelSending(p->from, p->id)) LOG_INFO("Fallback to flooding from relay_node=0x%x", p->relay_node);
txRelayCanceled++; // Check if it's still in the Tx queue, if not, we have to relay it again
if (!findInTxQueue(p->from, p->id))
perhapsRelay(p);
} else {
bool isRepeated = p->hop_start > 0 && p->hop_start == p->hop_limit;
// If repeated and not in Tx queue anymore, try relaying again, or if we are the destination, send the ACK again
if (isRepeated) {
if (!findInTxQueue(p->from, p->id) && !perhapsRelay(p) && isToUs(p) && p->want_ack)
sendAckNak(meshtastic_Routing_Error_NONE, getFrom(p), p->id, p->channel, 0);
} else {
perhapsCancelDupe(p); // If it's a dupe, cancel relay
}
} }
return true; return true;
} }
@ -75,21 +85,32 @@ void NextHopRouter::sniffReceived(const meshtastic_MeshPacket *p, const meshtast
} }
} }
perhapsRelay(p);
// handle the packet as normal
Router::sniffReceived(p, c);
}
/* Check if we should be relaying this packet if so, do so. */
bool NextHopRouter::perhapsRelay(const meshtastic_MeshPacket *p)
{
if (!isToUs(p) && !isFromUs(p) && p->hop_limit > 0) { if (!isToUs(p) && !isFromUs(p) && p->hop_limit > 0) {
if (p->next_hop == NO_NEXT_HOP_PREFERENCE || p->next_hop == ourRelayID) { if (p->next_hop == NO_NEXT_HOP_PREFERENCE || p->next_hop == nodeDB->getLastByteOfNodeNum(getNodeNum())) {
if (isRebroadcaster()) { if (isRebroadcaster()) {
meshtastic_MeshPacket *tosend = packetPool.allocCopy(*p); // keep a copy because we will be sending it meshtastic_MeshPacket *tosend = packetPool.allocCopy(*p); // keep a copy because we will be sending it
LOG_INFO("Relaying received message coming from %x", p->relay_node); LOG_INFO("Relaying received message coming from %x", p->relay_node);
tosend->hop_limit--; // bump down the hop count tosend->hop_limit--; // bump down the hop count
NextHopRouter::send(tosend); NextHopRouter::send(tosend);
return true;
} else { } else {
LOG_DEBUG("Not rebroadcasting: Role = CLIENT_MUTE or Rebroadcast Mode = NONE"); LOG_DEBUG("Not rebroadcasting: Role = CLIENT_MUTE or Rebroadcast Mode = NONE");
} }
} }
} }
// handle the packet as normal
Router::sniffReceived(p, c); return false;
} }
/** /**

View File

@ -144,4 +144,8 @@ class NextHopRouter : public FloodingRouter
* @return the node number of the next hop, 0 if no preference (fallback to FloodingRouter) * @return the node number of the next hop, 0 if no preference (fallback to FloodingRouter)
*/ */
uint8_t getNextHop(NodeNum to, uint8_t relay_node); uint8_t getNextHop(NodeNum to, uint8_t relay_node);
/** Check if we should be relaying this packet if so, do so.
* @return true if we did relay */
bool perhapsRelay(const meshtastic_MeshPacket *p);
}; };

View File

@ -16,7 +16,7 @@ PacketHistory::PacketHistory()
/** /**
* Update recentBroadcasts and return true if we have already seen this packet * Update recentBroadcasts and return true if we have already seen this packet
*/ */
bool PacketHistory::wasSeenRecently(const meshtastic_MeshPacket *p, bool withUpdate) bool PacketHistory::wasSeenRecently(const meshtastic_MeshPacket *p, bool withUpdate, bool *wasFallback)
{ {
if (p->id == 0) { if (p->id == 0) {
LOG_DEBUG("Ignore message with zero id"); LOG_DEBUG("Ignore message with zero id");
@ -41,21 +41,19 @@ bool PacketHistory::wasSeenRecently(const meshtastic_MeshPacket *p, bool withUpd
seenRecently = false; seenRecently = false;
} }
if (seenRecently) {
// If it was seen with a next-hop not set to us and now it's NO_NEXT_HOP_PREFERENCE, and the relayer relayed already
// before, it's a fallback to flooding. If we didn't already relay and the next-hop neither, consider it unseen because we
// might need to handle it now
uint8_t ourRelayID = nodeDB->getLastByteOfNodeNum(nodeDB->getNodeNum());
if (found->sender != nodeDB->getNodeNum() && found->next_hop != NO_NEXT_HOP_PREFERENCE && found->next_hop != ourRelayID &&
p->next_hop == NO_NEXT_HOP_PREFERENCE && wasRelayer(p->relay_node, found) && !wasRelayer(ourRelayID, found) &&
!wasRelayer(found->next_hop, found)) {
LOG_INFO("Fallback to flooding, consider unseen relay_node=0x%x", p->relay_node);
seenRecently = false;
}
}
if (seenRecently) { if (seenRecently) {
LOG_DEBUG("Found existing packet record for fr=0x%x,to=0x%x,id=0x%x", p->from, p->to, p->id); LOG_DEBUG("Found existing packet record for fr=0x%x,to=0x%x,id=0x%x", p->from, p->to, p->id);
if (wasFallback) {
// If it was seen with a next-hop not set to us and now it's NO_NEXT_HOP_PREFERENCE, and the relayer relayed already
// before, it's a fallback to flooding. If we didn't already relay and the next-hop neither, we might need to handle
// it now.
uint8_t ourRelayID = nodeDB->getLastByteOfNodeNum(nodeDB->getNodeNum());
if (found->sender != nodeDB->getNodeNum() && found->next_hop != NO_NEXT_HOP_PREFERENCE &&
found->next_hop != ourRelayID && p->next_hop == NO_NEXT_HOP_PREFERENCE && wasRelayer(p->relay_node, found) &&
!wasRelayer(ourRelayID, found) && !wasRelayer(found->next_hop, found)) {
*wasFallback = true;
}
}
} }
if (withUpdate) { if (withUpdate) {

View File

@ -45,8 +45,9 @@ class PacketHistory
* Update recentBroadcasts and return true if we have already seen this packet * Update recentBroadcasts and return true if we have already seen this packet
* *
* @param withUpdate if true and not found we add an entry to recentPackets * @param withUpdate if true and not found we add an entry to recentPackets
* @param wasFallback if not nullptr, packet will be checked for fallback to flooding and value will be set to true if so
*/ */
bool wasSeenRecently(const meshtastic_MeshPacket *p, bool withUpdate = true); bool wasSeenRecently(const meshtastic_MeshPacket *p, bool withUpdate = true, bool *wasFallback = nullptr);
/* Check if a certain node was a relayer of a packet in the history given an ID and sender /* Check if a certain node was a relayer of a packet in the history given an ID and sender
* @return true if node was indeed a relayer, false if not */ * @return true if node was indeed a relayer, false if not */

View File

@ -155,6 +155,9 @@ class RadioInterface
/** Attempt to cancel a previously sent packet. Returns true if a packet was found we could cancel */ /** Attempt to cancel a previously sent packet. Returns true if a packet was found we could cancel */
virtual bool cancelSending(NodeNum from, PacketId id) { return false; } virtual bool cancelSending(NodeNum from, PacketId id) { return false; }
/** Attempt to find a packet in the TxQueue. Returns true if the packet was found. */
virtual bool findInTxQueue(NodeNum from, PacketId id) { return false; }
// methods from radiohead // methods from radiohead
/// Initialise the Driver transport hardware and software. /// Initialise the Driver transport hardware and software.

View File

@ -246,6 +246,12 @@ bool RadioLibInterface::cancelSending(NodeNum from, PacketId id)
return result; return result;
} }
/** Attempt to find a packet in the TxQueue. Returns true if the packet was found. */
bool RadioLibInterface::findInTxQueue(NodeNum from, PacketId id)
{
return txQueue.find(from, id);
}
/** radio helper thread callback. /** radio helper thread callback.
We never immediately transmit after any operation (either Rx or Tx). Instead we should wait a random multiple of We never immediately transmit after any operation (either Rx or Tx). Instead we should wait a random multiple of
'slotTimes' (see definition in RadioInterface.h) taken from a contention window (CW) to lower the chance of collision. 'slotTimes' (see definition in RadioInterface.h) taken from a contention window (CW) to lower the chance of collision.

View File

@ -142,6 +142,9 @@ class RadioLibInterface : public RadioInterface, protected concurrency::Notified
/** Attempt to cancel a previously sent packet. Returns true if a packet was found we could cancel */ /** Attempt to cancel a previously sent packet. Returns true if a packet was found we could cancel */
virtual bool cancelSending(NodeNum from, PacketId id) override; virtual bool cancelSending(NodeNum from, PacketId id) override;
/** Attempt to find a packet in the TxQueue. Returns true if the packet was found. */
virtual bool findInTxQueue(NodeNum from, PacketId id) override;
private: private:
/** if we have something waiting to send, start a short (random) timer so we can come check for collision before actually /** if we have something waiting to send, start a short (random) timer so we can come check for collision before actually
* doing the transmit */ * doing the transmit */

View File

@ -299,6 +299,12 @@ bool Router::cancelSending(NodeNum from, PacketId id)
return false; return false;
} }
/** Attempt to find a packet in the TxQueue. Returns true if the packet was found. */
bool Router::findInTxQueue(NodeNum from, PacketId id)
{
return iface->findInTxQueue(from, id);
}
/** /**
* Every (non duplicate) packet this node receives will be passed through this method. This allows subclasses to * Every (non duplicate) packet this node receives will be passed through this method. This allows subclasses to
* update routing tables etc... based on what we overhear (even for messages not destined to our node) * update routing tables etc... based on what we overhear (even for messages not destined to our node)

View File

@ -51,6 +51,9 @@ class Router : protected concurrency::OSThread, protected PacketHistory
/** Attempt to cancel a previously sent packet. Returns true if a packet was found we could cancel */ /** Attempt to cancel a previously sent packet. Returns true if a packet was found we could cancel */
bool cancelSending(NodeNum from, PacketId id); bool cancelSending(NodeNum from, PacketId id);
/** Attempt to find a packet in the TxQueue. Returns true if the packet was found. */
bool findInTxQueue(NodeNum from, PacketId id);
/** Allocate and return a meshpacket which defaults as send to broadcast from the current node. /** Allocate and return a meshpacket which defaults as send to broadcast from the current node.
* The returned packet is guaranteed to have a unique packet ID already assigned * The returned packet is guaranteed to have a unique packet ID already assigned
*/ */

View File

@ -133,6 +133,12 @@ bool SimRadio::cancelSending(NodeNum from, PacketId id)
return result; return result;
} }
/** Attempt to find a packet in the TxQueue. Returns true if the packet was found. */
bool SimRadio::findInTxQueue(NodeNum from, PacketId id)
{
return txQueue.find(from, id);
}
void SimRadio::onNotify(uint32_t notification) void SimRadio::onNotify(uint32_t notification)
{ {
switch (notification) { switch (notification) {

View File

@ -38,6 +38,9 @@ class SimRadio : public RadioInterface, protected concurrency::NotifiedWorkerThr
/** Attempt to cancel a previously sent packet. Returns true if a packet was found we could cancel */ /** Attempt to cancel a previously sent packet. Returns true if a packet was found we could cancel */
virtual bool cancelSending(NodeNum from, PacketId id) override; virtual bool cancelSending(NodeNum from, PacketId id) override;
/** Attempt to find a packet in the TxQueue. Returns true if the packet was found. */
virtual bool findInTxQueue(NodeNum from, PacketId id) override;
/** /**
* Start waiting to receive a message * Start waiting to receive a message
* *