mirror of
https://github.com/meshtastic/firmware.git
synced 2025-10-28 07:13:25 +00:00
De-duplicate handling upgraded packet and rebroadcasting logic
This commit is contained in:
parent
775595cb37
commit
de6a02756d
@ -31,33 +31,8 @@ bool FloodingRouter::shouldFilterReceived(const meshtastic_MeshPacket *p)
|
|||||||
wasSeenRecently(p, true, nullptr, nullptr, &wasUpgraded); // Updates history; returns false when an upgrade is detected
|
wasSeenRecently(p, true, nullptr, nullptr, &wasUpgraded); // Updates history; returns false when an upgrade is detected
|
||||||
|
|
||||||
// Handle hop_limit upgrade scenario for rebroadcasters
|
// Handle hop_limit upgrade scenario for rebroadcasters
|
||||||
// isRebroadcaster() is duplicated in perhapsRebroadcast(), but this avoids confusing log messages
|
if (wasUpgraded && perhapsHandleUpgradedPacket(p)) {
|
||||||
if (wasUpgraded && isRebroadcaster() && iface && p->hop_limit > 0) {
|
return true; // we handled it, so stop processing
|
||||||
// wasSeenRecently() reports false in upgrade cases so we handle replacement before the duplicate short-circuit
|
|
||||||
// If we overhear a duplicate copy of the packet with more hops left than the one we are waiting to
|
|
||||||
// rebroadcast, then remove the packet currently sitting in the TX queue and use this one instead.
|
|
||||||
uint8_t dropThreshold = p->hop_limit; // remove queued packets that have fewer hops remaining
|
|
||||||
if (iface->removePendingTXPacket(getFrom(p), p->id, dropThreshold)) {
|
|
||||||
LOG_DEBUG("Processing upgraded packet 0x%08x for rebroadcast with hop limit %d (dropping queued < %d)", p->id,
|
|
||||||
p->hop_limit, dropThreshold);
|
|
||||||
|
|
||||||
if (nodeDB)
|
|
||||||
nodeDB->updateFrom(*p);
|
|
||||||
#if !MESHTASTIC_EXCLUDE_TRACEROUTE
|
|
||||||
if (traceRouteModule && p->which_payload_variant == meshtastic_MeshPacket_decoded_tag &&
|
|
||||||
p->decoded.portnum == meshtastic_PortNum_TRACEROUTE_APP)
|
|
||||||
traceRouteModule->processUpgradedPacket(*p);
|
|
||||||
#endif
|
|
||||||
|
|
||||||
perhapsRebroadcast(p);
|
|
||||||
|
|
||||||
// We already enqueued the improved copy, so make sure the incoming packet stops here.
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
// No queue entry was replaced by this upgraded copy, so treat it as a duplicate to avoid
|
|
||||||
// delivering the same packet to applications/phone twice with different hop limits.
|
|
||||||
seenRecently = true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (seenRecently) {
|
if (seenRecently) {
|
||||||
@ -82,6 +57,40 @@ bool FloodingRouter::shouldFilterReceived(const meshtastic_MeshPacket *p)
|
|||||||
return Router::shouldFilterReceived(p);
|
return Router::shouldFilterReceived(p);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool FloodingRouter::perhapsHandleUpgradedPacket(const meshtastic_MeshPacket *p)
|
||||||
|
{
|
||||||
|
// isRebroadcaster() is duplicated in perhapsRebroadcast(), but this avoids confusing log messages
|
||||||
|
if (isRebroadcaster() && iface && p->hop_limit > 0) {
|
||||||
|
// If we overhear a duplicate copy of the packet with more hops left than the one we are waiting to
|
||||||
|
// rebroadcast, then remove the packet currently sitting in the TX queue and use this one instead.
|
||||||
|
uint8_t dropThreshold = p->hop_limit; // remove queued packets that have fewer hops remaining
|
||||||
|
if (iface->removePendingTXPacket(getFrom(p), p->id, dropThreshold)) {
|
||||||
|
LOG_DEBUG("Processing upgraded packet 0x%08x for rebroadcast with hop limit %d (dropping queued < %d)", p->id,
|
||||||
|
p->hop_limit, dropThreshold);
|
||||||
|
|
||||||
|
reprocessPacket(p);
|
||||||
|
perhapsRebroadcast(p);
|
||||||
|
|
||||||
|
rxDupe++;
|
||||||
|
// We already enqueued the improved copy, so make sure the incoming packet stops here.
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
void FloodingRouter::reprocessPacket(const meshtastic_MeshPacket *p)
|
||||||
|
{
|
||||||
|
if (nodeDB)
|
||||||
|
nodeDB->updateFrom(*p);
|
||||||
|
#if !MESHTASTIC_EXCLUDE_TRACEROUTE
|
||||||
|
if (traceRouteModule && p->which_payload_variant == meshtastic_MeshPacket_decoded_tag &&
|
||||||
|
p->decoded.portnum == meshtastic_PortNum_TRACEROUTE_APP)
|
||||||
|
traceRouteModule->processUpgradedPacket(*p);
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
bool FloodingRouter::roleAllowsCancelingDupe(const meshtastic_MeshPacket *p)
|
bool FloodingRouter::roleAllowsCancelingDupe(const meshtastic_MeshPacket *p)
|
||||||
{
|
{
|
||||||
if (config.device.role == meshtastic_Config_DeviceConfig_Role_ROUTER ||
|
if (config.device.role == meshtastic_Config_DeviceConfig_Role_ROUTER ||
|
||||||
@ -121,41 +130,6 @@ bool FloodingRouter::isRebroadcaster()
|
|||||||
config.device.rebroadcast_mode != meshtastic_Config_DeviceConfig_RebroadcastMode_NONE;
|
config.device.rebroadcast_mode != meshtastic_Config_DeviceConfig_RebroadcastMode_NONE;
|
||||||
}
|
}
|
||||||
|
|
||||||
void FloodingRouter::perhapsRebroadcast(const meshtastic_MeshPacket *p)
|
|
||||||
{
|
|
||||||
if (!isToUs(p) && (p->hop_limit > 0) && !isFromUs(p)) {
|
|
||||||
if (p->id != 0) {
|
|
||||||
if (isRebroadcaster()) {
|
|
||||||
meshtastic_MeshPacket *tosend = packetPool.allocCopy(*p); // keep a copy because we will be sending it
|
|
||||||
|
|
||||||
// Use shared logic to determine if hop_limit should be decremented
|
|
||||||
if (shouldDecrementHopLimit(p)) {
|
|
||||||
tosend->hop_limit--; // bump down the hop count
|
|
||||||
} else {
|
|
||||||
LOG_INFO("favorite-ROUTER/CLIENT_BASE-to-ROUTER/CLIENT_BASE flood: preserving hop_limit");
|
|
||||||
}
|
|
||||||
#if USERPREFS_EVENT_MODE
|
|
||||||
if (tosend->hop_limit > 2) {
|
|
||||||
// if we are "correcting" the hop_limit, "correct" the hop_start by the same amount to preserve hops away.
|
|
||||||
tosend->hop_start -= (tosend->hop_limit - 2);
|
|
||||||
tosend->hop_limit = 2;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
tosend->next_hop = NO_NEXT_HOP_PREFERENCE; // this should already be the case, but just in case
|
|
||||||
|
|
||||||
LOG_INFO("Rebroadcast received floodmsg");
|
|
||||||
// Note: we are careful to resend using the original senders node id
|
|
||||||
send(tosend);
|
|
||||||
} else {
|
|
||||||
LOG_DEBUG("No rebroadcast: Role = CLIENT_MUTE or Rebroadcast Mode = NONE");
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
LOG_DEBUG("Ignore 0 id broadcast");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void FloodingRouter::sniffReceived(const meshtastic_MeshPacket *p, const meshtastic_Routing *c)
|
void FloodingRouter::sniffReceived(const meshtastic_MeshPacket *p, const meshtastic_Routing *c)
|
||||||
{
|
{
|
||||||
bool isAckorReply = (p->which_payload_variant == meshtastic_MeshPacket_decoded_tag) &&
|
bool isAckorReply = (p->which_payload_variant == meshtastic_MeshPacket_decoded_tag) &&
|
||||||
|
|||||||
@ -27,10 +27,6 @@
|
|||||||
*/
|
*/
|
||||||
class FloodingRouter : public Router
|
class FloodingRouter : public Router
|
||||||
{
|
{
|
||||||
private:
|
|
||||||
/* Check if we should rebroadcast this packet, and do so if needed */
|
|
||||||
void perhapsRebroadcast(const meshtastic_MeshPacket *p);
|
|
||||||
|
|
||||||
public:
|
public:
|
||||||
/**
|
/**
|
||||||
* Constructor
|
* Constructor
|
||||||
@ -59,6 +55,17 @@ class FloodingRouter : public Router
|
|||||||
*/
|
*/
|
||||||
virtual void sniffReceived(const meshtastic_MeshPacket *p, const meshtastic_Routing *c) override;
|
virtual void sniffReceived(const meshtastic_MeshPacket *p, const meshtastic_Routing *c) override;
|
||||||
|
|
||||||
|
/* Check if we should rebroadcast this packet, and do so if needed */
|
||||||
|
virtual bool perhapsRebroadcast(const meshtastic_MeshPacket *p) = 0;
|
||||||
|
|
||||||
|
/* Check if we should handle an upgraded packet (with higher hop_limit)
|
||||||
|
* @return true if we handled it (so stop processing)
|
||||||
|
*/
|
||||||
|
bool perhapsHandleUpgradedPacket(const meshtastic_MeshPacket *p);
|
||||||
|
|
||||||
|
/* Call when we receive a packet that needs some reprocessing, but afterwards should be filtered */
|
||||||
|
void reprocessPacket(const meshtastic_MeshPacket *p);
|
||||||
|
|
||||||
// Return false for roles like ROUTER which should always rebroadcast even when we've heard another rebroadcast of
|
// Return false for roles like ROUTER which should always rebroadcast even when we've heard another rebroadcast of
|
||||||
// the same packet
|
// the same packet
|
||||||
bool roleAllowsCancelingDupe(const meshtastic_MeshPacket *p);
|
bool roleAllowsCancelingDupe(const meshtastic_MeshPacket *p);
|
||||||
|
|||||||
@ -43,31 +43,8 @@ bool NextHopRouter::shouldFilterReceived(const meshtastic_MeshPacket *p)
|
|||||||
&wasUpgraded); // Updates history; returns false when an upgrade is detected
|
&wasUpgraded); // Updates history; returns false when an upgrade is detected
|
||||||
|
|
||||||
// Handle hop_limit upgrade scenario for rebroadcasters
|
// Handle hop_limit upgrade scenario for rebroadcasters
|
||||||
// isRebroadcaster() is duplicated in perhapsRelay(), but this avoids confusing log messages
|
if (wasUpgraded && perhapsHandleUpgradedPacket(p)) {
|
||||||
if (wasUpgraded && isRebroadcaster() && iface && p->hop_limit > 0) {
|
return true; // we handled it, so stop processing
|
||||||
// Upgrade detection bypasses the duplicate short-circuit so we replace the queued packet before exiting
|
|
||||||
uint8_t dropThreshold = p->hop_limit; // remove queued packets that have fewer hops remaining
|
|
||||||
if (iface->removePendingTXPacket(getFrom(p), p->id, dropThreshold)) {
|
|
||||||
LOG_DEBUG("Processing upgraded packet 0x%08x for relay with hop limit %d (dropping queued < %d)", p->id, p->hop_limit,
|
|
||||||
dropThreshold);
|
|
||||||
|
|
||||||
if (nodeDB)
|
|
||||||
nodeDB->updateFrom(*p);
|
|
||||||
#if !MESHTASTIC_EXCLUDE_TRACEROUTE
|
|
||||||
if (traceRouteModule && p->which_payload_variant == meshtastic_MeshPacket_decoded_tag &&
|
|
||||||
p->decoded.portnum == meshtastic_PortNum_TRACEROUTE_APP)
|
|
||||||
traceRouteModule->processUpgradedPacket(*p);
|
|
||||||
#endif
|
|
||||||
|
|
||||||
perhapsRelay(p);
|
|
||||||
|
|
||||||
// We already enqueued the improved copy, so make sure the incoming packet stops here.
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
// No queue entry was replaced by this upgraded copy, so treat it as a duplicate to avoid
|
|
||||||
// delivering the same packet to applications/phone twice with different hop limits.
|
|
||||||
seenRecently = true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (seenRecently) {
|
if (seenRecently) {
|
||||||
@ -107,13 +84,14 @@ void NextHopRouter::sniffReceived(const meshtastic_MeshPacket *p, const meshtast
|
|||||||
bool isAckorReply = (p->which_payload_variant == meshtastic_MeshPacket_decoded_tag) &&
|
bool isAckorReply = (p->which_payload_variant == meshtastic_MeshPacket_decoded_tag) &&
|
||||||
(p->decoded.request_id != 0 || p->decoded.reply_id != 0);
|
(p->decoded.request_id != 0 || p->decoded.reply_id != 0);
|
||||||
if (isAckorReply) {
|
if (isAckorReply) {
|
||||||
// Update next-hop for the original transmitter of this successful transmission to the relay node, but ONLY if "from" is
|
// Update next-hop for the original transmitter of this successful transmission to the relay node, but ONLY if "from"
|
||||||
// not 0 (means implicit ACK) and original packet was also relayed by this node, or we sent it directly to the destination
|
// is not 0 (means implicit ACK) and original packet was also relayed by this node, or we sent it directly to the
|
||||||
|
// destination
|
||||||
if (p->from != 0) {
|
if (p->from != 0) {
|
||||||
meshtastic_NodeInfoLite *origTx = nodeDB->getMeshNode(p->from);
|
meshtastic_NodeInfoLite *origTx = nodeDB->getMeshNode(p->from);
|
||||||
if (origTx) {
|
if (origTx) {
|
||||||
// Either relayer of ACK was also a relayer of the packet, or we were the *only* relayer and the ACK came directly
|
// Either relayer of ACK was also a relayer of the packet, or we were the *only* relayer and the ACK came
|
||||||
// from the destination
|
// directly from the destination
|
||||||
bool wasAlreadyRelayer = wasRelayer(p->relay_node, p->decoded.request_id, p->to);
|
bool wasAlreadyRelayer = wasRelayer(p->relay_node, p->decoded.request_id, p->to);
|
||||||
bool weWereSoleRelayer = false;
|
bool weWereSoleRelayer = false;
|
||||||
bool weWereRelayer = wasRelayer(ourRelayID, p->decoded.request_id, p->to, &weWereSoleRelayer);
|
bool weWereRelayer = wasRelayer(ourRelayID, p->decoded.request_id, p->to, &weWereSoleRelayer);
|
||||||
@ -134,34 +112,49 @@ void NextHopRouter::sniffReceived(const meshtastic_MeshPacket *p, const meshtast
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
perhapsRelay(p);
|
perhapsRebroadcast(p);
|
||||||
|
|
||||||
// handle the packet as normal
|
// handle the packet as normal
|
||||||
Router::sniffReceived(p, c);
|
Router::sniffReceived(p, c);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Check if we should be relaying this packet if so, do so. */
|
/* Check if we should be rebroadcasting this packet if so, do so. */
|
||||||
bool NextHopRouter::perhapsRelay(const meshtastic_MeshPacket *p)
|
bool NextHopRouter::perhapsRebroadcast(const meshtastic_MeshPacket *p)
|
||||||
{
|
{
|
||||||
if (!isToUs(p) && !isFromUs(p) && p->hop_limit > 0) {
|
if (!isToUs(p) && !isFromUs(p) && p->hop_limit > 0) {
|
||||||
if (p->next_hop == NO_NEXT_HOP_PREFERENCE || p->next_hop == nodeDB->getLastByteOfNodeNum(getNodeNum())) {
|
if (p->id != 0) {
|
||||||
if (isRebroadcaster()) {
|
if (isRebroadcaster()) {
|
||||||
meshtastic_MeshPacket *tosend = packetPool.allocCopy(*p); // keep a copy because we will be sending it
|
if (p->next_hop == NO_NEXT_HOP_PREFERENCE || p->next_hop == nodeDB->getLastByteOfNodeNum(getNodeNum())) {
|
||||||
LOG_INFO("Relaying received message coming from %x", p->relay_node);
|
meshtastic_MeshPacket *tosend = packetPool.allocCopy(*p); // keep a copy because we will be sending it
|
||||||
|
LOG_INFO("Rebroadcast received message coming from %x", p->relay_node);
|
||||||
|
|
||||||
// Use shared logic to determine if hop_limit should be decremented
|
// Use shared logic to determine if hop_limit should be decremented
|
||||||
if (shouldDecrementHopLimit(p)) {
|
if (shouldDecrementHopLimit(p)) {
|
||||||
tosend->hop_limit--; // bump down the hop count
|
tosend->hop_limit--; // bump down the hop count
|
||||||
} else {
|
} else {
|
||||||
LOG_INFO("Router/CLIENT_BASE-to-favorite-router/CLIENT_BASE relay: preserving hop_limit");
|
LOG_INFO("favorite-ROUTER/CLIENT_BASE-to-ROUTER/CLIENT_BASE flood: preserving hop_limit");
|
||||||
|
}
|
||||||
|
#if USERPREFS_EVENT_MODE
|
||||||
|
if (tosend->hop_limit > 2) {
|
||||||
|
// if we are "correcting" the hop_limit, "correct" the hop_start by the same amount to preserve hops away.
|
||||||
|
tosend->hop_start -= (tosend->hop_limit - 2);
|
||||||
|
tosend->hop_limit = 2;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
if (p->next_hop == NO_NEXT_HOP_PREFERENCE) {
|
||||||
|
FloodingRouter::send(tosend);
|
||||||
|
} else {
|
||||||
|
NextHopRouter::send(tosend);
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
NextHopRouter::send(tosend);
|
|
||||||
|
|
||||||
return true;
|
|
||||||
} else {
|
} else {
|
||||||
LOG_DEBUG("Not rebroadcasting: Role = CLIENT_MUTE or Rebroadcast Mode = NONE");
|
LOG_DEBUG("No rebroadcast: Role = CLIENT_MUTE or Rebroadcast Mode = NONE");
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
LOG_DEBUG("Ignore 0 id broadcast");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -231,13 +224,13 @@ bool NextHopRouter::stopRetransmission(GlobalPacketId key)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Regardless of whether or not we canceled this packet from the txQueue, remove it from our pending list so it doesn't
|
// Regardless of whether or not we canceled this packet from the txQueue, remove it from our pending list so it
|
||||||
// get scheduled again. (This is the core of stopRetransmission.)
|
// doesn't get scheduled again. (This is the core of stopRetransmission.)
|
||||||
auto numErased = pending.erase(key);
|
auto numErased = pending.erase(key);
|
||||||
assert(numErased == 1);
|
assert(numErased == 1);
|
||||||
|
|
||||||
// When we remove an entry from pending, always be sure to release the copy of the packet that was allocated in the call
|
// When we remove an entry from pending, always be sure to release the copy of the packet that was allocated in the
|
||||||
// to startRetransmission.
|
// call to startRetransmission.
|
||||||
packetPool.release(p);
|
packetPool.release(p);
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
|
|||||||
@ -148,7 +148,7 @@ class NextHopRouter : public FloodingRouter
|
|||||||
*/
|
*/
|
||||||
uint8_t getNextHop(NodeNum to, uint8_t relay_node);
|
uint8_t getNextHop(NodeNum to, uint8_t relay_node);
|
||||||
|
|
||||||
/** Check if we should be relaying this packet if so, do so.
|
/** Check if we should be rebroadcasting this packet if so, do so.
|
||||||
* @return true if we did relay */
|
* @return true if we did rebroadcast */
|
||||||
bool perhapsRelay(const meshtastic_MeshPacket *p);
|
bool perhapsRebroadcast(const meshtastic_MeshPacket *p) override;
|
||||||
};
|
};
|
||||||
@ -94,7 +94,6 @@ bool PacketHistory::wasSeenRecently(const meshtastic_MeshPacket *p, bool withUpd
|
|||||||
LOG_DEBUG("Packet History - Hop limit upgrade: packet 0x%08x from hop_limit=%d to hop_limit=%d", p->id, found->hop_limit,
|
LOG_DEBUG("Packet History - Hop limit upgrade: packet 0x%08x from hop_limit=%d to hop_limit=%d", p->id, found->hop_limit,
|
||||||
p->hop_limit);
|
p->hop_limit);
|
||||||
*wasUpgraded = true;
|
*wasUpgraded = true;
|
||||||
seenRecently = false; // Allow router processing but prevent duplicate app delivery
|
|
||||||
} else if (wasUpgraded) {
|
} else if (wasUpgraded) {
|
||||||
*wasUpgraded = false; // Initialize to false if not an upgrade
|
*wasUpgraded = false; // Initialize to false if not an upgrade
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user