StoreForward updates (#3194)

* StoreForward updates
- Send history in "text" variant
- Don't send history the client already got
- Check if PSRAM is full
- More sensible defaults

* Set `TEXT_BROADCAST` or `TEXT_DIRECT` RequestResponse tag

* feat: E-Ink "Dynamic Partial" (#3193)

Use a mixture of full refresh, partial refresh, and skipped updates, balancing urgency and display health.

Co-authored-by: Ben Meadors <benmmeadors@gmail.com>

* [create-pull-request] automated change (#3209)

Co-authored-by: thebentern <thebentern@users.noreply.github.com>

* Reset `last_index` if history was cleared, e.g. by reboot

---------

Co-authored-by: Ben Meadors <benmmeadors@gmail.com>
Co-authored-by: todd-herbert <herbert.todd@gmail.com>
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: thebentern <thebentern@users.noreply.github.com>
Co-authored-by: Garth Vander Houwen <garthvh@yahoo.com>
This commit is contained in:
GUVWAF 2024-02-14 14:07:20 +01:00 committed by GitHub
parent d2a74a5329
commit d9bd9bdfb0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 61 additions and 53 deletions

View File

@ -38,16 +38,11 @@ int32_t StoreForwardModule::runOnce()
// Only send packets if the channel is less than 25% utilized. // Only send packets if the channel is less than 25% utilized.
if (airTime->isTxAllowedChannelUtil(true)) { if (airTime->isTxAllowedChannelUtil(true)) {
storeForwardModule->sendPayload(this->busyTo, this->packetHistoryTXQueue_index); storeForwardModule->sendPayload(this->busyTo, this->packetHistoryTXQueue_index);
if (this->packetHistoryTXQueue_index == packetHistoryTXQueue_size) { if (this->packetHistoryTXQueue_index < packetHistoryTXQueue_size - 1) {
// Tell the client we're done sending this->packetHistoryTXQueue_index++;
meshtastic_StoreAndForward sf = meshtastic_StoreAndForward_init_zero; } else {
sf.rr = meshtastic_StoreAndForward_RequestResponse_ROUTER_PING;
storeForwardModule->sendMessage(this->busyTo, sf);
LOG_INFO("*** S&F - Done. (ROUTER_PING)\n");
this->packetHistoryTXQueue_index = 0; this->packetHistoryTXQueue_index = 0;
this->busy = false; this->busy = false;
} else {
this->packetHistoryTXQueue_index++;
} }
} }
} else if ((millis() - lastHeartbeat > (heartbeatInterval * 1000)) && airTime->isTxAllowedChannelUtil(true)) { } else if ((millis() - lastHeartbeat > (heartbeatInterval * 1000)) && airTime->isTxAllowedChannelUtil(true)) {
@ -56,7 +51,7 @@ int32_t StoreForwardModule::runOnce()
meshtastic_StoreAndForward sf = meshtastic_StoreAndForward_init_zero; meshtastic_StoreAndForward sf = meshtastic_StoreAndForward_init_zero;
sf.rr = meshtastic_StoreAndForward_RequestResponse_ROUTER_HEARTBEAT; sf.rr = meshtastic_StoreAndForward_RequestResponse_ROUTER_HEARTBEAT;
sf.which_variant = meshtastic_StoreAndForward_heartbeat_tag; sf.which_variant = meshtastic_StoreAndForward_heartbeat_tag;
sf.variant.heartbeat.period = 300; sf.variant.heartbeat.period = heartbeatInterval;
sf.variant.heartbeat.secondary = 0; // TODO we always have one primary router for now sf.variant.heartbeat.secondary = 0; // TODO we always have one primary router for now
storeForwardModule->sendMessage(NODENUM_BROADCAST, sf); storeForwardModule->sendMessage(NODENUM_BROADCAST, sf);
} }
@ -101,10 +96,11 @@ void StoreForwardModule::populatePSRAM()
* *
* @param msAgo The number of milliseconds ago from which to start sending messages. * @param msAgo The number of milliseconds ago from which to start sending messages.
* @param to The recipient ID to send the messages to. * @param to The recipient ID to send the messages to.
* @param last_request_index The index in the packet history of the last request from this node.
*/ */
void StoreForwardModule::historySend(uint32_t msAgo, uint32_t to) void StoreForwardModule::historySend(uint32_t msAgo, uint32_t to, uint32_t last_request_index)
{ {
uint32_t queueSize = storeForwardModule->historyQueueCreate(msAgo, to); uint32_t queueSize = storeForwardModule->historyQueueCreate(msAgo, to, &last_request_index);
if (queueSize) { if (queueSize) {
LOG_INFO("*** S&F - Sending %u message(s)\n", queueSize); LOG_INFO("*** S&F - Sending %u message(s)\n", queueSize);
@ -118,6 +114,7 @@ void StoreForwardModule::historySend(uint32_t msAgo, uint32_t to)
sf.which_variant = meshtastic_StoreAndForward_history_tag; sf.which_variant = meshtastic_StoreAndForward_history_tag;
sf.variant.history.history_messages = queueSize; sf.variant.history.history_messages = queueSize;
sf.variant.history.window = msAgo; sf.variant.history.window = msAgo;
sf.variant.history.last_request = last_request_index;
storeForwardModule->sendMessage(to, sf); storeForwardModule->sendMessage(to, sf);
} }
@ -125,15 +122,18 @@ void StoreForwardModule::historySend(uint32_t msAgo, uint32_t to)
* Creates a new history queue with messages that were received within the specified time frame. * Creates a new history queue with messages that were received within the specified time frame.
* *
* @param msAgo The number of milliseconds ago to start the history queue. * @param msAgo The number of milliseconds ago to start the history queue.
* @param to The maximum number of messages to include in the history queue. * @param to The NodeNum of the recipient.
* @param last_request_index The index in the packet history of the last request from this node.
* @return The ID of the newly created history queue. * @return The ID of the newly created history queue.
*/ */
uint32_t StoreForwardModule::historyQueueCreate(uint32_t msAgo, uint32_t to) uint32_t StoreForwardModule::historyQueueCreate(uint32_t msAgo, uint32_t to, uint32_t *last_request_index)
{ {
this->packetHistoryTXQueue_size = 0; this->packetHistoryTXQueue_size = 0;
// If our history was cleared, ignore what the client is telling us
uint32_t last_index = *last_request_index >= this->packetHistoryCurrent ? 0 : *last_request_index;
for (int i = 0; i < this->packetHistoryCurrent; i++) { for (int i = last_index; i < this->packetHistoryCurrent; i++) {
/* /*
LOG_DEBUG("SF historyQueueCreate\n"); LOG_DEBUG("SF historyQueueCreate\n");
LOG_DEBUG("SF historyQueueCreate - time %d\n", this->packetHistory[i].time); LOG_DEBUG("SF historyQueueCreate - time %d\n", this->packetHistory[i].time);
@ -141,16 +141,11 @@ uint32_t StoreForwardModule::historyQueueCreate(uint32_t msAgo, uint32_t to)
LOG_DEBUG("SF historyQueueCreate - math %d\n", (millis() - msAgo)); LOG_DEBUG("SF historyQueueCreate - math %d\n", (millis() - msAgo));
*/ */
if (this->packetHistory[i].time && (this->packetHistory[i].time < (millis() - msAgo))) { if (this->packetHistory[i].time && (this->packetHistory[i].time < (millis() - msAgo))) {
LOG_DEBUG("*** SF historyQueueCreate - Time matches - ok\n"); /* Copy the messages that were received by the router in the last msAgo
/*
Copy the messages that were received by the router in the last msAgo
to the packetHistoryTXQueue structure. to the packetHistoryTXQueue structure.
Client not interested in packets from itself and only in broadcast packets or packets towards it. */
TODO: The condition (this->packetHistory[i].to & NODENUM_BROADCAST) == to) is not tested since if (this->packetHistory[i].from != to &&
I don't have an easy way to target a specific user. Will need to do this soon. (this->packetHistory[i].to == NODENUM_BROADCAST || this->packetHistory[i].to == to)) {
*/
if ((this->packetHistory[i].to & NODENUM_BROADCAST) == NODENUM_BROADCAST ||
((this->packetHistory[i].to & NODENUM_BROADCAST) == to)) {
this->packetHistoryTXQueue[this->packetHistoryTXQueue_size].time = this->packetHistory[i].time; this->packetHistoryTXQueue[this->packetHistoryTXQueue_size].time = this->packetHistory[i].time;
this->packetHistoryTXQueue[this->packetHistoryTXQueue_size].to = this->packetHistory[i].to; this->packetHistoryTXQueue[this->packetHistoryTXQueue_size].to = this->packetHistory[i].to;
this->packetHistoryTXQueue[this->packetHistoryTXQueue_size].from = this->packetHistory[i].from; this->packetHistoryTXQueue[this->packetHistoryTXQueue_size].from = this->packetHistory[i].from;
@ -159,9 +154,10 @@ uint32_t StoreForwardModule::historyQueueCreate(uint32_t msAgo, uint32_t to)
memcpy(this->packetHistoryTXQueue[this->packetHistoryTXQueue_size].payload, this->packetHistory[i].payload, memcpy(this->packetHistoryTXQueue[this->packetHistoryTXQueue_size].payload, this->packetHistory[i].payload,
meshtastic_Constants_DATA_PAYLOAD_LEN); meshtastic_Constants_DATA_PAYLOAD_LEN);
this->packetHistoryTXQueue_size++; this->packetHistoryTXQueue_size++;
*last_request_index = i + 1; // Set to one higher such that we don't send the same message again
LOG_DEBUG("*** PacketHistoryStruct time=%d\n", this->packetHistory[i].time); LOG_DEBUG("*** PacketHistoryStruct time=%d, msg=%s\n", this->packetHistory[i].time,
LOG_DEBUG("*** PacketHistoryStruct msg=%s\n", this->packetHistory[i].payload); this->packetHistory[i].payload);
} }
} }
} }
@ -177,6 +173,7 @@ void StoreForwardModule::historyAdd(const meshtastic_MeshPacket &mp)
{ {
const auto &p = mp.decoded; const auto &p = mp.decoded;
if (this->packetHistoryCurrent < this->records) {
this->packetHistory[this->packetHistoryCurrent].time = millis(); this->packetHistory[this->packetHistoryCurrent].time = millis();
this->packetHistory[this->packetHistoryCurrent].to = mp.to; this->packetHistory[this->packetHistoryCurrent].to = mp.to;
this->packetHistory[this->packetHistoryCurrent].channel = mp.channel; this->packetHistory[this->packetHistoryCurrent].channel = mp.channel;
@ -186,6 +183,10 @@ void StoreForwardModule::historyAdd(const meshtastic_MeshPacket &mp)
this->packetHistoryCurrent++; this->packetHistoryCurrent++;
this->packetHistoryMax++; this->packetHistoryMax++;
} else {
// TODO: Overwrite the oldest message in the history buffer when it is full.
LOG_WARN("*** S&F - PSRAM Full. Packet is not added to the history.\n");
}
} }
meshtastic_MeshPacket *StoreForwardModule::allocReply() meshtastic_MeshPacket *StoreForwardModule::allocReply()
@ -213,10 +214,19 @@ void StoreForwardModule::sendPayload(NodeNum dest, uint32_t packetHistory_index)
// TODO: Make this configurable. // TODO: Make this configurable.
p->want_ack = false; p->want_ack = false;
p->decoded.payload.size = meshtastic_StoreAndForward sf = meshtastic_StoreAndForward_init_zero;
this->packetHistoryTXQueue[packetHistory_index].payload_size; // You must specify how many bytes are in the reply sf.which_variant = meshtastic_StoreAndForward_text_tag;
memcpy(p->decoded.payload.bytes, this->packetHistoryTXQueue[packetHistory_index].payload, sf.variant.text.size = this->packetHistoryTXQueue[packetHistory_index].payload_size;
memcpy(sf.variant.text.bytes, this->packetHistoryTXQueue[packetHistory_index].payload,
this->packetHistoryTXQueue[packetHistory_index].payload_size); this->packetHistoryTXQueue[packetHistory_index].payload_size);
if (this->packetHistoryTXQueue[packetHistory_index].to == NODENUM_BROADCAST) {
sf.rr = meshtastic_StoreAndForward_RequestResponse_ROUTER_TEXT_BROADCAST;
} else {
sf.rr = meshtastic_StoreAndForward_RequestResponse_ROUTER_TEXT_DIRECT;
}
p->decoded.payload.size =
pb_encode_to_bytes(p->decoded.payload.bytes, sizeof(p->decoded.payload.bytes), &meshtastic_StoreAndForward_msg, &sf);
service.sendToMesh(p); service.sendToMesh(p);
} }
@ -387,7 +397,9 @@ bool StoreForwardModule::handleReceivedProtobuf(const meshtastic_MeshPacket &mp,
LOG_INFO("*** S&F - Busy. Try again shortly.\n"); LOG_INFO("*** S&F - Busy. Try again shortly.\n");
} else { } else {
if ((p->which_variant == meshtastic_StoreAndForward_history_tag) && (p->variant.history.window > 0)) { if ((p->which_variant == meshtastic_StoreAndForward_history_tag) && (p->variant.history.window > 0)) {
storeForwardModule->historySend(p->variant.history.window * 60000, getFrom(&mp)); // window is in minutes // window is in minutes
storeForwardModule->historySend(p->variant.history.window * 60000, getFrom(&mp),
p->variant.history.last_request);
} else { } else {
storeForwardModule->historySend(historyReturnWindow * 60000, getFrom(&mp)); // defaults to 4 hours storeForwardModule->historySend(historyReturnWindow * 60000, getFrom(&mp)); // defaults to 4 hours
} }
@ -406,8 +418,7 @@ bool StoreForwardModule::handleReceivedProtobuf(const meshtastic_MeshPacket &mp,
case meshtastic_StoreAndForward_RequestResponse_CLIENT_PONG: case meshtastic_StoreAndForward_RequestResponse_CLIENT_PONG:
if (is_server) { if (is_server) {
LOG_INFO("*** StoreAndForward_RequestResponse_CLIENT_PONG\n"); LOG_INFO("*** StoreAndForward_RequestResponse_CLIENT_PONG\n");
// The Client is alive, update NodeDB // NodeDB is already updated
nodeDB.updateFrom(mp);
} }
break; break;
@ -546,9 +557,7 @@ StoreForwardModule::StoreForwardModule()
} }
// Client // Client
} } else {
if ((config.device.role == meshtastic_Config_DeviceConfig_Role_CLIENT) ||
(config.device.role == meshtastic_Config_DeviceConfig_Role_ROUTER_CLIENT)) {
is_client = true; is_client = true;
LOG_INFO("*** Initializing Store & Forward Module in Client mode\n"); LOG_INFO("*** Initializing Store & Forward Module in Client mode\n");
} }

