Add 1 retransmission for intermediate hops when using NextHopRouter

This commit is contained in:
GUVWAF 2024-08-10 20:46:27 +02:00
parent 913268b132
commit 2e303a33be
8 changed files with 181 additions and 181 deletions

View File

@ -605,7 +605,7 @@ void setup()
// If we're taking on the repeater role, use flood router and turn off 3V3_S rail because peripherals are not needed
if (config.device.role == meshtastic_Config_DeviceConfig_Role_REPEATER) {
router = new FloodingRouter();
router = new NextHopRouter();
#ifdef PIN_3V3_EN
digitalWrite(PIN_3V3_EN, LOW);
#endif

View File

@ -35,12 +35,6 @@ bool FloodingRouter::shouldFilterReceived(const meshtastic_MeshPacket *p)
void FloodingRouter::sniffReceived(const meshtastic_MeshPacket *p, const meshtastic_Routing *c)
{
bool isAckorReply = (p->which_payload_variant == meshtastic_MeshPacket_decoded_tag) && (p->decoded.request_id != 0);
if (isAckorReply && p->to != getNodeNum() && p->to != NODENUM_BROADCAST) {
// do not flood direct message that is ACKed or replied to
LOG_DEBUG("Receiving an ACK or reply not for me, but don't need to rebroadcast this direct message anymore.\n");
Router::cancelSending(p->to, p->decoded.request_id); // cancel rebroadcast for this DM
}
if ((p->to != getNodeNum()) && (p->hop_limit > 0) && (getFrom(p) != getNodeNum())) {
if (p->id != 0) {
if (config.device.role != meshtastic_Config_DeviceConfig_Role_CLIENT_MUTE) {
@ -68,4 +62,4 @@ void FloodingRouter::sniffReceived(const meshtastic_MeshPacket *p, const meshtas
}
// handle the packet as normal
Router::sniffReceived(p, c);
}
}

View File

@ -2,6 +2,12 @@
NextHopRouter::NextHopRouter() {}
PendingPacket::PendingPacket(meshtastic_MeshPacket *p, uint8_t numRetransmissions)
{
packet = p;
this->numRetransmissions = numRetransmissions - 1; // We subtract one, because we assume the user just did the first send
}
/**
* Send a packet
*/
@ -13,6 +19,9 @@ ErrorCode NextHopRouter::send(meshtastic_MeshPacket *p)
p->next_hop = getNextHop(p->to, p->relay_node); // set the next hop
LOG_DEBUG("Setting next hop for packet with dest %x to %x\n", p->to, p->next_hop);
if (p->next_hop != NO_NEXT_HOP_PREFERENCE && (p->want_ack || (p->to != p->next_hop && p->hop_limit > 0)))
startRetransmission(packetPool.allocCopy(*p)); // start retransmission for relayed packet
return Router::send(p);
}
@ -23,6 +32,7 @@ bool NextHopRouter::shouldFilterReceived(const meshtastic_MeshPacket *p)
LOG_DEBUG("Ignoring incoming msg, because we've already seen it.\n");
} else {
LOG_DEBUG("Ignoring incoming msg, because we've already seen it and cancel any outgoing packets.\n");
stopRetransmission(p->from, p->id);
Router::cancelSending(p->from, p->id);
}
return true;
@ -34,34 +44,34 @@ bool NextHopRouter::shouldFilterReceived(const meshtastic_MeshPacket *p)
void NextHopRouter::sniffReceived(const meshtastic_MeshPacket *p, const meshtastic_Routing *c)
{
NodeNum ourNodeNum = getNodeNum();
bool isAck =
((c && c->error_reason == meshtastic_Routing_Error_NONE)); // consider only ROUTING_APP message without error as ACK
if (isAck || (p->to == ourNodeNum)) {
bool isAckorReply = (p->which_payload_variant == meshtastic_MeshPacket_decoded_tag) && (p->decoded.request_id != 0);
if (isAckorReply) {
// Update next-hop for the original transmitter of this successful transmission to the relay node, but ONLY if "from" is
// not 0 or ourselves (means implicit ACK or someone is relaying our ACK)
if (p->from != 0 && p->from != ourNodeNum) {
if (p->relay_node) {
// not 0 (means implicit ACK)
if (p->to == ourNodeNum && p->from != 0) {
if (p->hop_start && p->relay_node) { // Only if hopStart is set, relay_node is valid (both introduced in v2.3)
meshtastic_NodeInfoLite *origTx = nodeDB->getMeshNode(p->from);
if (origTx) {
LOG_DEBUG("Update next hop of 0x%x to 0x%x based on received DM or ACK.\n", p->from, p->relay_node);
LOG_DEBUG("Update next hop of 0x%x to 0x%x based on received ACK or reply.\n", p->from, p->relay_node);
origTx->next_hop = p->relay_node;
}
}
}
LOG_DEBUG("Receiving an ACK or reply, don't need to relay this packet anymore.\n");
Router::cancelSending(p->to, p->decoded.request_id); // cancel rebroadcast for this DM
stopRetransmission(p->from, p->decoded.request_id);
}
if (config.device.role != meshtastic_Config_DeviceConfig_Role_CLIENT_MUTE) {
if ((p->to != ourNodeNum) && (getFrom(p) != ourNodeNum)) {
if (p->next_hop == nodeDB->getLastByteOfNodeNum(ourNodeNum)) {
if (p->hop_start == 0 || p->next_hop == NO_NEXT_HOP_PREFERENCE ||
p->next_hop == nodeDB->getLastByteOfNodeNum(ourNodeNum)) {
meshtastic_MeshPacket *tosend = packetPool.allocCopy(*p); // keep a copy because we will be sending it
LOG_INFO("Relaying received next-hop message coming from %x\n", p->relay_node);
LOG_INFO("Relaying received message coming from %x\n", p->relay_node);
tosend->hop_limit--; // bump down the hop count
NextHopRouter::send(tosend);
} else if (p->next_hop == NO_NEXT_HOP_PREFERENCE) {
// No preference for next hop, use FloodingRouter
LOG_DEBUG("No preference for next hop, using FloodingRouter\n");
FloodingRouter::sniffReceived(p, c);
} // else don't relay
}
} else {
@ -80,14 +90,144 @@ uint8_t NextHopRouter::getNextHop(NodeNum to, uint8_t relay_node)
meshtastic_NodeInfoLite *node = nodeDB->getMeshNode(to);
if (node) {
// We are careful not to return the relay node as the next hop
if (node->next_hop != relay_node) {
if (node->next_hop && node->next_hop != relay_node) {
LOG_DEBUG("Next hop for 0x%x is 0x%x\n", to, node->next_hop);
return node->next_hop;
} else {
LOG_WARN("Next hop for 0x%x is 0x%x, same as relayer; setting as no preference\n", to, node->next_hop);
return NO_NEXT_HOP_PREFERENCE;
if (node->next_hop)
LOG_WARN("Next hop for 0x%x is 0x%x, same as relayer; setting as no preference\n", to, node->next_hop);
}
} else {
return NO_NEXT_HOP_PREFERENCE;
}
return NO_NEXT_HOP_PREFERENCE;
}
PendingPacket *NextHopRouter::findPendingPacket(GlobalPacketId key)
{
auto old = pending.find(key); // If we have an old record, someone messed up because id got reused
if (old != pending.end()) {
return &old->second;
} else
return NULL;
}
/**
* Stop any retransmissions we are doing of the specified node/packet ID pair
*/
bool NextHopRouter::stopRetransmission(NodeNum from, PacketId id)
{
auto key = GlobalPacketId(from, id);
return stopRetransmission(key);
}
bool NextHopRouter::stopRetransmission(GlobalPacketId key)
{
auto old = findPendingPacket(key);
if (old) {
auto p = old->packet;
/* Only when we already transmitted a packet via LoRa, we will cancel the packet in the Tx queue
to avoid canceling a transmission if it was ACKed super fast via MQTT */
if (old->numRetransmissions < NUM_RETRANSMISSIONS - 1) {
// remove the 'original' (identified by originator and packet->id) from the txqueue and free it
cancelSending(getFrom(p), p->id);
// now free the pooled copy for retransmission too
packetPool.release(p);
}
auto numErased = pending.erase(key);
assert(numErased == 1);
return true;
} else
return false;
}
/**
* Add p to the list of packets to retransmit occasionally. We will free it once we stop retransmitting.
*/
PendingPacket *NextHopRouter::startRetransmission(meshtastic_MeshPacket *p)
{
auto id = GlobalPacketId(p);
auto rec = PendingPacket(p, this->NUM_RETRANSMISSIONS);
stopRetransmission(getFrom(p), p->id);
setNextTx(&rec);
pending[id] = rec;
return &pending[id];
}
/**
* Do any retransmissions that are scheduled (FIXME - for the time being called from loop)
*/
int32_t NextHopRouter::doRetransmissions()
{
uint32_t now = millis();
int32_t d = INT32_MAX;
// FIXME, we should use a better datastructure rather than walking through this map.
// for(auto el: pending) {
for (auto it = pending.begin(), nextIt = it; it != pending.end(); it = nextIt) {
++nextIt; // we use this odd pattern because we might be deleting it...
auto &p = it->second;
bool stillValid = true; // assume we'll keep this record around
// FIXME, handle 51 day rolloever here!!!
if (p.nextTxMsec <= now) {
if (p.numRetransmissions == 0) {
if (p.packet->from == getNodeNum()) {
LOG_DEBUG("Reliable send failed, returning a nak for fr=0x%x,to=0x%x,id=0x%x\n", p.packet->from, p.packet->to,
p.packet->id);
sendAckNak(meshtastic_Routing_Error_MAX_RETRANSMIT, getFrom(p.packet), p.packet->id, p.packet->channel);
}
// Note: we don't stop retransmission here, instead the Nak packet gets processed in sniffReceived
stopRetransmission(it->first);
stillValid = false; // just deleted it
} else {
LOG_DEBUG("Sending retransmission fr=0x%x,to=0x%x,id=0x%x, tries left=%d\n", p.packet->from, p.packet->to,
p.packet->id, p.numRetransmissions);
if (config.lora.next_hop_routing && p.packet->to != NODENUM_BROADCAST) {
if (p.numRetransmissions == 1) {
// Last retransmission, reset next_hop (fallback to FloodingRouter)
p.packet->next_hop = NO_NEXT_HOP_PREFERENCE;
// Also reset it in the nodeDB
meshtastic_NodeInfoLite *sentTo = nodeDB->getMeshNode(p.packet->to);
if (sentTo) {
LOG_DEBUG("Resetting next hop for packet with dest 0x%x\n", p.packet->to);
sentTo->next_hop = NO_NEXT_HOP_PREFERENCE;
}
FloodingRouter::send(packetPool.allocCopy(*p.packet));
} else {
NextHopRouter::send(packetPool.allocCopy(*p.packet));
}
} else {
// Note: we call the superclass version because we don't want to have our version of send() add a new
// retransmission record
FloodingRouter::send(packetPool.allocCopy(*p.packet));
}
// Queue again
--p.numRetransmissions;
setNextTx(&p);
}
}
if (stillValid) {
// Update our desired sleep delay
int32_t t = p.nextTxMsec - now;
d = min(t, d);
}
}
return d;
}
void NextHopRouter::setNextTx(PendingPacket *pending)
{
assert(iface);
auto d = iface->getRetransmissionMsec(pending->packet);
pending->nextTxMsec = millis() + d;
LOG_DEBUG("Setting next retransmission in %u msecs: ", d);
printPacket("", pending->packet);
setReceivedMessage(); // Run ASAP, so we can figure out our correct sleep time
}

View File

@ -1,6 +1,7 @@
#pragma once
#include "FloodingRouter.h"
#include <unordered_map>
/**
* An identifier for a globally unique message - a pair of the sending nodenum and the packet id assigned
@ -34,11 +35,11 @@ struct PendingPacket {
/** The next time we should try to retransmit this packet */
uint32_t nextTxMsec = 0;
/** Starts at NUM_RETRANSMISSIONS -1(normally 3) and counts down. Once zero it will be removed from the list */
/** Starts at NUM_RETRANSMISSIONS -1 and counts down. Once zero it will be removed from the list */
uint8_t numRetransmissions = 0;
PendingPacket() {}
explicit PendingPacket(meshtastic_MeshPacket *p);
explicit PendingPacket(meshtastic_MeshPacket *p, uint8_t numRetransmissions);
};
class GlobalPacketIdHashFunction
@ -82,6 +83,8 @@ class NextHopRouter : public FloodingRouter
return min(d, r);
}
constexpr static uint8_t NUM_RETRANSMISSIONS = 2;
protected:
/**
* Pending retransmissions
@ -114,13 +117,6 @@ class NextHopRouter : public FloodingRouter
*/
PendingPacket *startRetransmission(meshtastic_MeshPacket *p);
private:
/**
* Get the next hop for a destination, given the relay node
* @return the node number of the next hop, 0 if no preference (fallback to FloodingRouter)
*/
uint8_t getNextHop(NodeNum to, uint8_t relay_node);
/**
* Stop any retransmissions we are doing of the specified node/packet ID pair
*
@ -137,4 +133,11 @@ class NextHopRouter : public FloodingRouter
int32_t doRetransmissions();
void setNextTx(PendingPacket *pending);
private:
/**
* Get the next hop for a destination, given the relay node
* @return the node number of the next hop, 0 if no preference (fallback to FloodingRouter)
*/
uint8_t getNextHop(NodeNum to, uint8_t relay_node);
};

View File

@ -1095,11 +1095,6 @@ void NodeDB::updateFrom(const meshtastic_MeshPacket &mp)
// If hopStart was set and there wasn't someone messing with the limit in the middle, add hopsAway
if (mp.hop_start != 0 && mp.hop_limit <= mp.hop_start) {
info->hops_away = mp.hop_start - mp.hop_limit;
if (info->hops_away == 0) {
info->next_hop = getLastByteOfNodeNum(mp.from);
} else if (info->hops_away == 1) {
info->next_hop = getLastByteOfNodeNum(mp.relay_node);
}
}
}
}

View File

@ -306,6 +306,10 @@ void printPacket(const char *prefix, const meshtastic_MeshPacket *p)
out += " encrypted";
}
if (p->next_hop != 0)
out += DEBUG_PORT.mt_sprintf(" nextHop=0x%x", p->next_hop);
if (p->relay_node != 0)
out += DEBUG_PORT.mt_sprintf(" relay=0x%x", p->relay_node);
if (p->rx_time != 0)
out += DEBUG_PORT.mt_sprintf(" rxtime=%u", p->rx_time);
if (p->rx_snr != 0.0)
@ -617,4 +621,4 @@ size_t RadioInterface::beginSending(meshtastic_MeshPacket *p)
sendingPacket = p;
return p->encrypted.size + sizeof(PacketHeader);
}
}

