mirror of
https://github.com/meshtastic/firmware.git
synced 2025-02-26 22:33:24 +00:00
2.6 <- Next hop router (#6005)
* Initial version of NextHopRouter * Set original hop limit in header flags * Short-circuit to FloodingRouter for broadcasts * If packet traveled 1 hop, set `relay_node` as `next_hop` for the original transmitter * Set last byte to 0xFF if it ended at 0x00 As per an idea of @S5NC * Also update next-hop based on received DM for us * temp * Add 1 retransmission for intermediate hops when using NextHopRouter * Add next_hop and relayed_by in PacketHistory for setting next-hop and handle flooding fallback * Update protos, store multiple relayers * Remove next-hop update logic from NeighborInfoModule * Fix retransmissions * Improve ACKs for repeated packets and responses * Stop retransmission even if there's not relay node * Revert perhapsRebroadcast() * Remove relayer if we cancel a transmission * Better checking for fallback to flooding * Fix newlines in traceroute print logs * Stop retransmission for original packet * Use relayID * Also when want_ack is set, we should try to retransmit * Fix cppcheck error * Fix 'router' not in scope error * Fix another cppcheck error * Check for hop_limit and also update next hop when `hop_start == hop_limit` on ACK Also check for broadcast in `getNextHop()` * Formatting and correct NUM_RETRANSMISSIONS * Update protos * Start retransmissions in NextHopRouter if ReliableRouter didn't do it * Handle repeated/fallback to flooding packets properly First check if it's not still in the TxQueue * Guard against clients setting `next_hop`/`relay_node` * Don't cancel relay if we were the assigned next-hop * Replies (e.g. tapback emoji) are also a valid confirmation of receipt --------- Co-authored-by: GUVWAF <thijs@havinga.eu> Co-authored-by: Thomas Göttgens <tgoettgens@gmail.com> Co-authored-by: Tom Fifield <tom@tomfifield.net> Co-authored-by: GUVWAF <78759985+GUVWAF@users.noreply.github.com>
This commit is contained in:
parent
d65d9305d3
commit
66a98fb062
@ -653,9 +653,9 @@ void setup()
|
||||
// but we need to do this after main cpu init (esp32setup), because we need the random seed set
|
||||
nodeDB = new NodeDB;
|
||||
|
||||
// If we're taking on the repeater role, use flood router and turn off 3V3_S rail because peripherals are not needed
|
||||
// If we're taking on the repeater role, use NextHopRouter and turn off 3V3_S rail because peripherals are not needed
|
||||
if (config.device.role == meshtastic_Config_DeviceConfig_Role_REPEATER) {
|
||||
router = new FloodingRouter();
|
||||
router = new NextHopRouter();
|
||||
#ifdef PIN_3V3_EN
|
||||
digitalWrite(PIN_3V3_EN, LOW);
|
||||
#endif
|
||||
|
@ -13,7 +13,8 @@ FloodingRouter::FloodingRouter() {}
|
||||
ErrorCode FloodingRouter::send(meshtastic_MeshPacket *p)
|
||||
{
|
||||
// Add any messages _we_ send to the seen message list (so we will ignore all retransmissions we see)
|
||||
wasSeenRecently(p); // FIXME, move this to a sniffSent method
|
||||
p->relay_node = nodeDB->getLastByteOfNodeNum(getNodeNum()); // First set the relayer to us
|
||||
wasSeenRecently(p); // FIXME, move this to a sniffSent method
|
||||
|
||||
return Router::send(p);
|
||||
}
|
||||
@ -23,26 +24,17 @@ bool FloodingRouter::shouldFilterReceived(const meshtastic_MeshPacket *p)
|
||||
if (wasSeenRecently(p)) { // Note: this will also add a recent packet record
|
||||
printPacket("Ignore dupe incoming msg", p);
|
||||
rxDupe++;
|
||||
if (config.device.role != meshtastic_Config_DeviceConfig_Role_ROUTER &&
|
||||
config.device.role != meshtastic_Config_DeviceConfig_Role_REPEATER &&
|
||||
config.device.role != meshtastic_Config_DeviceConfig_Role_ROUTER_LATE) {
|
||||
// cancel rebroadcast of this message *if* there was already one, unless we're a router/repeater!
|
||||
if (Router::cancelSending(p->from, p->id))
|
||||
txRelayCanceled++;
|
||||
}
|
||||
if (config.device.role == meshtastic_Config_DeviceConfig_Role_ROUTER_LATE && iface) {
|
||||
iface->clampToLateRebroadcastWindow(getFrom(p), p->id);
|
||||
}
|
||||
|
||||
/* If the original transmitter is doing retransmissions (hopStart equals hopLimit) for a reliable transmission, e.g., when
|
||||
the ACK got lost, we will handle the packet again to make sure it gets an ACK to its packet. */
|
||||
the ACK got lost, we will handle the packet again to make sure it gets an implicit ACK. */
|
||||
bool isRepeated = p->hop_start > 0 && p->hop_start == p->hop_limit;
|
||||
if (isRepeated) {
|
||||
LOG_DEBUG("Repeated reliable tx");
|
||||
if (!perhapsRebroadcast(p) && isToUs(p) && p->want_ack) {
|
||||
// FIXME - channel index should be used, but the packet is still encrypted here
|
||||
sendAckNak(meshtastic_Routing_Error_NONE, getFrom(p), p->id, 0, 0);
|
||||
}
|
||||
// Check if it's still in the Tx queue, if not, we have to relay it again
|
||||
if (!findInTxQueue(p->from, p->id))
|
||||
perhapsRebroadcast(p);
|
||||
} else {
|
||||
perhapsCancelDupe(p);
|
||||
}
|
||||
|
||||
return true;
|
||||
@ -51,13 +43,27 @@ bool FloodingRouter::shouldFilterReceived(const meshtastic_MeshPacket *p)
|
||||
return Router::shouldFilterReceived(p);
|
||||
}
|
||||
|
||||
void FloodingRouter::perhapsCancelDupe(const meshtastic_MeshPacket *p)
|
||||
{
|
||||
if (config.device.role != meshtastic_Config_DeviceConfig_Role_ROUTER &&
|
||||
config.device.role != meshtastic_Config_DeviceConfig_Role_REPEATER &&
|
||||
config.device.role != meshtastic_Config_DeviceConfig_Role_ROUTER_LATE) {
|
||||
// cancel rebroadcast of this message *if* there was already one, unless we're a router/repeater!
|
||||
if (Router::cancelSending(p->from, p->id))
|
||||
txRelayCanceled++;
|
||||
}
|
||||
if (config.device.role == meshtastic_Config_DeviceConfig_Role_ROUTER_LATE && iface) {
|
||||
iface->clampToLateRebroadcastWindow(getFrom(p), p->id);
|
||||
}
|
||||
}
|
||||
|
||||
bool FloodingRouter::isRebroadcaster()
|
||||
{
|
||||
return config.device.role != meshtastic_Config_DeviceConfig_Role_CLIENT_MUTE &&
|
||||
config.device.rebroadcast_mode != meshtastic_Config_DeviceConfig_RebroadcastMode_NONE;
|
||||
}
|
||||
|
||||
bool FloodingRouter::perhapsRebroadcast(const meshtastic_MeshPacket *p)
|
||||
void FloodingRouter::perhapsRebroadcast(const meshtastic_MeshPacket *p)
|
||||
{
|
||||
if (!isToUs(p) && (p->hop_limit > 0) && !isFromUs(p)) {
|
||||
if (p->id != 0) {
|
||||
@ -72,13 +78,12 @@ bool FloodingRouter::perhapsRebroadcast(const meshtastic_MeshPacket *p)
|
||||
tosend->hop_limit = 2;
|
||||
}
|
||||
#endif
|
||||
tosend->next_hop = NO_NEXT_HOP_PREFERENCE; // this should already be the case, but just in case
|
||||
|
||||
LOG_INFO("Rebroadcast received floodmsg");
|
||||
// Note: we are careful to resend using the original senders node id
|
||||
// We are careful not to call our hooked version of send() - because we don't want to check this again
|
||||
Router::send(tosend);
|
||||
|
||||
return true;
|
||||
} else {
|
||||
LOG_DEBUG("No rebroadcast: Role = CLIENT_MUTE or Rebroadcast Mode = NONE");
|
||||
}
|
||||
@ -86,13 +91,12 @@ bool FloodingRouter::perhapsRebroadcast(const meshtastic_MeshPacket *p)
|
||||
LOG_DEBUG("Ignore 0 id broadcast");
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
void FloodingRouter::sniffReceived(const meshtastic_MeshPacket *p, const meshtastic_Routing *c)
|
||||
{
|
||||
bool isAckorReply = (p->which_payload_variant == meshtastic_MeshPacket_decoded_tag) && (p->decoded.request_id != 0);
|
||||
bool isAckorReply = (p->which_payload_variant == meshtastic_MeshPacket_decoded_tag) &&
|
||||
(p->decoded.request_id != 0 || p->decoded.reply_id != 0);
|
||||
if (isAckorReply && !isToUs(p) && !isBroadcast(p->to)) {
|
||||
// do not flood direct message that is ACKed or replied to
|
||||
LOG_DEBUG("Rxd an ACK/reply not for me, cancel rebroadcast");
|
||||
|
@ -1,6 +1,5 @@
|
||||
#pragma once
|
||||
|
||||
#include "PacketHistory.h"
|
||||
#include "Router.h"
|
||||
|
||||
/**
|
||||
@ -26,14 +25,11 @@
|
||||
Any entries in recentBroadcasts that are older than X seconds (longer than the
|
||||
max time a flood can take) will be discarded.
|
||||
*/
|
||||
class FloodingRouter : public Router, protected PacketHistory
|
||||
class FloodingRouter : public Router
|
||||
{
|
||||
private:
|
||||
bool isRebroadcaster();
|
||||
|
||||
/** Check if we should rebroadcast this packet, and do so if needed
|
||||
* @return true if rebroadcasted */
|
||||
bool perhapsRebroadcast(const meshtastic_MeshPacket *p);
|
||||
/* Check if we should rebroadcast this packet, and do so if needed */
|
||||
void perhapsRebroadcast(const meshtastic_MeshPacket *p);
|
||||
|
||||
public:
|
||||
/**
|
||||
@ -62,4 +58,10 @@ class FloodingRouter : public Router, protected PacketHistory
|
||||
* Look for broadcasts we need to rebroadcast
|
||||
*/
|
||||
virtual void sniffReceived(const meshtastic_MeshPacket *p, const meshtastic_Routing *c) override;
|
||||
|
||||
/* Call when receiving a duplicate packet to check whether we should cancel a packet in the Tx queue */
|
||||
void perhapsCancelDupe(const meshtastic_MeshPacket *p);
|
||||
|
||||
// Return true if we are a rebroadcaster
|
||||
bool isRebroadcaster();
|
||||
};
|
@ -117,6 +117,19 @@ meshtastic_MeshPacket *MeshPacketQueue::remove(NodeNum from, PacketId id, bool t
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* Attempt to find a packet from this queue. Return true if it was found. */
|
||||
bool MeshPacketQueue::find(NodeNum from, PacketId id)
|
||||
{
|
||||
for (auto it = queue.begin(); it != queue.end(); it++) {
|
||||
auto p = (*it);
|
||||
if (getFrom(p) == from && p->id == id) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempt to find a lower-priority packet in the queue and replace it with the provided one.
|
||||
* @return True if the replacement succeeded, false otherwise
|
||||
|
@ -37,4 +37,7 @@ class MeshPacketQueue
|
||||
|
||||
/** Attempt to find and remove a packet from this queue. Returns the packet which was removed from the queue */
|
||||
meshtastic_MeshPacket *remove(NodeNum from, PacketId id, bool tx_normal = true, bool tx_late = true);
|
||||
|
||||
/* Attempt to find a packet from this queue. Return true if it was found. */
|
||||
bool find(NodeNum from, PacketId id);
|
||||
};
|
@ -173,7 +173,9 @@ void MeshService::handleToRadio(meshtastic_MeshPacket &p)
|
||||
return;
|
||||
}
|
||||
#endif
|
||||
p.from = 0; // We don't let phones assign nodenums to their sent messages
|
||||
p.from = 0; // We don't let clients assign nodenums to their sent messages
|
||||
p.next_hop = NO_NEXT_HOP_PREFERENCE; // We don't let clients assign next_hop to their sent messages
|
||||
p.relay_node = NO_RELAY_NODE; // We don't let clients assign relay_node to their sent messages
|
||||
|
||||
if (p.id == 0)
|
||||
p.id = generatePacketId(); // If the phone didn't supply one, then pick one
|
||||
|
@ -40,6 +40,11 @@ enum RxSource {
|
||||
/// We normally just use max 3 hops for sending reliable messages
|
||||
#define HOP_RELIABLE 3
|
||||
|
||||
// For old firmware or when falling back to flooding, there is no next-hop preference
|
||||
#define NO_NEXT_HOP_PREFERENCE 0
|
||||
// For old firmware there is no relay node set
|
||||
#define NO_RELAY_NODE 0
|
||||
|
||||
typedef int ErrorCode;
|
||||
|
||||
/// Alloc and free packets to our global, ISR safe pool
|
||||
|
272
src/mesh/NextHopRouter.cpp
Normal file
272
src/mesh/NextHopRouter.cpp
Normal file
@ -0,0 +1,272 @@
|
||||
#include "NextHopRouter.h"
|
||||
|
||||
NextHopRouter::NextHopRouter() {}
|
||||
|
||||
PendingPacket::PendingPacket(meshtastic_MeshPacket *p, uint8_t numRetransmissions)
|
||||
{
|
||||
packet = p;
|
||||
this->numRetransmissions = numRetransmissions - 1; // We subtract one, because we assume the user just did the first send
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a packet
|
||||
*/
|
||||
ErrorCode NextHopRouter::send(meshtastic_MeshPacket *p)
|
||||
{
|
||||
// Add any messages _we_ send to the seen message list (so we will ignore all retransmissions we see)
|
||||
p->relay_node = nodeDB->getLastByteOfNodeNum(getNodeNum()); // First set the relayer to us
|
||||
wasSeenRecently(p); // FIXME, move this to a sniffSent method
|
||||
|
||||
p->next_hop = getNextHop(p->to, p->relay_node); // set the next hop
|
||||
LOG_DEBUG("Setting next hop for packet with dest %x to %x", p->to, p->next_hop);
|
||||
|
||||
// If it's from us, ReliableRouter already handles retransmissions if want_ack is set. If a next hop is set and hop limit is
|
||||
// not 0 or want_ack is set, start retransmissions
|
||||
if ((!isFromUs(p) || !p->want_ack) && p->next_hop != NO_NEXT_HOP_PREFERENCE && (p->hop_limit > 0 || p->want_ack))
|
||||
startRetransmission(packetPool.allocCopy(*p)); // start retransmission for relayed packet
|
||||
|
||||
return Router::send(p);
|
||||
}
|
||||
|
||||
bool NextHopRouter::shouldFilterReceived(const meshtastic_MeshPacket *p)
|
||||
{
|
||||
bool wasFallback = false;
|
||||
bool weWereNextHop = false;
|
||||
if (wasSeenRecently(p, true, &wasFallback, &weWereNextHop)) { // Note: this will also add a recent packet record
|
||||
printPacket("Ignore dupe incoming msg", p);
|
||||
rxDupe++;
|
||||
stopRetransmission(p->from, p->id);
|
||||
|
||||
// If it was a fallback to flooding, try to relay again
|
||||
if (wasFallback) {
|
||||
LOG_INFO("Fallback to flooding from relay_node=0x%x", p->relay_node);
|
||||
// Check if it's still in the Tx queue, if not, we have to relay it again
|
||||
if (!findInTxQueue(p->from, p->id))
|
||||
perhapsRelay(p);
|
||||
} else {
|
||||
bool isRepeated = p->hop_start > 0 && p->hop_start == p->hop_limit;
|
||||
// If repeated and not in Tx queue anymore, try relaying again, or if we are the destination, send the ACK again
|
||||
if (isRepeated) {
|
||||
if (!findInTxQueue(p->from, p->id) && !perhapsRelay(p) && isToUs(p) && p->want_ack)
|
||||
sendAckNak(meshtastic_Routing_Error_NONE, getFrom(p), p->id, p->channel, 0);
|
||||
} else if (!weWereNextHop) {
|
||||
perhapsCancelDupe(p); // If it's a dupe, cancel relay if we were not explicitly asked to relay
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
return Router::shouldFilterReceived(p);
|
||||
}
|
||||
|
||||
void NextHopRouter::sniffReceived(const meshtastic_MeshPacket *p, const meshtastic_Routing *c)
|
||||
{
|
||||
NodeNum ourNodeNum = getNodeNum();
|
||||
uint8_t ourRelayID = nodeDB->getLastByteOfNodeNum(ourNodeNum);
|
||||
bool isAckorReply = (p->which_payload_variant == meshtastic_MeshPacket_decoded_tag) &&
|
||||
(p->decoded.request_id != 0 || p->decoded.reply_id != 0);
|
||||
if (isAckorReply) {
|
||||
// Update next-hop for the original transmitter of this successful transmission to the relay node, but ONLY if "from" is
|
||||
// not 0 (means implicit ACK) and original packet was also relayed by this node, or we sent it directly to the destination
|
||||
if (p->from != 0) {
|
||||
meshtastic_NodeInfoLite *origTx = nodeDB->getMeshNode(p->from);
|
||||
if (origTx) {
|
||||
// Either relayer of ACK was also a relayer of the packet, or we were the relayer and the ACK came directly from
|
||||
// the destination
|
||||
if (wasRelayer(p->relay_node, p->decoded.request_id, p->to) ||
|
||||
(wasRelayer(ourRelayID, p->decoded.request_id, p->to) && p->hop_start != 0 && p->hop_start == p->hop_limit)) {
|
||||
if (origTx->next_hop != p->relay_node) { // Not already set
|
||||
LOG_INFO("Update next hop of 0x%x to 0x%x based on ACK/reply", p->from, p->relay_node);
|
||||
origTx->next_hop = p->relay_node;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!isToUs(p)) {
|
||||
Router::cancelSending(p->to, p->decoded.request_id); // cancel rebroadcast for this DM
|
||||
// stop retransmission for the original packet
|
||||
stopRetransmission(p->to, p->decoded.request_id); // for original packet, from = to and id = request_id
|
||||
}
|
||||
}
|
||||
|
||||
perhapsRelay(p);
|
||||
|
||||
// handle the packet as normal
|
||||
Router::sniffReceived(p, c);
|
||||
}
|
||||
|
||||
/* Check if we should be relaying this packet if so, do so. */
|
||||
bool NextHopRouter::perhapsRelay(const meshtastic_MeshPacket *p)
|
||||
{
|
||||
if (!isToUs(p) && !isFromUs(p) && p->hop_limit > 0) {
|
||||
if (p->next_hop == NO_NEXT_HOP_PREFERENCE || p->next_hop == nodeDB->getLastByteOfNodeNum(getNodeNum())) {
|
||||
if (isRebroadcaster()) {
|
||||
meshtastic_MeshPacket *tosend = packetPool.allocCopy(*p); // keep a copy because we will be sending it
|
||||
LOG_INFO("Relaying received message coming from %x", p->relay_node);
|
||||
|
||||
tosend->hop_limit--; // bump down the hop count
|
||||
NextHopRouter::send(tosend);
|
||||
|
||||
return true;
|
||||
} else {
|
||||
LOG_DEBUG("Not rebroadcasting: Role = CLIENT_MUTE or Rebroadcast Mode = NONE");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the next hop for a destination, given the relay node
|
||||
* @return the node number of the next hop, 0 if no preference (fallback to FloodingRouter)
|
||||
*/
|
||||
uint8_t NextHopRouter::getNextHop(NodeNum to, uint8_t relay_node)
|
||||
{
|
||||
// When we're a repeater router->sniffReceived will call NextHopRouter directly without checking for broadcast
|
||||
if (isBroadcast(to))
|
||||
return NO_NEXT_HOP_PREFERENCE;
|
||||
|
||||
meshtastic_NodeInfoLite *node = nodeDB->getMeshNode(to);
|
||||
if (node && node->next_hop) {
|
||||
// We are careful not to return the relay node as the next hop
|
||||
if (node->next_hop != relay_node) {
|
||||
// LOG_DEBUG("Next hop for 0x%x is 0x%x", to, node->next_hop);
|
||||
return node->next_hop;
|
||||
} else
|
||||
LOG_WARN("Next hop for 0x%x is 0x%x, same as relayer; set no pref", to, node->next_hop);
|
||||
}
|
||||
return NO_NEXT_HOP_PREFERENCE;
|
||||
}
|
||||
|
||||
PendingPacket *NextHopRouter::findPendingPacket(GlobalPacketId key)
|
||||
{
|
||||
auto old = pending.find(key); // If we have an old record, someone messed up because id got reused
|
||||
if (old != pending.end()) {
|
||||
return &old->second;
|
||||
} else
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop any retransmissions we are doing of the specified node/packet ID pair
|
||||
*/
|
||||
bool NextHopRouter::stopRetransmission(NodeNum from, PacketId id)
|
||||
{
|
||||
auto key = GlobalPacketId(from, id);
|
||||
return stopRetransmission(key);
|
||||
}
|
||||
|
||||
bool NextHopRouter::stopRetransmission(GlobalPacketId key)
|
||||
{
|
||||
auto old = findPendingPacket(key);
|
||||
if (old) {
|
||||
auto p = old->packet;
|
||||
/* Only when we already transmitted a packet via LoRa, we will cancel the packet in the Tx queue
|
||||
to avoid canceling a transmission if it was ACKed super fast via MQTT */
|
||||
if (old->numRetransmissions < NUM_RELIABLE_RETX - 1) {
|
||||
// remove the 'original' (identified by originator and packet->id) from the txqueue and free it
|
||||
cancelSending(getFrom(p), p->id);
|
||||
// now free the pooled copy for retransmission too
|
||||
packetPool.release(p);
|
||||
}
|
||||
auto numErased = pending.erase(key);
|
||||
assert(numErased == 1);
|
||||
return true;
|
||||
} else
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add p to the list of packets to retransmit occasionally. We will free it once we stop retransmitting.
|
||||
*/
|
||||
PendingPacket *NextHopRouter::startRetransmission(meshtastic_MeshPacket *p, uint8_t numReTx)
|
||||
{
|
||||
auto id = GlobalPacketId(p);
|
||||
auto rec = PendingPacket(p, numReTx);
|
||||
|
||||
stopRetransmission(getFrom(p), p->id);
|
||||
|
||||
setNextTx(&rec);
|
||||
pending[id] = rec;
|
||||
|
||||
return &pending[id];
|
||||
}
|
||||
|
||||
/**
|
||||
* Do any retransmissions that are scheduled (FIXME - for the time being called from loop)
|
||||
*/
|
||||
int32_t NextHopRouter::doRetransmissions()
|
||||
{
|
||||
uint32_t now = millis();
|
||||
int32_t d = INT32_MAX;
|
||||
|
||||
// FIXME, we should use a better datastructure rather than walking through this map.
|
||||
// for(auto el: pending) {
|
||||
for (auto it = pending.begin(), nextIt = it; it != pending.end(); it = nextIt) {
|
||||
++nextIt; // we use this odd pattern because we might be deleting it...
|
||||
auto &p = it->second;
|
||||
|
||||
bool stillValid = true; // assume we'll keep this record around
|
||||
|
||||
// FIXME, handle 51 day rolloever here!!!
|
||||
if (p.nextTxMsec <= now) {
|
||||
if (p.numRetransmissions == 0) {
|
||||
if (isFromUs(p.packet)) {
|
||||
LOG_DEBUG("Reliable send failed, returning a nak for fr=0x%x,to=0x%x,id=0x%x", p.packet->from, p.packet->to,
|
||||
p.packet->id);
|
||||
sendAckNak(meshtastic_Routing_Error_MAX_RETRANSMIT, getFrom(p.packet), p.packet->id, p.packet->channel);
|
||||
}
|
||||
// Note: we don't stop retransmission here, instead the Nak packet gets processed in sniffReceived
|
||||
stopRetransmission(it->first);
|
||||
stillValid = false; // just deleted it
|
||||
} else {
|
||||
LOG_DEBUG("Sending retransmission fr=0x%x,to=0x%x,id=0x%x, tries left=%d", p.packet->from, p.packet->to,
|
||||
p.packet->id, p.numRetransmissions);
|
||||
|
||||
if (!isBroadcast(p.packet->to)) {
|
||||
if (p.numRetransmissions == 1) {
|
||||
// Last retransmission, reset next_hop (fallback to FloodingRouter)
|
||||
p.packet->next_hop = NO_NEXT_HOP_PREFERENCE;
|
||||
// Also reset it in the nodeDB
|
||||
meshtastic_NodeInfoLite *sentTo = nodeDB->getMeshNode(p.packet->to);
|
||||
if (sentTo) {
|
||||
LOG_INFO("Resetting next hop for packet with dest 0x%x\n", p.packet->to);
|
||||
sentTo->next_hop = NO_NEXT_HOP_PREFERENCE;
|
||||
}
|
||||
FloodingRouter::send(packetPool.allocCopy(*p.packet));
|
||||
} else {
|
||||
NextHopRouter::send(packetPool.allocCopy(*p.packet));
|
||||
}
|
||||
} else {
|
||||
// Note: we call the superclass version because we don't want to have our version of send() add a new
|
||||
// retransmission record
|
||||
FloodingRouter::send(packetPool.allocCopy(*p.packet));
|
||||
}
|
||||
|
||||
// Queue again
|
||||
--p.numRetransmissions;
|
||||
setNextTx(&p);
|
||||
}
|
||||
}
|
||||
|
||||
if (stillValid) {
|
||||
// Update our desired sleep delay
|
||||
int32_t t = p.nextTxMsec - now;
|
||||
|
||||
d = min(t, d);
|
||||
}
|
||||
}
|
||||
|
||||
return d;
|
||||
}
|
||||
|
||||
void NextHopRouter::setNextTx(PendingPacket *pending)
|
||||
{
|
||||
assert(iface);
|
||||
auto d = iface->getRetransmissionMsec(pending->packet);
|
||||
pending->nextTxMsec = millis() + d;
|
||||
LOG_DEBUG("Setting next retransmission in %u msecs: ", d);
|
||||
printPacket("", pending->packet);
|
||||
setReceivedMessage(); // Run ASAP, so we can figure out our correct sleep time
|
||||
}
|
151
src/mesh/NextHopRouter.h
Normal file
151
src/mesh/NextHopRouter.h
Normal file
@ -0,0 +1,151 @@
|
||||
#pragma once
|
||||
|
||||
#include "FloodingRouter.h"
|
||||
#include <unordered_map>
|
||||
|
||||
/**
|
||||
* An identifier for a globally unique message - a pair of the sending nodenum and the packet id assigned
|
||||
* to that message
|
||||
*/
|
||||
struct GlobalPacketId {
|
||||
NodeNum node;
|
||||
PacketId id;
|
||||
|
||||
bool operator==(const GlobalPacketId &p) const { return node == p.node && id == p.id; }
|
||||
|
||||
explicit GlobalPacketId(const meshtastic_MeshPacket *p)
|
||||
{
|
||||
node = getFrom(p);
|
||||
id = p->id;
|
||||
}
|
||||
|
||||
GlobalPacketId(NodeNum _from, PacketId _id)
|
||||
{
|
||||
node = _from;
|
||||
id = _id;
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* A packet queued for retransmission
|
||||
*/
|
||||
struct PendingPacket {
|
||||
meshtastic_MeshPacket *packet;
|
||||
|
||||
/** The next time we should try to retransmit this packet */
|
||||
uint32_t nextTxMsec = 0;
|
||||
|
||||
/** Starts at NUM_RETRANSMISSIONS -1 and counts down. Once zero it will be removed from the list */
|
||||
uint8_t numRetransmissions = 0;
|
||||
|
||||
PendingPacket() {}
|
||||
explicit PendingPacket(meshtastic_MeshPacket *p, uint8_t numRetransmissions);
|
||||
};
|
||||
|
||||
class GlobalPacketIdHashFunction
|
||||
{
|
||||
public:
|
||||
size_t operator()(const GlobalPacketId &p) const { return (std::hash<NodeNum>()(p.node)) ^ (std::hash<PacketId>()(p.id)); }
|
||||
};
|
||||
|
||||
/*
|
||||
Router for direct messages, which only relays if it is the next hop for a packet. The next hop is set by the current
|
||||
relayer of a packet, which bases this on information from a previous successful delivery to the destination via flooding.
|
||||
Namely, in the PacketHistory, we keep track of (up to 3) relayers of a packet. When the ACK is delivered back to us via a node
|
||||
that also relayed the original packet, we use that node as next hop for the destination from then on. This makes sure that only
|
||||
when there’s a two-way connection, we assign a next hop. Both the ReliableRouter and NextHopRouter will do retransmissions (the
|
||||
NextHopRouter only 1 time). For the final retry, if no one actually relayed the packet, it will reset the next hop in order to
|
||||
fall back to the FloodingRouter again. Note that thus also intermediate hops will do a single retransmission if the intended
|
||||
next-hop didn’t relay, in order to fix changes in the middle of the route.
|
||||
*/
|
||||
class NextHopRouter : public FloodingRouter
|
||||
{
|
||||
public:
|
||||
/**
|
||||
* Constructor
|
||||
*
|
||||
*/
|
||||
NextHopRouter();
|
||||
|
||||
/**
|
||||
* Send a packet
|
||||
* @return an error code
|
||||
*/
|
||||
virtual ErrorCode send(meshtastic_MeshPacket *p) override;
|
||||
|
||||
/** Do our retransmission handling */
|
||||
virtual int32_t runOnce() override
|
||||
{
|
||||
// Note: We must doRetransmissions FIRST, because it might queue up work for the base class runOnce implementation
|
||||
doRetransmissions();
|
||||
|
||||
int32_t r = FloodingRouter::runOnce();
|
||||
|
||||
// Also after calling runOnce there might be new packets to retransmit
|
||||
auto d = doRetransmissions();
|
||||
return min(d, r);
|
||||
}
|
||||
|
||||
// The number of retransmissions intermediate nodes will do (actually 1 less than this)
|
||||
constexpr static uint8_t NUM_INTERMEDIATE_RETX = 2;
|
||||
// The number of retransmissions the original sender will do
|
||||
constexpr static uint8_t NUM_RELIABLE_RETX = 3;
|
||||
|
||||
protected:
|
||||
/**
|
||||
* Pending retransmissions
|
||||
*/
|
||||
std::unordered_map<GlobalPacketId, PendingPacket, GlobalPacketIdHashFunction> pending;
|
||||
|
||||
/**
|
||||
* Should this incoming filter be dropped?
|
||||
*
|
||||
* Called immediately on reception, before any further processing.
|
||||
* @return true to abandon the packet
|
||||
*/
|
||||
virtual bool shouldFilterReceived(const meshtastic_MeshPacket *p) override;
|
||||
|
||||
/**
|
||||
* Look for packets we need to relay
|
||||
*/
|
||||
virtual void sniffReceived(const meshtastic_MeshPacket *p, const meshtastic_Routing *c) override;
|
||||
|
||||
/**
|
||||
* Try to find the pending packet record for this ID (or NULL if not found)
|
||||
*/
|
||||
PendingPacket *findPendingPacket(NodeNum from, PacketId id) { return findPendingPacket(GlobalPacketId(from, id)); }
|
||||
PendingPacket *findPendingPacket(GlobalPacketId p);
|
||||
|
||||
/**
|
||||
* Add p to the list of packets to retransmit occasionally. We will free it once we stop retransmitting.
|
||||
*/
|
||||
PendingPacket *startRetransmission(meshtastic_MeshPacket *p, uint8_t numReTx = NUM_INTERMEDIATE_RETX);
|
||||
|
||||
/**
|
||||
* Stop any retransmissions we are doing of the specified node/packet ID pair
|
||||
*
|
||||
* @return true if we found and removed a transmission with this ID
|
||||
*/
|
||||
bool stopRetransmission(NodeNum from, PacketId id);
|
||||
bool stopRetransmission(GlobalPacketId p);
|
||||
|
||||
/**
|
||||
* Do any retransmissions that are scheduled (FIXME - for the time being called from loop)
|
||||
*
|
||||
* @return the number of msecs until our next retransmission or MAXINT if none scheduled
|
||||
*/
|
||||
int32_t doRetransmissions();
|
||||
|
||||
void setNextTx(PendingPacket *pending);
|
||||
|
||||
private:
|
||||
/**
|
||||
* Get the next hop for a destination, given the relay node
|
||||
* @return the node number of the next hop, 0 if no preference (fallback to FloodingRouter)
|
||||
*/
|
||||
uint8_t getNextHop(NodeNum to, uint8_t relay_node);
|
||||
|
||||
/** Check if we should be relaying this packet if so, do so.
|
||||
* @return true if we did relay */
|
||||
bool perhapsRelay(const meshtastic_MeshPacket *p);
|
||||
};
|
@ -116,6 +116,9 @@ class NodeDB
|
||||
/// @return our node number
|
||||
NodeNum getNodeNum() { return myNodeInfo.my_node_num; }
|
||||
|
||||
// @return last byte of a NodeNum, 0xFF if it ended at 0x00
|
||||
uint8_t getLastByteOfNodeNum(NodeNum num) { return (uint8_t)((num & 0xFF) ? (num & 0xFF) : 0xFF); }
|
||||
|
||||
/// if returns false, that means our node should send a DenyNodeNum response. If true, we think the number is okay for use
|
||||
// bool handleWantNodeNum(NodeNum n);
|
||||
|
||||
|
@ -16,7 +16,7 @@ PacketHistory::PacketHistory()
|
||||
/**
|
||||
* Update recentBroadcasts and return true if we have already seen this packet
|
||||
*/
|
||||
bool PacketHistory::wasSeenRecently(const meshtastic_MeshPacket *p, bool withUpdate)
|
||||
bool PacketHistory::wasSeenRecently(const meshtastic_MeshPacket *p, bool withUpdate, bool *wasFallback, bool *weWereNextHop)
|
||||
{
|
||||
if (p->id == 0) {
|
||||
LOG_DEBUG("Ignore message with zero id");
|
||||
@ -27,6 +27,9 @@ bool PacketHistory::wasSeenRecently(const meshtastic_MeshPacket *p, bool withUpd
|
||||
r.id = p->id;
|
||||
r.sender = getFrom(p);
|
||||
r.rxTimeMsec = millis();
|
||||
r.next_hop = p->next_hop;
|
||||
r.relayed_by[0] = p->relay_node;
|
||||
// LOG_INFO("Add relayed_by 0x%x for id=0x%x", p->relay_node, r.id);
|
||||
|
||||
auto found = recentPackets.find(r);
|
||||
bool seenRecently = (found != recentPackets.end()); // found not equal to .end() means packet was seen recently
|
||||
@ -40,14 +43,36 @@ bool PacketHistory::wasSeenRecently(const meshtastic_MeshPacket *p, bool withUpd
|
||||
|
||||
if (seenRecently) {
|
||||
LOG_DEBUG("Found existing packet record for fr=0x%x,to=0x%x,id=0x%x", p->from, p->to, p->id);
|
||||
uint8_t ourRelayID = nodeDB->getLastByteOfNodeNum(nodeDB->getNodeNum());
|
||||
if (wasFallback) {
|
||||
// If it was seen with a next-hop not set to us and now it's NO_NEXT_HOP_PREFERENCE, and the relayer relayed already
|
||||
// before, it's a fallback to flooding. If we didn't already relay and the next-hop neither, we might need to handle
|
||||
// it now.
|
||||
if (found->sender != nodeDB->getNodeNum() && found->next_hop != NO_NEXT_HOP_PREFERENCE &&
|
||||
found->next_hop != ourRelayID && p->next_hop == NO_NEXT_HOP_PREFERENCE && wasRelayer(p->relay_node, found) &&
|
||||
!wasRelayer(ourRelayID, found) && !wasRelayer(found->next_hop, found)) {
|
||||
*wasFallback = true;
|
||||
}
|
||||
}
|
||||
|
||||
// Check if we were the next hop for this packet
|
||||
if (weWereNextHop) {
|
||||
*weWereNextHop = found->next_hop == ourRelayID;
|
||||
}
|
||||
}
|
||||
|
||||
if (withUpdate) {
|
||||
if (found != recentPackets.end()) { // delete existing to updated timestamp (re-insert)
|
||||
recentPackets.erase(found); // as unsorted_set::iterator is const (can't update timestamp - so re-insert..)
|
||||
if (found != recentPackets.end()) { // delete existing to updated timestamp and relayed_by (re-insert)
|
||||
// Add the existing relayed_by to the new record
|
||||
for (uint8_t i = 0; i < NUM_RELAYERS - 1; i++) {
|
||||
if (found->relayed_by[i])
|
||||
r.relayed_by[i + 1] = found->relayed_by[i];
|
||||
}
|
||||
r.next_hop = found->next_hop; // keep the original next_hop (such that we check whether we were originally asked)
|
||||
recentPackets.erase(found); // as unsorted_set::iterator is const (can't update - so re-insert..)
|
||||
}
|
||||
recentPackets.insert(r);
|
||||
printPacket("Add packet record", p);
|
||||
LOG_DEBUG("Add packet record fr=0x%x, id=0x%x", p->from, p->id);
|
||||
}
|
||||
|
||||
// Capacity is reerved, so only purge expired packets if recentPackets fills past 90% capacity
|
||||
@ -75,4 +100,59 @@ void PacketHistory::clearExpiredRecentPackets()
|
||||
}
|
||||
|
||||
LOG_DEBUG("recentPackets size=%ld (after clearing expired packets)", recentPackets.size());
|
||||
}
|
||||
|
||||
/* Check if a certain node was a relayer of a packet in the history given an ID and sender
|
||||
* @return true if node was indeed a relayer, false if not */
|
||||
bool PacketHistory::wasRelayer(const uint8_t relayer, const uint32_t id, const NodeNum sender)
|
||||
{
|
||||
if (relayer == 0)
|
||||
return false;
|
||||
|
||||
PacketRecord r = {.sender = sender, .id = id, .rxTimeMsec = 0, .next_hop = 0};
|
||||
auto found = recentPackets.find(r);
|
||||
|
||||
if (found == recentPackets.end()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return wasRelayer(relayer, found);
|
||||
}
|
||||
|
||||
/* Check if a certain node was a relayer of a packet in the history given iterator
|
||||
* @return true if node was indeed a relayer, false if not */
|
||||
bool PacketHistory::wasRelayer(const uint8_t relayer, std::unordered_set<PacketRecord, PacketRecordHashFunction>::iterator r)
|
||||
{
|
||||
for (uint8_t i = 0; i < NUM_RELAYERS; i++) {
|
||||
if (r->relayed_by[i] == relayer) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
// Remove a relayer from the list of relayers of a packet in the history given an ID and sender
|
||||
void PacketHistory::removeRelayer(const uint8_t relayer, const uint32_t id, const NodeNum sender)
|
||||
{
|
||||
PacketRecord r = {.sender = sender, .id = id, .rxTimeMsec = 0, .next_hop = 0};
|
||||
auto found = recentPackets.find(r);
|
||||
|
||||
if (found == recentPackets.end()) {
|
||||
return;
|
||||
}
|
||||
// Make a copy of the found record
|
||||
r.next_hop = found->next_hop;
|
||||
r.rxTimeMsec = found->rxTimeMsec;
|
||||
|
||||
// Only add the relayers that are not the one we want to remove
|
||||
uint8_t j = 0;
|
||||
for (uint8_t i = 0; i < NUM_RELAYERS; i++) {
|
||||
if (found->relayed_by[i] != relayer) {
|
||||
r.relayed_by[j] = found->relayed_by[i];
|
||||
j++;
|
||||
}
|
||||
}
|
||||
|
||||
recentPackets.erase(found);
|
||||
recentPackets.insert(r);
|
||||
}
|
@ -1,6 +1,6 @@
|
||||
#pragma once
|
||||
|
||||
#include "Router.h"
|
||||
#include "NodeDB.h"
|
||||
#include <unordered_set>
|
||||
|
||||
/// We clear our old flood record 10 minutes after we see the last of it
|
||||
@ -10,13 +10,18 @@
|
||||
#define FLOOD_EXPIRE_TIME (10 * 60 * 1000L)
|
||||
#endif
|
||||
|
||||
#define NUM_RELAYERS \
|
||||
3 // Number of relayer we keep track of. Use 3 to be efficient with memory alignment of PacketRecord to 16 bytes
|
||||
|
||||
/**
|
||||
* A record of a recent message broadcast
|
||||
*/
|
||||
struct PacketRecord {
|
||||
NodeNum sender;
|
||||
PacketId id;
|
||||
uint32_t rxTimeMsec; // Unix time in msecs - the time we received it
|
||||
uint32_t rxTimeMsec; // Unix time in msecs - the time we received it
|
||||
uint8_t next_hop; // The next hop asked for this packet
|
||||
uint8_t relayed_by[NUM_RELAYERS]; // Array of nodes that relayed this packet
|
||||
|
||||
bool operator==(const PacketRecord &p) const { return sender == p.sender && id == p.id; }
|
||||
};
|
||||
@ -44,6 +49,20 @@ class PacketHistory
|
||||
* Update recentBroadcasts and return true if we have already seen this packet
|
||||
*
|
||||
* @param withUpdate if true and not found we add an entry to recentPackets
|
||||
* @param wasFallback if not nullptr, packet will be checked for fallback to flooding and value will be set to true if so
|
||||
* @param weWereNextHop if not nullptr, packet will be checked for us being the next hop and value will be set to true if so
|
||||
*/
|
||||
bool wasSeenRecently(const meshtastic_MeshPacket *p, bool withUpdate = true);
|
||||
};
|
||||
bool wasSeenRecently(const meshtastic_MeshPacket *p, bool withUpdate = true, bool *wasFallback = nullptr,
|
||||
bool *weWereNextHop = nullptr);
|
||||
|
||||
/* Check if a certain node was a relayer of a packet in the history given an ID and sender
|
||||
* @return true if node was indeed a relayer, false if not */
|
||||
bool wasRelayer(const uint8_t relayer, const uint32_t id, const NodeNum sender);
|
||||
|
||||
/* Check if a certain node was a relayer of a packet in the history given iterator
|
||||
* @return true if node was indeed a relayer, false if not */
|
||||
bool wasRelayer(const uint8_t relayer, std::unordered_set<PacketRecord, PacketRecordHashFunction>::iterator r);
|
||||
|
||||
// Remove a relayer from the list of relayers of a packet in the history given an ID and sender
|
||||
void removeRelayer(const uint8_t relayer, const uint32_t id, const NodeNum sender);
|
||||
};
|
@ -12,6 +12,7 @@
|
||||
#include "PhoneAPI.h"
|
||||
#include "PowerFSM.h"
|
||||
#include "RadioInterface.h"
|
||||
#include "Router.h"
|
||||
#include "SPILock.h"
|
||||
#include "TypeConversions.h"
|
||||
#include "main.h"
|
||||
|
@ -340,6 +340,10 @@ void printPacket(const char *prefix, const meshtastic_MeshPacket *p)
|
||||
out += DEBUG_PORT.mt_sprintf(" via MQTT");
|
||||
if (p->hop_start != 0)
|
||||
out += DEBUG_PORT.mt_sprintf(" hopStart=%d", p->hop_start);
|
||||
if (p->next_hop != 0)
|
||||
out += DEBUG_PORT.mt_sprintf(" nextHop=0x%x", p->next_hop);
|
||||
if (p->relay_node != 0)
|
||||
out += DEBUG_PORT.mt_sprintf(" relay=0x%x", p->relay_node);
|
||||
if (p->priority != 0)
|
||||
out += DEBUG_PORT.mt_sprintf(" priority=%d", p->priority);
|
||||
|
||||
@ -639,8 +643,8 @@ size_t RadioInterface::beginSending(meshtastic_MeshPacket *p)
|
||||
radioBuffer.header.to = p->to;
|
||||
radioBuffer.header.id = p->id;
|
||||
radioBuffer.header.channel = p->channel;
|
||||
radioBuffer.header.next_hop = 0; // *** For future use ***
|
||||
radioBuffer.header.relay_node = 0; // *** For future use ***
|
||||
radioBuffer.header.next_hop = p->next_hop;
|
||||
radioBuffer.header.relay_node = p->relay_node;
|
||||
if (p->hop_limit > HOP_MAX) {
|
||||
LOG_WARN("hop limit %d is too high, setting to %d", p->hop_limit, HOP_RELIABLE);
|
||||
p->hop_limit = HOP_RELIABLE;
|
||||
|
@ -38,10 +38,10 @@ typedef struct {
|
||||
/** The channel hash - used as a hint for the decoder to limit which channels we consider */
|
||||
uint8_t channel;
|
||||
|
||||
// ***For future use*** Last byte of the NodeNum of the next-hop for this packet
|
||||
// Last byte of the NodeNum of the next-hop for this packet
|
||||
uint8_t next_hop;
|
||||
|
||||
// ***For future use*** Last byte of the NodeNum of the node that will relay/relayed this packet
|
||||
// Last byte of the NodeNum of the node that will relay/relayed this packet
|
||||
uint8_t relay_node;
|
||||
} PacketHeader;
|
||||
|
||||
@ -153,6 +153,9 @@ class RadioInterface
|
||||
/** Attempt to cancel a previously sent packet. Returns true if a packet was found we could cancel */
|
||||
virtual bool cancelSending(NodeNum from, PacketId id) { return false; }
|
||||
|
||||
/** Attempt to find a packet in the TxQueue. Returns true if the packet was found. */
|
||||
virtual bool findInTxQueue(NodeNum from, PacketId id) { return false; }
|
||||
|
||||
// methods from radiohead
|
||||
|
||||
/// Initialise the Driver transport hardware and software.
|
||||
|
@ -222,6 +222,12 @@ bool RadioLibInterface::cancelSending(NodeNum from, PacketId id)
|
||||
return result;
|
||||
}
|
||||
|
||||
/** Attempt to find a packet in the TxQueue. Returns true if the packet was found. */
|
||||
bool RadioLibInterface::findInTxQueue(NodeNum from, PacketId id)
|
||||
{
|
||||
return txQueue.find(from, id);
|
||||
}
|
||||
|
||||
/** radio helper thread callback.
|
||||
We never immediately transmit after any operation (either Rx or Tx). Instead we should wait a random multiple of
|
||||
'slotTimes' (see definition in RadioInterface.h) taken from a contention window (CW) to lower the chance of collision.
|
||||
@ -445,6 +451,9 @@ void RadioLibInterface::handleReceiveInterrupt()
|
||||
mp->hop_start = (radioBuffer.header.flags & PACKET_FLAGS_HOP_START_MASK) >> PACKET_FLAGS_HOP_START_SHIFT;
|
||||
mp->want_ack = !!(radioBuffer.header.flags & PACKET_FLAGS_WANT_ACK_MASK);
|
||||
mp->via_mqtt = !!(radioBuffer.header.flags & PACKET_FLAGS_VIA_MQTT_MASK);
|
||||
// If hop_start is not set, next_hop and relay_node are invalid (firmware <2.3)
|
||||
mp->next_hop = mp->hop_start == 0 ? NO_NEXT_HOP_PREFERENCE : radioBuffer.header.next_hop;
|
||||
mp->relay_node = mp->hop_start == 0 ? NO_RELAY_NODE : radioBuffer.header.relay_node;
|
||||
|
||||
addReceiveMetadata(mp);
|
||||
|
||||
|
@ -135,6 +135,9 @@ class RadioLibInterface : public RadioInterface, protected concurrency::Notified
|
||||
/** Attempt to cancel a previously sent packet. Returns true if a packet was found we could cancel */
|
||||
virtual bool cancelSending(NodeNum from, PacketId id) override;
|
||||
|
||||
/** Attempt to find a packet in the TxQueue. Returns true if the packet was found. */
|
||||
virtual bool findInTxQueue(NodeNum from, PacketId id) override;
|
||||
|
||||
private:
|
||||
/** if we have something waiting to send, start a short (random) timer so we can come check for collision before actually
|
||||
* doing the transmit */
|
||||
|
@ -23,7 +23,7 @@ ErrorCode ReliableRouter::send(meshtastic_MeshPacket *p)
|
||||
}
|
||||
|
||||
auto copy = packetPool.allocCopy(*p);
|
||||
startRetransmission(copy);
|
||||
startRetransmission(copy, NUM_RELIABLE_RETX);
|
||||
}
|
||||
|
||||
/* If we have pending retransmissions, add the airtime of this packet to it, because during that time we cannot receive an
|
||||
@ -35,7 +35,7 @@ ErrorCode ReliableRouter::send(meshtastic_MeshPacket *p)
|
||||
}
|
||||
}
|
||||
|
||||
return FloodingRouter::send(p);
|
||||
return isBroadcast(p->to) ? FloodingRouter::send(p) : NextHopRouter::send(p);
|
||||
}
|
||||
|
||||
bool ReliableRouter::shouldFilterReceived(const meshtastic_MeshPacket *p)
|
||||
@ -73,7 +73,7 @@ bool ReliableRouter::shouldFilterReceived(const meshtastic_MeshPacket *p)
|
||||
i->second.nextTxMsec += iface->getPacketTime(p);
|
||||
}
|
||||
|
||||
return FloodingRouter::shouldFilterReceived(p);
|
||||
return isBroadcast(p->to) ? FloodingRouter::shouldFilterReceived(p) : NextHopRouter::shouldFilterReceived(p);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -138,126 +138,5 @@ void ReliableRouter::sniffReceived(const meshtastic_MeshPacket *p, const meshtas
|
||||
}
|
||||
|
||||
// handle the packet as normal
|
||||
FloodingRouter::sniffReceived(p, c);
|
||||
}
|
||||
|
||||
#define NUM_RETRANSMISSIONS 3
|
||||
|
||||
PendingPacket::PendingPacket(meshtastic_MeshPacket *p)
|
||||
{
|
||||
packet = p;
|
||||
numRetransmissions = NUM_RETRANSMISSIONS - 1; // We subtract one, because we assume the user just did the first send
|
||||
}
|
||||
|
||||
PendingPacket *ReliableRouter::findPendingPacket(GlobalPacketId key)
|
||||
{
|
||||
auto old = pending.find(key); // If we have an old record, someone messed up because id got reused
|
||||
if (old != pending.end()) {
|
||||
return &old->second;
|
||||
} else
|
||||
return NULL;
|
||||
}
|
||||
/**
|
||||
* Stop any retransmissions we are doing of the specified node/packet ID pair
|
||||
*/
|
||||
bool ReliableRouter::stopRetransmission(NodeNum from, PacketId id)
|
||||
{
|
||||
auto key = GlobalPacketId(from, id);
|
||||
return stopRetransmission(key);
|
||||
}
|
||||
|
||||
bool ReliableRouter::stopRetransmission(GlobalPacketId key)
|
||||
{
|
||||
auto old = findPendingPacket(key);
|
||||
if (old) {
|
||||
auto p = old->packet;
|
||||
/* Only when we already transmitted a packet via LoRa, we will cancel the packet in the Tx queue
|
||||
to avoid canceling a transmission if it was ACKed super fast via MQTT */
|
||||
if (old->numRetransmissions < NUM_RETRANSMISSIONS - 1) {
|
||||
// remove the 'original' (identified by originator and packet->id) from the txqueue and free it
|
||||
cancelSending(getFrom(p), p->id);
|
||||
}
|
||||
// now free the pooled copy for retransmission too
|
||||
packetPool.release(p);
|
||||
auto numErased = pending.erase(key);
|
||||
assert(numErased == 1);
|
||||
return true;
|
||||
} else
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add p to the list of packets to retransmit occasionally. We will free it once we stop retransmitting.
|
||||
*/
|
||||
PendingPacket *ReliableRouter::startRetransmission(meshtastic_MeshPacket *p)
|
||||
{
|
||||
auto id = GlobalPacketId(p);
|
||||
auto rec = PendingPacket(p);
|
||||
|
||||
stopRetransmission(getFrom(p), p->id);
|
||||
|
||||
setNextTx(&rec);
|
||||
pending[id] = rec;
|
||||
|
||||
return &pending[id];
|
||||
}
|
||||
|
||||
/**
|
||||
* Do any retransmissions that are scheduled (FIXME - for the time being called from loop)
|
||||
*/
|
||||
int32_t ReliableRouter::doRetransmissions()
|
||||
{
|
||||
uint32_t now = millis();
|
||||
int32_t d = INT32_MAX;
|
||||
|
||||
// FIXME, we should use a better datastructure rather than walking through this map.
|
||||
// for(auto el: pending) {
|
||||
for (auto it = pending.begin(), nextIt = it; it != pending.end(); it = nextIt) {
|
||||
++nextIt; // we use this odd pattern because we might be deleting it...
|
||||
auto &p = it->second;
|
||||
|
||||
bool stillValid = true; // assume we'll keep this record around
|
||||
|
||||
// FIXME, handle 51 day rollover here!!!
|
||||
if (p.nextTxMsec <= now) {
|
||||
if (p.numRetransmissions == 0) {
|
||||
LOG_DEBUG("Reliable send failed, return a nak for fr=0x%x,to=0x%x,id=0x%x", p.packet->from, p.packet->to,
|
||||
p.packet->id);
|
||||
sendAckNak(meshtastic_Routing_Error_MAX_RETRANSMIT, getFrom(p.packet), p.packet->id, p.packet->channel);
|
||||
// Note: we don't stop retransmission here, instead the Nak packet gets processed in sniffReceived
|
||||
stopRetransmission(it->first);
|
||||
stillValid = false; // just deleted it
|
||||
} else {
|
||||
LOG_DEBUG("Send reliable retransmission fr=0x%x,to=0x%x,id=0x%x, tries left=%d", p.packet->from, p.packet->to,
|
||||
p.packet->id, p.numRetransmissions);
|
||||
|
||||
// Note: we call the superclass version because we don't want to have our version of send() add a new
|
||||
// retransmission record
|
||||
FloodingRouter::send(packetPool.allocCopy(*p.packet));
|
||||
|
||||
// Queue again
|
||||
--p.numRetransmissions;
|
||||
setNextTx(&p);
|
||||
}
|
||||
}
|
||||
|
||||
if (stillValid) {
|
||||
// Update our desired sleep delay
|
||||
int32_t t = p.nextTxMsec - now;
|
||||
|
||||
d = min(t, d);
|
||||
}
|
||||
}
|
||||
|
||||
return d;
|
||||
}
|
||||
|
||||
void ReliableRouter::setNextTx(PendingPacket *pending)
|
||||
{
|
||||
assert(iface);
|
||||
auto d = iface->getRetransmissionMsec(pending->packet);
|
||||
pending->nextTxMsec = millis() + d;
|
||||
LOG_DEBUG("Set next retransmission in %u msecs: ", d);
|
||||
printPacket("", pending->packet);
|
||||
setReceivedMessage(); // Run ASAP, so we can figure out our correct sleep time
|
||||
isBroadcast(p->to) ? FloodingRouter::sniffReceived(p, c) : NextHopRouter::sniffReceived(p, c);
|
||||
}
|
@ -1,61 +1,12 @@
|
||||
#pragma once
|
||||
|
||||
#include "FloodingRouter.h"
|
||||
#include <unordered_map>
|
||||
|
||||
/**
|
||||
* An identifier for a globally unique message - a pair of the sending nodenum and the packet id assigned
|
||||
* to that message
|
||||
*/
|
||||
struct GlobalPacketId {
|
||||
NodeNum node;
|
||||
PacketId id;
|
||||
|
||||
bool operator==(const GlobalPacketId &p) const { return node == p.node && id == p.id; }
|
||||
|
||||
explicit GlobalPacketId(const meshtastic_MeshPacket *p)
|
||||
{
|
||||
node = getFrom(p);
|
||||
id = p->id;
|
||||
}
|
||||
|
||||
GlobalPacketId(NodeNum _from, PacketId _id)
|
||||
{
|
||||
node = _from;
|
||||
id = _id;
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* A packet queued for retransmission
|
||||
*/
|
||||
struct PendingPacket {
|
||||
meshtastic_MeshPacket *packet;
|
||||
|
||||
/** The next time we should try to retransmit this packet */
|
||||
uint32_t nextTxMsec = 0;
|
||||
|
||||
/** Starts at NUM_RETRANSMISSIONS -1(normally 3) and counts down. Once zero it will be removed from the list */
|
||||
uint8_t numRetransmissions = 0;
|
||||
|
||||
PendingPacket() {}
|
||||
explicit PendingPacket(meshtastic_MeshPacket *p);
|
||||
};
|
||||
|
||||
class GlobalPacketIdHashFunction
|
||||
{
|
||||
public:
|
||||
size_t operator()(const GlobalPacketId &p) const { return (std::hash<NodeNum>()(p.node)) ^ (std::hash<PacketId>()(p.id)); }
|
||||
};
|
||||
#include "NextHopRouter.h"
|
||||
|
||||
/**
|
||||
* This is a mixin that extends Router with the ability to do (one hop only) reliable message sends.
|
||||
*/
|
||||
class ReliableRouter : public FloodingRouter
|
||||
class ReliableRouter : public NextHopRouter
|
||||
{
|
||||
private:
|
||||
std::unordered_map<GlobalPacketId, PendingPacket, GlobalPacketIdHashFunction> pending;
|
||||
|
||||
public:
|
||||
/**
|
||||
* Constructor
|
||||
@ -70,54 +21,14 @@ class ReliableRouter : public FloodingRouter
|
||||
*/
|
||||
virtual ErrorCode send(meshtastic_MeshPacket *p) override;
|
||||
|
||||
/** Do our retransmission handling */
|
||||
virtual int32_t runOnce() override
|
||||
{
|
||||
// Note: We must doRetransmissions FIRST, because it might queue up work for the base class runOnce implementation
|
||||
auto d = doRetransmissions();
|
||||
|
||||
int32_t r = FloodingRouter::runOnce();
|
||||
|
||||
return min(d, r);
|
||||
}
|
||||
|
||||
protected:
|
||||
/**
|
||||
* Look for acks/naks or someone retransmitting us
|
||||
*/
|
||||
virtual void sniffReceived(const meshtastic_MeshPacket *p, const meshtastic_Routing *c) override;
|
||||
|
||||
/**
|
||||
* Try to find the pending packet record for this ID (or NULL if not found)
|
||||
*/
|
||||
PendingPacket *findPendingPacket(NodeNum from, PacketId id) { return findPendingPacket(GlobalPacketId(from, id)); }
|
||||
PendingPacket *findPendingPacket(GlobalPacketId p);
|
||||
|
||||
/**
|
||||
* We hook this method so we can see packets before FloodingRouter says they should be discarded
|
||||
*/
|
||||
virtual bool shouldFilterReceived(const meshtastic_MeshPacket *p) override;
|
||||
|
||||
/**
|
||||
* Add p to the list of packets to retransmit occasionally. We will free it once we stop retransmitting.
|
||||
*/
|
||||
PendingPacket *startRetransmission(meshtastic_MeshPacket *p);
|
||||
|
||||
private:
|
||||
/**
|
||||
* Stop any retransmissions we are doing of the specified node/packet ID pair
|
||||
*
|
||||
* @return true if we found and removed a transmission with this ID
|
||||
*/
|
||||
bool stopRetransmission(NodeNum from, PacketId id);
|
||||
bool stopRetransmission(GlobalPacketId p);
|
||||
|
||||
/**
|
||||
* Do any retransmissions that are scheduled (FIXME - for the time being called from loop)
|
||||
*
|
||||
* @return the number of msecs until our next retransmission or MAXINT if none scheduled
|
||||
*/
|
||||
int32_t doRetransmissions();
|
||||
|
||||
void setNextTx(PendingPacket *pending);
|
||||
};
|
||||
};
|
@ -249,6 +249,7 @@ ErrorCode Router::send(meshtastic_MeshPacket *p)
|
||||
// the lora we need to make sure we have replaced it with our local address
|
||||
p->from = getFrom(p);
|
||||
|
||||
p->relay_node = nodeDB->getLastByteOfNodeNum(getNodeNum()); // set the relayer to us
|
||||
// If we are the original transmitter, set the hop limit with which we start
|
||||
if (isFromUs(p))
|
||||
p->hop_start = p->hop_limit;
|
||||
@ -295,7 +296,18 @@ ErrorCode Router::send(meshtastic_MeshPacket *p)
|
||||
/** Attempt to cancel a previously sent packet. Returns true if a packet was found we could cancel */
|
||||
bool Router::cancelSending(NodeNum from, PacketId id)
|
||||
{
|
||||
return iface ? iface->cancelSending(from, id) : false;
|
||||
if (iface && iface->cancelSending(from, id)) {
|
||||
// We are not a relayer of this packet anymore
|
||||
removeRelayer(nodeDB->getLastByteOfNodeNum(nodeDB->getNodeNum()), id, from);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/** Attempt to find a packet in the TxQueue. Returns true if the packet was found. */
|
||||
bool Router::findInTxQueue(NodeNum from, PacketId id)
|
||||
{
|
||||
return iface->findInTxQueue(from, id);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -4,6 +4,7 @@
|
||||
#include "MemoryPool.h"
|
||||
#include "MeshTypes.h"
|
||||
#include "Observer.h"
|
||||
#include "PacketHistory.h"
|
||||
#include "PointerQueue.h"
|
||||
#include "RadioInterface.h"
|
||||
#include "concurrency/OSThread.h"
|
||||
@ -11,7 +12,7 @@
|
||||
/**
|
||||
* A mesh aware router that supports multiple interfaces.
|
||||
*/
|
||||
class Router : protected concurrency::OSThread
|
||||
class Router : protected concurrency::OSThread, protected PacketHistory
|
||||
{
|
||||
private:
|
||||
/// Packets which have just arrived from the radio, ready to be processed by this service and possibly
|
||||
@ -50,6 +51,9 @@ class Router : protected concurrency::OSThread
|
||||
/** Attempt to cancel a previously sent packet. Returns true if a packet was found we could cancel */
|
||||
bool cancelSending(NodeNum from, PacketId id);
|
||||
|
||||
/** Attempt to find a packet in the TxQueue. Returns true if the packet was found. */
|
||||
bool findInTxQueue(NodeNum from, PacketId id);
|
||||
|
||||
/** Allocate and return a meshpacket which defaults as send to broadcast from the current node.
|
||||
* The returned packet is guaranteed to have a unique packet ID already assigned
|
||||
*/
|
||||
|
@ -1775,4 +1775,4 @@ extern const pb_msgdesc_t meshtastic_ChunkedPayloadResponse_msg;
|
||||
} /* extern "C" */
|
||||
#endif
|
||||
|
||||
#endif
|
||||
#endif
|
@ -109,7 +109,7 @@ void TraceRouteModule::appendMyIDandSNR(meshtastic_RouteDiscovery *updated, floa
|
||||
void TraceRouteModule::printRoute(meshtastic_RouteDiscovery *r, uint32_t origin, uint32_t dest, bool isTowardsDestination)
|
||||
{
|
||||
#ifdef DEBUG_PORT
|
||||
std::string route = "Route traced:";
|
||||
std::string route = "Route traced:\n";
|
||||
route += vformat("0x%x --> ", origin);
|
||||
for (uint8_t i = 0; i < r->route_count; i++) {
|
||||
if (i < r->snr_towards_count && r->snr_towards[i] != INT8_MIN)
|
||||
@ -129,6 +129,7 @@ void TraceRouteModule::printRoute(meshtastic_RouteDiscovery *r, uint32_t origin,
|
||||
|
||||
// If there's a route back (or we are the destination as then the route is complete), print it
|
||||
if (r->route_back_count > 0 || origin == nodeDB->getNodeNum()) {
|
||||
route += "\n";
|
||||
if (r->snr_towards_count > 0 && origin == nodeDB->getNodeNum())
|
||||
route += vformat("(%.2fdB) 0x%x <-- ", (float)r->snr_back[r->snr_back_count - 1] / 4, origin);
|
||||
else
|
||||
|
@ -139,6 +139,12 @@ bool SimRadio::cancelSending(NodeNum from, PacketId id)
|
||||
return result;
|
||||
}
|
||||
|
||||
/** Attempt to find a packet in the TxQueue. Returns true if the packet was found. */
|
||||
bool SimRadio::findInTxQueue(NodeNum from, PacketId id)
|
||||
{
|
||||
return txQueue.find(from, id);
|
||||
}
|
||||
|
||||
void SimRadio::onNotify(uint32_t notification)
|
||||
{
|
||||
switch (notification) {
|
||||
|
@ -33,6 +33,9 @@ class SimRadio : public RadioInterface, protected concurrency::NotifiedWorkerThr
|
||||
/** Attempt to cancel a previously sent packet. Returns true if a packet was found we could cancel */
|
||||
virtual bool cancelSending(NodeNum from, PacketId id) override;
|
||||
|
||||
/** Attempt to find a packet in the TxQueue. Returns true if the packet was found. */
|
||||
virtual bool findInTxQueue(NodeNum from, PacketId id) override;
|
||||
|
||||
/**
|
||||
* Start waiting to receive a message
|
||||
*
|
||||
|
Loading…
Reference in New Issue
Block a user