for #681, add MeshPacketQueue (a priority queue) to ensure acks get sent quickly

This commit is contained in:
Kevin Hester 2021-02-11 19:00:17 +08:00
parent 917090856f
commit 2ff5046dcd
12 changed files with 158 additions and 17 deletions

2
proto

@ -1 +1 @@
Subproject commit a76ceb1509b2ec3d844af0378e998e7d4737492c Subproject commit 0221e83d689f7930ed3e5c474eff4fbb8697efbb

View File

@ -0,0 +1,79 @@
#include "MeshPacketQueue.h"
/// @return the priority of the specified packet
inline uint32_t getPriority(MeshPacket *p)
{
auto pri = p->priority;
return pri;
}
/// @return "true" if "p1" is ordered before "p2"
bool CompareMeshPacket::operator()(MeshPacket *p1, MeshPacket *p2)
{
assert(p1 && p2);
auto p1p = getPriority(p1), p2p = getPriority(p2);
// If priorities differ, use that
// for equal priorities, order by id (older packets have higher priority - this will briefly be wrong when IDs roll over but
// no big deal)
return (p1p != p2p) ? (p1p < p2p) // prefer bigger priorities
: (p1->id >= p2->id); // prefer smaller packet ids
}
MeshPacketQueue::MeshPacketQueue(size_t _maxLen) : maxLen(_maxLen)
{
}
/** enqueue a packet, return false if full */
bool MeshPacketQueue::enqueue(MeshPacket *p)
{
// We might receive acks from other nodes (and since generated remotely, they won't have priority assigned. Check for that
// and fix it
if (p->priority == MeshPacket_Priority_UNSET)
p->priority = p->decoded.which_ackVariant ? MeshPacket_Priority_ACK : MeshPacket_Priority_DEFAULT;
if (size() >= maxLen)
return false;
else {
push(p);
return true;
}
}
MeshPacket *MeshPacketQueue::dequeue()
{
if (empty())
return NULL;
else {
auto p = top();
pop(); // remove the first item
return p;
}
}
// this is kinda yucky, but I'm not sure if all arduino c++ compilers support closuers. And we only have one
// thread that can run at a time - so safe
static NodeNum findFrom;
static PacketId findId;
static bool isMyPacket(MeshPacket *p)
{
return p->id == findId && p->from == findFrom;
}
/** Attempt to find and remove a packet from this queue. Returns true the packet which was removed from the queue */
MeshPacket *MeshPacketQueue::remove(NodeNum from, PacketId id)
{
findFrom = from;
findId = id;
auto it = std::find_if(this->c.begin(), this->c.end(), isMyPacket);
if (it != this->c.end()) {
auto p = *it;
this->c.erase(it);
std::make_heap(this->c.begin(), this->c.end(), this->comp);
return p;
} else {
return NULL;
}
}

View File

@ -0,0 +1,33 @@
#pragma once
#include "MeshTypes.h"
#include <assert.h>
#include <queue>
// this is an strucure which implements the
// operator overloading
struct CompareMeshPacket {
bool operator()(MeshPacket *p1, MeshPacket *p2);
};
/**
* A priority queue of packets.
*
*/
class MeshPacketQueue : public std::priority_queue<MeshPacket *, std::vector<MeshPacket *>, CompareMeshPacket>
{
size_t maxLen;
public:
MeshPacketQueue(size_t _maxLen);
/** enqueue a packet, return false if full */
bool enqueue(MeshPacket *p);
// bool isEmpty();
MeshPacket *dequeue();
/** Attempt to find and remove a packet from this queue. Returns true the packet which was removed from the queue */
MeshPacket *remove(NodeNum from, PacketId id);
};

View File

@ -161,6 +161,9 @@ void printPacket(const char *prefix, const MeshPacket *p)
if (p->rx_snr != 0.0) { if (p->rx_snr != 0.0) {
DEBUG_MSG(" rxSNR=%g", p->rx_snr); DEBUG_MSG(" rxSNR=%g", p->rx_snr);
} }
if(p->priority != 0)
DEBUG_MSG(" priority=%d", p->priority);
DEBUG_MSG(")\n"); DEBUG_MSG(")\n");
} }

View File