View File

@ -85,7 +85,7 @@ bool ReliableRouter::shouldFilterReceived(const meshtastic_MeshPacket *p)
Router::send(tosend);
}
return config.lora.next_hop_routing ? NextHopRouter::shouldFilterReceived(p) : FloodingRouter::shouldFilterReceived(p);
return p->to == NODENUM_BROADCAST ? FloodingRouter::shouldFilterReceived(p) : NextHopRouter::shouldFilterReceived(p);
}
/**
@ -146,142 +146,5 @@ void ReliableRouter::sniffReceived(const meshtastic_MeshPacket *p, const meshtas
}
}
// For DMs, we use the NextHopRouter, whereas for broadcasts, we use the FloodingRouter
config.lora.next_hop_routing && (p->to != NODENUM_BROADCAST) ? NextHopRouter::sniffReceived(p, c)
: FloodingRouter::sniffReceived(p, c);
}
#define NUM_RETRANSMISSIONS 3
PendingPacket::PendingPacket(meshtastic_MeshPacket *p)
{
packet = p;
numRetransmissions = NUM_RETRANSMISSIONS - 1; // We subtract one, because we assume the user just did the first send
}
PendingPacket *ReliableRouter::findPendingPacket(GlobalPacketId key)
{
auto old = pending.find(key); // If we have an old record, someone messed up because id got reused
if (old != pending.end()) {
return &old->second;
} else
return NULL;
}
/**
* Stop any retransmissions we are doing of the specified node/packet ID pair
*/
bool ReliableRouter::stopRetransmission(NodeNum from, PacketId id)
{
auto key = GlobalPacketId(from, id);
return stopRetransmission(key);
}
bool ReliableRouter::stopRetransmission(GlobalPacketId key)
{
auto old = findPendingPacket(key);
if (old) {
auto p = old->packet;
/* Only when we already transmitted a packet via LoRa, we will cancel the packet in the Tx queue
to avoid canceling a transmission if it was ACKed super fast via MQTT */
if (old->numRetransmissions < NUM_RETRANSMISSIONS - 1) {
// remove the 'original' (identified by originator and packet->id) from the txqueue and free it
cancelSending(getFrom(p), p->id);
// now free the pooled copy for retransmission too
packetPool.release(p);
}
auto numErased = pending.erase(key);
assert(numErased == 1);
return true;
} else
return false;
}
/**
* Add p to the list of packets to retransmit occasionally. We will free it once we stop retransmitting.
*/
PendingPacket *ReliableRouter::startRetransmission(meshtastic_MeshPacket *p)
{
auto id = GlobalPacketId(p);
auto rec = PendingPacket(p);
stopRetransmission(getFrom(p), p->id);
setNextTx(&rec);
pending[id] = rec;
return &pending[id];
}
/**
* Do any retransmissions that are scheduled (FIXME - for the time being called from loop)
*/
int32_t ReliableRouter::doRetransmissions()
{
uint32_t now = millis();
int32_t d = INT32_MAX;
// FIXME, we should use a better datastructure rather than walking through this map.
// for(auto el: pending) {
for (auto it = pending.begin(), nextIt = it; it != pending.end(); it = nextIt) {
++nextIt; // we use this odd pattern because we might be deleting it...
auto &p = it->second;
bool stillValid = true; // assume we'll keep this record around
// FIXME, handle 51 day rolloever here!!!
if (p.nextTxMsec <= now) {
if (p.numRetransmissions == 0) {
LOG_DEBUG("Reliable send failed, returning a nak for fr=0x%x,to=0x%x,id=0x%x\n", p.packet->from, p.packet->to,
p.packet->id);
sendAckNak(meshtastic_Routing_Error_MAX_RETRANSMIT, getFrom(p.packet), p.packet->id, p.packet->channel);
// Note: we don't stop retransmission here, instead the Nak packet gets processed in sniffReceived
stopRetransmission(it->first);
stillValid = false; // just deleted it
} else {
LOG_DEBUG("Sending reliable retransmission fr=0x%x,to=0x%x,id=0x%x, tries left=%d\n", p.packet->from,
p.packet->to, p.packet->id, p.numRetransmissions);
if (config.lora.next_hop_routing && p.packet->to != NODENUM_BROADCAST) {
if (p.numRetransmissions == 1) {
// Last retransmission, reset next_hop (fallback to FloodingRouter)
p.packet->next_hop = NO_NEXT_HOP_PREFERENCE;
// Also reset it in the nodeDB
meshtastic_NodeInfoLite *sentTo = nodeDB->getMeshNode(p.packet->to);
if (sentTo) {
LOG_DEBUG("Resetting next hop for packet with dest 0x%x\n", p.packet->to);
sentTo->next_hop = NO_NEXT_HOP_PREFERENCE;
}
}
NextHopRouter::send(packetPool.allocCopy(*p.packet));
} else {
// Note: we call the superclass version because we don't want to have our version of send() add a new
// retransmission record
FloodingRouter::send(packetPool.allocCopy(*p.packet));
}
// Queue again
--p.numRetransmissions;
setNextTx(&p);
}
}
if (stillValid) {
// Update our desired sleep delay
int32_t t = p.nextTxMsec - now;
d = min(t, d);
}
}
return d;
}
void ReliableRouter::setNextTx(PendingPacket *pending)
{
assert(iface);
auto d = iface->getRetransmissionMsec(pending->packet);
pending->nextTxMsec = millis() + d;
LOG_DEBUG("Setting next retransmission in %u msecs: ", d);
printPacket("", pending->packet);
setReceivedMessage(); // Run ASAP, so we can figure out our correct sleep time
p->to == NODENUM_BROADCAST ? FloodingRouter::sniffReceived(p, c) : NextHopRouter::sniffReceived(p, c);
}

View File

@ -1,7 +1,6 @@
#pragma once
#include "NextHopRouter.h"
#include <unordered_map>
/**
* This is a mixin that extends Router with the ability to do (one hop only) reliable message sends.
@ -32,4 +31,6 @@ class ReliableRouter : public NextHopRouter
* We hook this method so we can see packets before FloodingRouter says they should be discarded
*/
virtual bool shouldFilterReceived(const meshtastic_MeshPacket *p) override;
constexpr static uint8_t NUM_RETRANSMISSIONS = 3;
};