diff --git a/src/main.cpp b/src/main.cpp index 6e337bb8c..82acb6e1d 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -605,7 +605,7 @@ void setup() // If we're taking on the repeater role, use flood router and turn off 3V3_S rail because peripherals are not needed if (config.device.role == meshtastic_Config_DeviceConfig_Role_REPEATER) { - router = new FloodingRouter(); + router = new NextHopRouter(); #ifdef PIN_3V3_EN digitalWrite(PIN_3V3_EN, LOW); #endif diff --git a/src/mesh/FloodingRouter.cpp b/src/mesh/FloodingRouter.cpp index 0a50e9556..bac84203f 100644 --- a/src/mesh/FloodingRouter.cpp +++ b/src/mesh/FloodingRouter.cpp @@ -35,12 +35,6 @@ bool FloodingRouter::shouldFilterReceived(const meshtastic_MeshPacket *p) void FloodingRouter::sniffReceived(const meshtastic_MeshPacket *p, const meshtastic_Routing *c) { - bool isAckorReply = (p->which_payload_variant == meshtastic_MeshPacket_decoded_tag) && (p->decoded.request_id != 0); - if (isAckorReply && p->to != getNodeNum() && p->to != NODENUM_BROADCAST) { - // do not flood direct message that is ACKed or replied to - LOG_DEBUG("Receiving an ACK or reply not for me, but don't need to rebroadcast this direct message anymore.\n"); - Router::cancelSending(p->to, p->decoded.request_id); // cancel rebroadcast for this DM - } if ((p->to != getNodeNum()) && (p->hop_limit > 0) && (getFrom(p) != getNodeNum())) { if (p->id != 0) { if (config.device.role != meshtastic_Config_DeviceConfig_Role_CLIENT_MUTE) { @@ -68,4 +62,4 @@ void FloodingRouter::sniffReceived(const meshtastic_MeshPacket *p, const meshtas } // handle the packet as normal Router::sniffReceived(p, c); -} +} \ No newline at end of file diff --git a/src/mesh/NextHopRouter.cpp b/src/mesh/NextHopRouter.cpp index 8b29217b6..5c7b9bd49 100644 --- a/src/mesh/NextHopRouter.cpp +++ b/src/mesh/NextHopRouter.cpp @@ -2,6 +2,12 @@ NextHopRouter::NextHopRouter() {} +PendingPacket::PendingPacket(meshtastic_MeshPacket *p, uint8_t numRetransmissions) +{ + packet = p; + this->numRetransmissions = numRetransmissions - 1; // We subtract one, because we assume the user just did the first send +} + /** * Send a packet */ @@ -13,6 +19,9 @@ ErrorCode NextHopRouter::send(meshtastic_MeshPacket *p) p->next_hop = getNextHop(p->to, p->relay_node); // set the next hop LOG_DEBUG("Setting next hop for packet with dest %x to %x\n", p->to, p->next_hop); + if (p->next_hop != NO_NEXT_HOP_PREFERENCE && (p->want_ack || (p->to != p->next_hop && p->hop_limit > 0))) + startRetransmission(packetPool.allocCopy(*p)); // start retransmission for relayed packet + return Router::send(p); } @@ -23,6 +32,7 @@ bool NextHopRouter::shouldFilterReceived(const meshtastic_MeshPacket *p) LOG_DEBUG("Ignoring incoming msg, because we've already seen it.\n"); } else { LOG_DEBUG("Ignoring incoming msg, because we've already seen it and cancel any outgoing packets.\n"); + stopRetransmission(p->from, p->id); Router::cancelSending(p->from, p->id); } return true; @@ -34,34 +44,34 @@ bool NextHopRouter::shouldFilterReceived(const meshtastic_MeshPacket *p) void NextHopRouter::sniffReceived(const meshtastic_MeshPacket *p, const meshtastic_Routing *c) { NodeNum ourNodeNum = getNodeNum(); - bool isAck = - ((c && c->error_reason == meshtastic_Routing_Error_NONE)); // consider only ROUTING_APP message without error as ACK - if (isAck || (p->to == ourNodeNum)) { + bool isAckorReply = (p->which_payload_variant == meshtastic_MeshPacket_decoded_tag) && (p->decoded.request_id != 0); + if (isAckorReply) { // Update next-hop for the original transmitter of this successful transmission to the relay node, but ONLY if "from" is - // not 0 or ourselves (means implicit ACK or someone is relaying our ACK) - if (p->from != 0 && p->from != ourNodeNum) { - if (p->relay_node) { + // not 0 (means implicit ACK) + if (p->to == ourNodeNum && p->from != 0) { + if (p->hop_start && p->relay_node) { // Only if hopStart is set, relay_node is valid (both introduced in v2.3) meshtastic_NodeInfoLite *origTx = nodeDB->getMeshNode(p->from); if (origTx) { - LOG_DEBUG("Update next hop of 0x%x to 0x%x based on received DM or ACK.\n", p->from, p->relay_node); + LOG_DEBUG("Update next hop of 0x%x to 0x%x based on received ACK or reply.\n", p->from, p->relay_node); origTx->next_hop = p->relay_node; } } } + + LOG_DEBUG("Receiving an ACK or reply, don't need to relay this packet anymore.\n"); + Router::cancelSending(p->to, p->decoded.request_id); // cancel rebroadcast for this DM + stopRetransmission(p->from, p->decoded.request_id); } if (config.device.role != meshtastic_Config_DeviceConfig_Role_CLIENT_MUTE) { if ((p->to != ourNodeNum) && (getFrom(p) != ourNodeNum)) { - if (p->next_hop == nodeDB->getLastByteOfNodeNum(ourNodeNum)) { + if (p->hop_start == 0 || p->next_hop == NO_NEXT_HOP_PREFERENCE || + p->next_hop == nodeDB->getLastByteOfNodeNum(ourNodeNum)) { meshtastic_MeshPacket *tosend = packetPool.allocCopy(*p); // keep a copy because we will be sending it - LOG_INFO("Relaying received next-hop message coming from %x\n", p->relay_node); + LOG_INFO("Relaying received message coming from %x\n", p->relay_node); tosend->hop_limit--; // bump down the hop count NextHopRouter::send(tosend); - } else if (p->next_hop == NO_NEXT_HOP_PREFERENCE) { - // No preference for next hop, use FloodingRouter - LOG_DEBUG("No preference for next hop, using FloodingRouter\n"); - FloodingRouter::sniffReceived(p, c); } // else don't relay } } else { @@ -80,14 +90,144 @@ uint8_t NextHopRouter::getNextHop(NodeNum to, uint8_t relay_node) meshtastic_NodeInfoLite *node = nodeDB->getMeshNode(to); if (node) { // We are careful not to return the relay node as the next hop - if (node->next_hop != relay_node) { + if (node->next_hop && node->next_hop != relay_node) { LOG_DEBUG("Next hop for 0x%x is 0x%x\n", to, node->next_hop); return node->next_hop; } else { - LOG_WARN("Next hop for 0x%x is 0x%x, same as relayer; setting as no preference\n", to, node->next_hop); - return NO_NEXT_HOP_PREFERENCE; + if (node->next_hop) + LOG_WARN("Next hop for 0x%x is 0x%x, same as relayer; setting as no preference\n", to, node->next_hop); } - } else { - return NO_NEXT_HOP_PREFERENCE; } + return NO_NEXT_HOP_PREFERENCE; +} + +PendingPacket *NextHopRouter::findPendingPacket(GlobalPacketId key) +{ + auto old = pending.find(key); // If we have an old record, someone messed up because id got reused + if (old != pending.end()) { + return &old->second; + } else + return NULL; +} +/** + * Stop any retransmissions we are doing of the specified node/packet ID pair + */ +bool NextHopRouter::stopRetransmission(NodeNum from, PacketId id) +{ + auto key = GlobalPacketId(from, id); + return stopRetransmission(key); +} + +bool NextHopRouter::stopRetransmission(GlobalPacketId key) +{ + auto old = findPendingPacket(key); + if (old) { + auto p = old->packet; + /* Only when we already transmitted a packet via LoRa, we will cancel the packet in the Tx queue + to avoid canceling a transmission if it was ACKed super fast via MQTT */ + if (old->numRetransmissions < NUM_RETRANSMISSIONS - 1) { + // remove the 'original' (identified by originator and packet->id) from the txqueue and free it + cancelSending(getFrom(p), p->id); + // now free the pooled copy for retransmission too + packetPool.release(p); + } + auto numErased = pending.erase(key); + assert(numErased == 1); + return true; + } else + return false; +} + +/** + * Add p to the list of packets to retransmit occasionally. We will free it once we stop retransmitting. + */ +PendingPacket *NextHopRouter::startRetransmission(meshtastic_MeshPacket *p) +{ + auto id = GlobalPacketId(p); + auto rec = PendingPacket(p, this->NUM_RETRANSMISSIONS); + + stopRetransmission(getFrom(p), p->id); + + setNextTx(&rec); + pending[id] = rec; + + return &pending[id]; +} + +/** + * Do any retransmissions that are scheduled (FIXME - for the time being called from loop) + */ +int32_t NextHopRouter::doRetransmissions() +{ + uint32_t now = millis(); + int32_t d = INT32_MAX; + + // FIXME, we should use a better datastructure rather than walking through this map. + // for(auto el: pending) { + for (auto it = pending.begin(), nextIt = it; it != pending.end(); it = nextIt) { + ++nextIt; // we use this odd pattern because we might be deleting it... + auto &p = it->second; + + bool stillValid = true; // assume we'll keep this record around + + // FIXME, handle 51 day rolloever here!!! + if (p.nextTxMsec <= now) { + if (p.numRetransmissions == 0) { + if (p.packet->from == getNodeNum()) { + LOG_DEBUG("Reliable send failed, returning a nak for fr=0x%x,to=0x%x,id=0x%x\n", p.packet->from, p.packet->to, + p.packet->id); + sendAckNak(meshtastic_Routing_Error_MAX_RETRANSMIT, getFrom(p.packet), p.packet->id, p.packet->channel); + } + // Note: we don't stop retransmission here, instead the Nak packet gets processed in sniffReceived + stopRetransmission(it->first); + stillValid = false; // just deleted it + } else { + LOG_DEBUG("Sending retransmission fr=0x%x,to=0x%x,id=0x%x, tries left=%d\n", p.packet->from, p.packet->to, + p.packet->id, p.numRetransmissions); + + if (config.lora.next_hop_routing && p.packet->to != NODENUM_BROADCAST) { + if (p.numRetransmissions == 1) { + // Last retransmission, reset next_hop (fallback to FloodingRouter) + p.packet->next_hop = NO_NEXT_HOP_PREFERENCE; + // Also reset it in the nodeDB + meshtastic_NodeInfoLite *sentTo = nodeDB->getMeshNode(p.packet->to); + if (sentTo) { + LOG_DEBUG("Resetting next hop for packet with dest 0x%x\n", p.packet->to); + sentTo->next_hop = NO_NEXT_HOP_PREFERENCE; + } + FloodingRouter::send(packetPool.allocCopy(*p.packet)); + } else { + NextHopRouter::send(packetPool.allocCopy(*p.packet)); + } + } else { + // Note: we call the superclass version because we don't want to have our version of send() add a new + // retransmission record + FloodingRouter::send(packetPool.allocCopy(*p.packet)); + } + + // Queue again + --p.numRetransmissions; + setNextTx(&p); + } + } + + if (stillValid) { + // Update our desired sleep delay + int32_t t = p.nextTxMsec - now; + + d = min(t, d); + } + } + + return d; +} + +void NextHopRouter::setNextTx(PendingPacket *pending) +{ + assert(iface); + auto d = iface->getRetransmissionMsec(pending->packet); + pending->nextTxMsec = millis() + d; + LOG_DEBUG("Setting next retransmission in %u msecs: ", d); + printPacket("", pending->packet); + setReceivedMessage(); // Run ASAP, so we can figure out our correct sleep time } \ No newline at end of file diff --git a/src/mesh/NextHopRouter.h b/src/mesh/NextHopRouter.h index 47cfbfe9b..731ed8b8b 100644 --- a/src/mesh/NextHopRouter.h +++ b/src/mesh/NextHopRouter.h @@ -1,6 +1,7 @@ #pragma once #include "FloodingRouter.h" +#include /** * An identifier for a globally unique message - a pair of the sending nodenum and the packet id assigned @@ -34,11 +35,11 @@ struct PendingPacket { /** The next time we should try to retransmit this packet */ uint32_t nextTxMsec = 0; - /** Starts at NUM_RETRANSMISSIONS -1(normally 3) and counts down. Once zero it will be removed from the list */ + /** Starts at NUM_RETRANSMISSIONS -1 and counts down. Once zero it will be removed from the list */ uint8_t numRetransmissions = 0; PendingPacket() {} - explicit PendingPacket(meshtastic_MeshPacket *p); + explicit PendingPacket(meshtastic_MeshPacket *p, uint8_t numRetransmissions); }; class GlobalPacketIdHashFunction @@ -82,6 +83,8 @@ class NextHopRouter : public FloodingRouter return min(d, r); } + constexpr static uint8_t NUM_RETRANSMISSIONS = 2; + protected: /** * Pending retransmissions @@ -114,13 +117,6 @@ class NextHopRouter : public FloodingRouter */ PendingPacket *startRetransmission(meshtastic_MeshPacket *p); - private: - /** - * Get the next hop for a destination, given the relay node - * @return the node number of the next hop, 0 if no preference (fallback to FloodingRouter) - */ - uint8_t getNextHop(NodeNum to, uint8_t relay_node); - /** * Stop any retransmissions we are doing of the specified node/packet ID pair * @@ -137,4 +133,11 @@ class NextHopRouter : public FloodingRouter int32_t doRetransmissions(); void setNextTx(PendingPacket *pending); + + private: + /** + * Get the next hop for a destination, given the relay node + * @return the node number of the next hop, 0 if no preference (fallback to FloodingRouter) + */ + uint8_t getNextHop(NodeNum to, uint8_t relay_node); }; \ No newline at end of file diff --git a/src/mesh/NodeDB.cpp b/src/mesh/NodeDB.cpp index efc2733d6..cb333edf9 100644 --- a/src/mesh/NodeDB.cpp +++ b/src/mesh/NodeDB.cpp @@ -1095,11 +1095,6 @@ void NodeDB::updateFrom(const meshtastic_MeshPacket &mp) // If hopStart was set and there wasn't someone messing with the limit in the middle, add hopsAway if (mp.hop_start != 0 && mp.hop_limit <= mp.hop_start) { info->hops_away = mp.hop_start - mp.hop_limit; - if (info->hops_away == 0) { - info->next_hop = getLastByteOfNodeNum(mp.from); - } else if (info->hops_away == 1) { - info->next_hop = getLastByteOfNodeNum(mp.relay_node); - } } } } diff --git a/src/mesh/RadioInterface.cpp b/src/mesh/RadioInterface.cpp index 5ffd55328..1909d92e6 100644 --- a/src/mesh/RadioInterface.cpp +++ b/src/mesh/RadioInterface.cpp @@ -306,6 +306,10 @@ void printPacket(const char *prefix, const meshtastic_MeshPacket *p) out += " encrypted"; } + if (p->next_hop != 0) + out += DEBUG_PORT.mt_sprintf(" nextHop=0x%x", p->next_hop); + if (p->relay_node != 0) + out += DEBUG_PORT.mt_sprintf(" relay=0x%x", p->relay_node); if (p->rx_time != 0) out += DEBUG_PORT.mt_sprintf(" rxtime=%u", p->rx_time); if (p->rx_snr != 0.0) @@ -617,4 +621,4 @@ size_t RadioInterface::beginSending(meshtastic_MeshPacket *p) sendingPacket = p; return p->encrypted.size + sizeof(PacketHeader); -} +} \ No newline at end of file diff --git a/src/mesh/ReliableRouter.cpp b/src/mesh/ReliableRouter.cpp index d26deaead..a4bc9b431 100644 --- a/src/mesh/ReliableRouter.cpp +++ b/src/mesh/ReliableRouter.cpp @@ -85,7 +85,7 @@ bool ReliableRouter::shouldFilterReceived(const meshtastic_MeshPacket *p) Router::send(tosend); } - return config.lora.next_hop_routing ? NextHopRouter::shouldFilterReceived(p) : FloodingRouter::shouldFilterReceived(p); + return p->to == NODENUM_BROADCAST ? FloodingRouter::shouldFilterReceived(p) : NextHopRouter::shouldFilterReceived(p); } /** @@ -146,142 +146,5 @@ void ReliableRouter::sniffReceived(const meshtastic_MeshPacket *p, const meshtas } } - // For DMs, we use the NextHopRouter, whereas for broadcasts, we use the FloodingRouter - config.lora.next_hop_routing && (p->to != NODENUM_BROADCAST) ? NextHopRouter::sniffReceived(p, c) - : FloodingRouter::sniffReceived(p, c); -} - -#define NUM_RETRANSMISSIONS 3 - -PendingPacket::PendingPacket(meshtastic_MeshPacket *p) -{ - packet = p; - numRetransmissions = NUM_RETRANSMISSIONS - 1; // We subtract one, because we assume the user just did the first send -} - -PendingPacket *ReliableRouter::findPendingPacket(GlobalPacketId key) -{ - auto old = pending.find(key); // If we have an old record, someone messed up because id got reused - if (old != pending.end()) { - return &old->second; - } else - return NULL; -} -/** - * Stop any retransmissions we are doing of the specified node/packet ID pair - */ -bool ReliableRouter::stopRetransmission(NodeNum from, PacketId id) -{ - auto key = GlobalPacketId(from, id); - return stopRetransmission(key); -} - -bool ReliableRouter::stopRetransmission(GlobalPacketId key) -{ - auto old = findPendingPacket(key); - if (old) { - auto p = old->packet; - /* Only when we already transmitted a packet via LoRa, we will cancel the packet in the Tx queue - to avoid canceling a transmission if it was ACKed super fast via MQTT */ - if (old->numRetransmissions < NUM_RETRANSMISSIONS - 1) { - // remove the 'original' (identified by originator and packet->id) from the txqueue and free it - cancelSending(getFrom(p), p->id); - // now free the pooled copy for retransmission too - packetPool.release(p); - } - auto numErased = pending.erase(key); - assert(numErased == 1); - return true; - } else - return false; -} - -/** - * Add p to the list of packets to retransmit occasionally. We will free it once we stop retransmitting. - */ -PendingPacket *ReliableRouter::startRetransmission(meshtastic_MeshPacket *p) -{ - auto id = GlobalPacketId(p); - auto rec = PendingPacket(p); - - stopRetransmission(getFrom(p), p->id); - - setNextTx(&rec); - pending[id] = rec; - - return &pending[id]; -} - -/** - * Do any retransmissions that are scheduled (FIXME - for the time being called from loop) - */ -int32_t ReliableRouter::doRetransmissions() -{ - uint32_t now = millis(); - int32_t d = INT32_MAX; - - // FIXME, we should use a better datastructure rather than walking through this map. - // for(auto el: pending) { - for (auto it = pending.begin(), nextIt = it; it != pending.end(); it = nextIt) { - ++nextIt; // we use this odd pattern because we might be deleting it... - auto &p = it->second; - - bool stillValid = true; // assume we'll keep this record around - - // FIXME, handle 51 day rolloever here!!! - if (p.nextTxMsec <= now) { - if (p.numRetransmissions == 0) { - LOG_DEBUG("Reliable send failed, returning a nak for fr=0x%x,to=0x%x,id=0x%x\n", p.packet->from, p.packet->to, - p.packet->id); - sendAckNak(meshtastic_Routing_Error_MAX_RETRANSMIT, getFrom(p.packet), p.packet->id, p.packet->channel); - // Note: we don't stop retransmission here, instead the Nak packet gets processed in sniffReceived - stopRetransmission(it->first); - stillValid = false; // just deleted it - } else { - LOG_DEBUG("Sending reliable retransmission fr=0x%x,to=0x%x,id=0x%x, tries left=%d\n", p.packet->from, - p.packet->to, p.packet->id, p.numRetransmissions); - - if (config.lora.next_hop_routing && p.packet->to != NODENUM_BROADCAST) { - if (p.numRetransmissions == 1) { - // Last retransmission, reset next_hop (fallback to FloodingRouter) - p.packet->next_hop = NO_NEXT_HOP_PREFERENCE; - // Also reset it in the nodeDB - meshtastic_NodeInfoLite *sentTo = nodeDB->getMeshNode(p.packet->to); - if (sentTo) { - LOG_DEBUG("Resetting next hop for packet with dest 0x%x\n", p.packet->to); - sentTo->next_hop = NO_NEXT_HOP_PREFERENCE; - } - } - NextHopRouter::send(packetPool.allocCopy(*p.packet)); - } else { - // Note: we call the superclass version because we don't want to have our version of send() add a new - // retransmission record - FloodingRouter::send(packetPool.allocCopy(*p.packet)); - } - - // Queue again - --p.numRetransmissions; - setNextTx(&p); - } - } - - if (stillValid) { - // Update our desired sleep delay - int32_t t = p.nextTxMsec - now; - - d = min(t, d); - } - } - - return d; -} - -void ReliableRouter::setNextTx(PendingPacket *pending) -{ - assert(iface); - auto d = iface->getRetransmissionMsec(pending->packet); - pending->nextTxMsec = millis() + d; - LOG_DEBUG("Setting next retransmission in %u msecs: ", d); - printPacket("", pending->packet); - setReceivedMessage(); // Run ASAP, so we can figure out our correct sleep time + p->to == NODENUM_BROADCAST ? FloodingRouter::sniffReceived(p, c) : NextHopRouter::sniffReceived(p, c); } \ No newline at end of file diff --git a/src/mesh/ReliableRouter.h b/src/mesh/ReliableRouter.h index ed278349a..b1ad99cd3 100644 --- a/src/mesh/ReliableRouter.h +++ b/src/mesh/ReliableRouter.h @@ -1,7 +1,6 @@ #pragma once #include "NextHopRouter.h" -#include /** * This is a mixin that extends Router with the ability to do (one hop only) reliable message sends. @@ -32,4 +31,6 @@ class ReliableRouter : public NextHopRouter * We hook this method so we can see packets before FloodingRouter says they should be discarded */ virtual bool shouldFilterReceived(const meshtastic_MeshPacket *p) override; + + constexpr static uint8_t NUM_RETRANSMISSIONS = 3; }; \ No newline at end of file