@ -100,7 +100,7 @@ ErrorCode RadioLibInterface::send(MeshPacket *p)
uint32_t xmitMsec = getPacketTime(p); uint32_t xmitMsec = getPacketTime(p);
DEBUG_MSG("txGood=%d,rxGood=%d,rxBad=%d\n", txGood, rxGood, rxBad); DEBUG_MSG("txGood=%d,rxGood=%d,rxBad=%d\n", txGood, rxGood, rxBad);
ErrorCode res = txQueue.enqueue(p, 0) ? ERRNO_OK : ERRNO_UNKNOWN; ErrorCode res = txQueue.enqueue(p) ? ERRNO_OK : ERRNO_UNKNOWN;
if (res != ERRNO_OK) { // we weren't able to queue it, so we must drop it to prevent leaks if (res != ERRNO_OK) { // we weren't able to queue it, so we must drop it to prevent leaks
packetPool.release(p); packetPool.release(p);
@ -125,7 +125,7 @@ ErrorCode RadioLibInterface::send(MeshPacket *p)
bool RadioLibInterface::canSleep() bool RadioLibInterface::canSleep()
{ {
bool res = txQueue.isEmpty(); bool res = txQueue.empty();
if (!res) // only print debug messages if we are vetoing sleep if (!res) // only print debug messages if we are vetoing sleep
DEBUG_MSG("radio wait to sleep, txEmpty=%d\n", res); DEBUG_MSG("radio wait to sleep, txEmpty=%d\n", res);
@ -134,8 +134,13 @@ bool RadioLibInterface::canSleep()
/** Attempt to cancel a previously sent packet. Returns true if a packet was found we could cancel */ /** Attempt to cancel a previously sent packet. Returns true if a packet was found we could cancel */
bool RadioLibInterface::cancelSending(NodeNum from, PacketId id) { bool RadioLibInterface::cancelSending(NodeNum from, PacketId id) {
assert(0); auto p = txQueue.remove(from, id);
return false; if(p)
packetPool.release(p); // free the packet we just removed
bool result = (p != NULL);
DEBUG_MSG("cancelSending id=0x%x, removed=%d", id, result);
return result;
} }
@ -172,12 +177,12 @@ void RadioLibInterface::onNotify(uint32_t notification)
// If we are not currently in receive mode, then restart the timer and try again later (this can happen if the main thread // If we are not currently in receive mode, then restart the timer and try again later (this can happen if the main thread
// has placed the unit into standby) FIXME, how will this work if the chipset is in sleep mode? // has placed the unit into standby) FIXME, how will this work if the chipset is in sleep mode?
if (!txQueue.isEmpty()) { if (!txQueue.empty()) {
if (!canSendImmediately()) { if (!canSendImmediately()) {
startTransmitTimer(); // try again in a little while startTransmitTimer(); // try again in a little while
} else { } else {
// Send any outgoing packets we have ready // Send any outgoing packets we have ready
MeshPacket *txp = txQueue.dequeuePtr(0); MeshPacket *txp = txQueue.dequeue();
assert(txp); assert(txp);
startSend(txp); startSend(txp);
} }
@ -193,7 +198,7 @@ void RadioLibInterface::onNotify(uint32_t notification)
void RadioLibInterface::startTransmitTimer(bool withDelay) void RadioLibInterface::startTransmitTimer(bool withDelay)
{ {
// If we have work to do and the timer wasn't already scheduled, schedule it now // If we have work to do and the timer wasn't already scheduled, schedule it now
if (!txQueue.isEmpty()) { if (!txQueue.empty()) {
uint32_t delay = !withDelay ? 1 : getTxDelayMsec(); uint32_t delay = !withDelay ? 1 : getTxDelayMsec();
// DEBUG_MSG("xmit timer %d\n", delay); // DEBUG_MSG("xmit timer %d\n", delay);
notifyLater(delay, TRANSMIT_DELAY_COMPLETED, false); // This will implicitly enable notifyLater(delay, TRANSMIT_DELAY_COMPLETED, false); // This will implicitly enable

View File

@ -2,6 +2,7 @@
#include "../concurrency/OSThread.h" #include "../concurrency/OSThread.h"
#include "RadioInterface.h" #include "RadioInterface.h"
#include "MeshPacketQueue.h"
#ifdef CubeCell_BoardPlus #ifdef CubeCell_BoardPlus
#define RADIOLIB_SOFTWARE_SERIAL_UNSUPPORTED #define RADIOLIB_SOFTWARE_SERIAL_UNSUPPORTED
@ -74,7 +75,7 @@ class RadioLibInterface : public RadioInterface, protected concurrency::Notified
*/ */
uint32_t rxBad = 0, rxGood = 0, txGood = 0; uint32_t rxBad = 0, rxGood = 0, txGood = 0;
PointerQueue<MeshPacket> txQueue = PointerQueue<MeshPacket>(MAX_TX_QUEUE); MeshPacketQueue txQueue = MeshPacketQueue(MAX_TX_QUEUE);
protected: protected:
@ -138,7 +139,7 @@ class RadioLibInterface : public RadioInterface, protected concurrency::Notified
/** Attempt to cancel a previously sent packet. Returns true if a packet was found we could cancel */ /** Attempt to cancel a previously sent packet. Returns true if a packet was found we could cancel */
virtual bool cancelSending(NodeNum from, PacketId id); virtual bool cancelSending(NodeNum from, PacketId id);
private: private:
/** if we have something waiting to send, start a short random timer so we can come check for collision before actually doing /** if we have something waiting to send, start a short random timer so we can come check for collision before actually doing
* the transmit * the transmit

View File

@ -120,6 +120,7 @@ void Router::sendAckNak(ErrorReason err, NodeNum to, PacketId idFrom)
p->decoded.which_payloadVariant = SubPacket_error_reason_tag; p->decoded.which_payloadVariant = SubPacket_error_reason_tag;
p->decoded.error_reason = err; p->decoded.error_reason = err;
} }
p->priority = MeshPacket_Priority_ACK;
sendLocal(p); // we sometimes send directly to the local node sendLocal(p); // we sometimes send directly to the local node
} }

View File

@ -80,7 +80,7 @@ extern const pb_msgdesc_t DeviceState_msg;
#define DeviceState_fields &DeviceState_msg #define DeviceState_fields &DeviceState_msg
/* Maximum encoded size of messages (where known) */ /* Maximum encoded size of messages (where known) */
#define DeviceState_size 6262 #define DeviceState_size 6266
#ifdef __cplusplus #ifdef __cplusplus
} /* extern "C" */ } /* extern "C" */

View File

@ -58,3 +58,4 @@ PB_BIND(ToRadio, ToRadio, 2)

View File

@ -83,6 +83,15 @@ typedef enum _CriticalErrorCode {
CriticalErrorCode_TransmitFailed = 8 CriticalErrorCode_TransmitFailed = 8
} CriticalErrorCode; } CriticalErrorCode;
typedef enum _MeshPacket_Priority {
MeshPacket_Priority_UNSET = 0,
MeshPacket_Priority_MIN = 1,
MeshPacket_Priority_BACKGROUND = 10,
MeshPacket_Priority_DEFAULT = 64,
MeshPacket_Priority_ACK = 120,
MeshPacket_Priority_MAX = 127
} MeshPacket_Priority;
typedef enum _ChannelSettings_ModemConfig { typedef enum _ChannelSettings_ModemConfig {
ChannelSettings_ModemConfig_Bw125Cr45Sf128 = 0, ChannelSettings_ModemConfig_Bw125Cr45Sf128 = 0,
ChannelSettings_ModemConfig_Bw500Cr45Sf128 = 1, ChannelSettings_ModemConfig_Bw500Cr45Sf128 = 1,
@ -265,6 +274,7 @@ typedef struct _MeshPacket {
uint32_t rx_time; uint32_t rx_time;
uint32_t hop_limit; uint32_t hop_limit;
bool want_ack; bool want_ack;
MeshPacket_Priority priority;
} MeshPacket; } MeshPacket;
typedef struct _FromRadio { typedef struct _FromRadio {
@ -323,6 +333,10 @@ typedef struct _ToRadio {
#define _CriticalErrorCode_MAX CriticalErrorCode_TransmitFailed #define _CriticalErrorCode_MAX CriticalErrorCode_TransmitFailed
#define _CriticalErrorCode_ARRAYSIZE ((CriticalErrorCode)(CriticalErrorCode_TransmitFailed+1)) #define _CriticalErrorCode_ARRAYSIZE ((CriticalErrorCode)(CriticalErrorCode_TransmitFailed+1))
#define _MeshPacket_Priority_MIN MeshPacket_Priority_UNSET
#define _MeshPacket_Priority_MAX MeshPacket_Priority_MAX
#define _MeshPacket_Priority_ARRAYSIZE ((MeshPacket_Priority)(MeshPacket_Priority_MAX+1))
#define _ChannelSettings_ModemConfig_MIN ChannelSettings_ModemConfig_Bw125Cr45Sf128 #define _ChannelSettings_ModemConfig_MIN ChannelSettings_ModemConfig_Bw125Cr45Sf128
#define _ChannelSettings_ModemConfig_MAX ChannelSettings_ModemConfig_Bw125Cr48Sf4096 #define _ChannelSettings_ModemConfig_MAX ChannelSettings_ModemConfig_Bw125Cr48Sf4096
#define _ChannelSettings_ModemConfig_ARRAYSIZE ((ChannelSettings_ModemConfig)(ChannelSettings_ModemConfig_Bw125Cr48Sf4096+1)) #define _ChannelSettings_ModemConfig_ARRAYSIZE ((ChannelSettings_ModemConfig)(ChannelSettings_ModemConfig_Bw125Cr48Sf4096+1))
@ -342,7 +356,7 @@ extern "C" {
#define User_init_default {"", "", "", {0}} #define User_init_default {"", "", "", {0}}
#define RouteDiscovery_init_default {0, {0, 0, 0, 0, 0, 0, 0, 0}} #define RouteDiscovery_init_default {0, {0, 0, 0, 0, 0, 0, 0, 0}}
#define SubPacket_init_default {0, {Position_init_default}, 0, 0, 0, 0, {0}, 0} #define SubPacket_init_default {0, {Position_init_default}, 0, 0, 0, 0, {0}, 0}
#define MeshPacket_init_default {0, 0, 0, {SubPacket_init_default}, 0, 0, 0, 0, 0, 0} #define MeshPacket_init_default {0, 0, 0, {SubPacket_init_default}, 0, 0, 0, 0, 0, 0, _MeshPacket_Priority_MIN}
#define ChannelSettings_init_default {0, _ChannelSettings_ModemConfig_MIN, {0, {0}}, "", 0, 0, 0, 0, 0, 0, 0} #define ChannelSettings_init_default {0, _ChannelSettings_ModemConfig_MIN, {0, {0}}, "", 0, 0, 0, 0, 0, 0, 0}
#define RadioConfig_init_default {false, RadioConfig_UserPreferences_init_default, false, ChannelSettings_init_default} #define RadioConfig_init_default {false, RadioConfig_UserPreferences_init_default, false, ChannelSettings_init_default}
#define RadioConfig_UserPreferences_init_default {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, "", "", 0, _RegionCode_MIN, _ChargeCurrent_MIN, _LocationSharing_MIN, _GpsOperation_MIN, 0, 0, 0, 0, 0, 0, 0, 0, {0, 0, 0}, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} #define RadioConfig_UserPreferences_init_default {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, "", "", 0, _RegionCode_MIN, _ChargeCurrent_MIN, _LocationSharing_MIN, _GpsOperation_MIN, 0, 0, 0, 0, 0, 0, 0, 0, {0, 0, 0}, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
@ -356,7 +370,7 @@ extern "C" {
#define User_init_zero {"", "", "", {0}} #define User_init_zero {"", "", "", {0}}
#define RouteDiscovery_init_zero {0, {0, 0, 0, 0, 0, 0, 0, 0}} #define RouteDiscovery_init_zero {0, {0, 0, 0, 0, 0, 0, 0, 0}}
#define SubPacket_init_zero {0, {Position_init_zero}, 0, 0, 0, 0, {0}, 0} #define SubPacket_init_zero {0, {Position_init_zero}, 0, 0, 0, 0, {0}, 0}
#define MeshPacket_init_zero {0, 0, 0, {SubPacket_init_zero}, 0, 0, 0, 0, 0, 0} #define MeshPacket_init_zero {0, 0, 0, {SubPacket_init_zero}, 0, 0, 0, 0, 0, 0, _MeshPacket_Priority_MIN}
#define ChannelSettings_init_zero {0, _ChannelSettings_ModemConfig_MIN, {0, {0}}, "", 0, 0, 0, 0, 0, 0, 0} #define ChannelSettings_init_zero {0, _ChannelSettings_ModemConfig_MIN, {0, {0}}, "", 0, 0, 0, 0, 0, 0, 0}
#define RadioConfig_init_zero {false, RadioConfig_UserPreferences_init_zero, false, ChannelSettings_init_zero} #define RadioConfig_init_zero {false, RadioConfig_UserPreferences_init_zero, false, ChannelSettings_init_zero}
#define RadioConfig_UserPreferences_init_zero {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, "", "", 0, _RegionCode_MIN, _ChargeCurrent_MIN, _LocationSharing_MIN, _GpsOperation_MIN, 0, 0, 0, 0, 0, 0, 0, 0, {0, 0, 0}, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} #define RadioConfig_UserPreferences_init_zero {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, "", "", 0, _RegionCode_MIN, _ChargeCurrent_MIN, _LocationSharing_MIN, _GpsOperation_MIN, 0, 0, 0, 0, 0, 0, 0, 0, {0, 0, 0}, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
@ -479,6 +493,7 @@ extern "C" {
#define MeshPacket_rx_time_tag 9 #define MeshPacket_rx_time_tag 9
#define MeshPacket_hop_limit_tag 10 #define MeshPacket_hop_limit_tag 10
#define MeshPacket_want_ack_tag 11 #define MeshPacket_want_ack_tag 11
#define MeshPacket_priority_tag 12
#define FromRadio_num_tag 1 #define FromRadio_num_tag 1
#define FromRadio_packet_tag 2 #define FromRadio_packet_tag 2
#define FromRadio_my_info_tag 3 #define FromRadio_my_info_tag 3
@ -554,7 +569,8 @@ X(a, STATIC, SINGULAR, UINT32, id, 6) \
X(a, STATIC, SINGULAR, FLOAT, rx_snr, 7) \ X(a, STATIC, SINGULAR, FLOAT, rx_snr, 7) \
X(a, STATIC, SINGULAR, FIXED32, rx_time, 9) \ X(a, STATIC, SINGULAR, FIXED32, rx_time, 9) \
X(a, STATIC, SINGULAR, UINT32, hop_limit, 10) \ X(a, STATIC, SINGULAR, UINT32, hop_limit, 10) \
X(a, STATIC, SINGULAR, BOOL, want_ack, 11) X(a, STATIC, SINGULAR, BOOL, want_ack, 11) \
X(a, STATIC, SINGULAR, UENUM, priority, 12)
#define MeshPacket_CALLBACK NULL #define MeshPacket_CALLBACK NULL
#define MeshPacket_DEFAULT NULL #define MeshPacket_DEFAULT NULL
#define MeshPacket_payloadVariant_decoded_MSGTYPE SubPacket #define MeshPacket_payloadVariant_decoded_MSGTYPE SubPacket
@ -734,7 +750,7 @@ extern const pb_msgdesc_t ToRadio_msg;
#define User_size 72 #define User_size 72
#define RouteDiscovery_size 88 #define RouteDiscovery_size 88
#define SubPacket_size 275 #define SubPacket_size 275
#define MeshPacket_size 320 #define MeshPacket_size 322
#define ChannelSettings_size 95 #define ChannelSettings_size 95
#define RadioConfig_size 405 #define RadioConfig_size 405
#define RadioConfig_UserPreferences_size 305 #define RadioConfig_UserPreferences_size 305

View File

@ -35,8 +35,9 @@ void NodeInfoPlugin::sendOurNodeInfo(NodeNum dest, bool wantReplies)
MeshPacket *p = allocReply(); MeshPacket *p = allocReply();
p->to = dest; p->to = dest;
p->decoded.want_response = wantReplies; p->decoded.want_response = wantReplies;
p->priority = MeshPacket_Priority_BACKGROUND;
prevPacketId = p->id; prevPacketId = p->id;
service.sendToMesh(p); service.sendToMesh(p);
} }

View File

@ -40,10 +40,11 @@ void PositionPlugin::sendOurPosition(NodeNum dest, bool wantReplies)
// cancel any not yet sent (now stale) position packets // cancel any not yet sent (now stale) position packets
if(prevPacketId) // if we wrap around to zero, we'll simply fail to cancel in that rare case (no big deal) if(prevPacketId) // if we wrap around to zero, we'll simply fail to cancel in that rare case (no big deal)
service.cancelSending(prevPacketId); service.cancelSending(prevPacketId);
MeshPacket *p = allocReply(); MeshPacket *p = allocReply();
p->to = dest; p->to = dest;
p->decoded.want_response = wantReplies; p->decoded.want_response = wantReplies;
p->priority = MeshPacket_Priority_BACKGROUND;
prevPacketId = p->id; prevPacketId = p->id;
service.sendToMesh(p); service.sendToMesh(p);