View File

@ -13,7 +13,6 @@ struct PacketHistoryStruct {
uint32_t to; uint32_t to;
uint32_t from; uint32_t from;
uint8_t channel; uint8_t channel;
bool ack;
uint8_t payload[meshtastic_Constants_DATA_PAYLOAD_LEN]; uint8_t payload[meshtastic_Constants_DATA_PAYLOAD_LEN];
pb_size_t payload_size; pb_size_t payload_size;
}; };
@ -32,7 +31,7 @@ class StoreForwardModule : private concurrency::OSThread, public ProtobufModule<
uint32_t packetHistoryTXQueue_size = 0; uint32_t packetHistoryTXQueue_size = 0;
uint32_t packetHistoryTXQueue_index = 0; uint32_t packetHistoryTXQueue_index = 0;
uint32_t packetTimeMax = 5000; uint32_t packetTimeMax = 5000; // Interval between sending history packets as a server.
bool is_client = false; bool is_client = false;
bool is_server = false; bool is_server = false;
@ -41,7 +40,7 @@ class StoreForwardModule : private concurrency::OSThread, public ProtobufModule<
StoreForwardModule(); StoreForwardModule();
unsigned long lastHeartbeat = 0; unsigned long lastHeartbeat = 0;
uint32_t heartbeatInterval = 300; uint32_t heartbeatInterval = default_broadcast_interval_secs;
/** /**
Update our local reference of when we last saw that node. Update our local reference of when we last saw that node.
@ -49,9 +48,9 @@ class StoreForwardModule : private concurrency::OSThread, public ProtobufModule<
*/ */
void historyAdd(const meshtastic_MeshPacket &mp); void historyAdd(const meshtastic_MeshPacket &mp);
void statsSend(uint32_t to); void statsSend(uint32_t to);
void historySend(uint32_t msAgo, uint32_t to); void historySend(uint32_t msAgo, uint32_t to, uint32_t last_request_index = 0);
uint32_t historyQueueCreate(uint32_t msAgo, uint32_t to); uint32_t historyQueueCreate(uint32_t msAgo, uint32_t to, uint32_t *last_request_index);
/** /**
* Send our payload into the mesh * Send our payload into the mesh
@ -79,16 +78,16 @@ class StoreForwardModule : private concurrency::OSThread, public ProtobufModule<
void populatePSRAM(); void populatePSRAM();
// S&F Defaults // S&F Defaults
uint32_t historyReturnMax = 250; // 250 records uint32_t historyReturnMax = 25; // Return maximum of 25 records by default.
uint32_t historyReturnWindow = 240; // 4 hours uint32_t historyReturnWindow = 240; // Return history of last 4 hours by default.
uint32_t records = 0; // Calculated uint32_t records = 0; // Calculated
bool heartbeat = false; // No heartbeat. bool heartbeat = false; // No heartbeat.
// stats // stats
uint32_t requests = 0; uint32_t requests = 0; // Number of times any client sent a request to the S&F.
uint32_t requests_history = 0; uint32_t requests_history = 0; // Number of times the history was requested.
uint32_t retry_delay = 0; uint32_t retry_delay = 0; // If server is busy, retry after this delay (in ms).
protected: protected:
virtual int32_t runOnce() override; virtual int32_t runOnce() override;