From d9bd9bdfb0eb4989e680f4a2d97b1c2ca83dbb7a Mon Sep 17 00:00:00 2001 From: GUVWAF <78759985+GUVWAF@users.noreply.github.com> Date: Wed, 14 Feb 2024 14:07:20 +0100 Subject: [PATCH] StoreForward updates (#3194) * StoreForward updates - Send history in "text" variant - Don't send history the client already got - Check if PSRAM is full - More sensible defaults * Set `TEXT_BROADCAST` or `TEXT_DIRECT` RequestResponse tag * feat: E-Ink "Dynamic Partial" (#3193) Use a mixture of full refresh, partial refresh, and skipped updates, balancing urgency and display health. Co-authored-by: Ben Meadors * [create-pull-request] automated change (#3209) Co-authored-by: thebentern * Reset `last_index` if history was cleared, e.g. by reboot --------- Co-authored-by: Ben Meadors Co-authored-by: todd-herbert Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> Co-authored-by: thebentern Co-authored-by: Garth Vander Houwen --- src/modules/esp32/StoreForwardModule.cpp | 93 +++++++++++++----------- src/modules/esp32/StoreForwardModule.h | 21 +++--- 2 files changed, 61 insertions(+), 53 deletions(-) diff --git a/src/modules/esp32/StoreForwardModule.cpp b/src/modules/esp32/StoreForwardModule.cpp index 6b2c079cc..93472b8b1 100644 --- a/src/modules/esp32/StoreForwardModule.cpp +++ b/src/modules/esp32/StoreForwardModule.cpp @@ -38,16 +38,11 @@ int32_t StoreForwardModule::runOnce() // Only send packets if the channel is less than 25% utilized. if (airTime->isTxAllowedChannelUtil(true)) { storeForwardModule->sendPayload(this->busyTo, this->packetHistoryTXQueue_index); - if (this->packetHistoryTXQueue_index == packetHistoryTXQueue_size) { - // Tell the client we're done sending - meshtastic_StoreAndForward sf = meshtastic_StoreAndForward_init_zero; - sf.rr = meshtastic_StoreAndForward_RequestResponse_ROUTER_PING; - storeForwardModule->sendMessage(this->busyTo, sf); - LOG_INFO("*** S&F - Done. (ROUTER_PING)\n"); + if (this->packetHistoryTXQueue_index < packetHistoryTXQueue_size - 1) { + this->packetHistoryTXQueue_index++; + } else { this->packetHistoryTXQueue_index = 0; this->busy = false; - } else { - this->packetHistoryTXQueue_index++; } } } else if ((millis() - lastHeartbeat > (heartbeatInterval * 1000)) && airTime->isTxAllowedChannelUtil(true)) { @@ -56,7 +51,7 @@ int32_t StoreForwardModule::runOnce() meshtastic_StoreAndForward sf = meshtastic_StoreAndForward_init_zero; sf.rr = meshtastic_StoreAndForward_RequestResponse_ROUTER_HEARTBEAT; sf.which_variant = meshtastic_StoreAndForward_heartbeat_tag; - sf.variant.heartbeat.period = 300; + sf.variant.heartbeat.period = heartbeatInterval; sf.variant.heartbeat.secondary = 0; // TODO we always have one primary router for now storeForwardModule->sendMessage(NODENUM_BROADCAST, sf); } @@ -101,10 +96,11 @@ void StoreForwardModule::populatePSRAM() * * @param msAgo The number of milliseconds ago from which to start sending messages. * @param to The recipient ID to send the messages to. + * @param last_request_index The index in the packet history of the last request from this node. */ -void StoreForwardModule::historySend(uint32_t msAgo, uint32_t to) +void StoreForwardModule::historySend(uint32_t msAgo, uint32_t to, uint32_t last_request_index) { - uint32_t queueSize = storeForwardModule->historyQueueCreate(msAgo, to); + uint32_t queueSize = storeForwardModule->historyQueueCreate(msAgo, to, &last_request_index); if (queueSize) { LOG_INFO("*** S&F - Sending %u message(s)\n", queueSize); @@ -118,6 +114,7 @@ void StoreForwardModule::historySend(uint32_t msAgo, uint32_t to) sf.which_variant = meshtastic_StoreAndForward_history_tag; sf.variant.history.history_messages = queueSize; sf.variant.history.window = msAgo; + sf.variant.history.last_request = last_request_index; storeForwardModule->sendMessage(to, sf); } @@ -125,15 +122,18 @@ void StoreForwardModule::historySend(uint32_t msAgo, uint32_t to) * Creates a new history queue with messages that were received within the specified time frame. * * @param msAgo The number of milliseconds ago to start the history queue. - * @param to The maximum number of messages to include in the history queue. + * @param to The NodeNum of the recipient. + * @param last_request_index The index in the packet history of the last request from this node. * @return The ID of the newly created history queue. */ -uint32_t StoreForwardModule::historyQueueCreate(uint32_t msAgo, uint32_t to) +uint32_t StoreForwardModule::historyQueueCreate(uint32_t msAgo, uint32_t to, uint32_t *last_request_index) { this->packetHistoryTXQueue_size = 0; + // If our history was cleared, ignore what the client is telling us + uint32_t last_index = *last_request_index >= this->packetHistoryCurrent ? 0 : *last_request_index; - for (int i = 0; i < this->packetHistoryCurrent; i++) { + for (int i = last_index; i < this->packetHistoryCurrent; i++) { /* LOG_DEBUG("SF historyQueueCreate\n"); LOG_DEBUG("SF historyQueueCreate - time %d\n", this->packetHistory[i].time); @@ -141,16 +141,11 @@ uint32_t StoreForwardModule::historyQueueCreate(uint32_t msAgo, uint32_t to) LOG_DEBUG("SF historyQueueCreate - math %d\n", (millis() - msAgo)); */ if (this->packetHistory[i].time && (this->packetHistory[i].time < (millis() - msAgo))) { - LOG_DEBUG("*** SF historyQueueCreate - Time matches - ok\n"); - /* - Copy the messages that were received by the router in the last msAgo + /* Copy the messages that were received by the router in the last msAgo to the packetHistoryTXQueue structure. - - TODO: The condition (this->packetHistory[i].to & NODENUM_BROADCAST) == to) is not tested since - I don't have an easy way to target a specific user. Will need to do this soon. - */ - if ((this->packetHistory[i].to & NODENUM_BROADCAST) == NODENUM_BROADCAST || - ((this->packetHistory[i].to & NODENUM_BROADCAST) == to)) { + Client not interested in packets from itself and only in broadcast packets or packets towards it. */ + if (this->packetHistory[i].from != to && + (this->packetHistory[i].to == NODENUM_BROADCAST || this->packetHistory[i].to == to)) { this->packetHistoryTXQueue[this->packetHistoryTXQueue_size].time = this->packetHistory[i].time; this->packetHistoryTXQueue[this->packetHistoryTXQueue_size].to = this->packetHistory[i].to; this->packetHistoryTXQueue[this->packetHistoryTXQueue_size].from = this->packetHistory[i].from; @@ -159,9 +154,10 @@ uint32_t StoreForwardModule::historyQueueCreate(uint32_t msAgo, uint32_t to) memcpy(this->packetHistoryTXQueue[this->packetHistoryTXQueue_size].payload, this->packetHistory[i].payload, meshtastic_Constants_DATA_PAYLOAD_LEN); this->packetHistoryTXQueue_size++; + *last_request_index = i + 1; // Set to one higher such that we don't send the same message again - LOG_DEBUG("*** PacketHistoryStruct time=%d\n", this->packetHistory[i].time); - LOG_DEBUG("*** PacketHistoryStruct msg=%s\n", this->packetHistory[i].payload); + LOG_DEBUG("*** PacketHistoryStruct time=%d, msg=%s\n", this->packetHistory[i].time, + this->packetHistory[i].payload); } } } @@ -177,15 +173,20 @@ void StoreForwardModule::historyAdd(const meshtastic_MeshPacket &mp) { const auto &p = mp.decoded; - this->packetHistory[this->packetHistoryCurrent].time = millis(); - this->packetHistory[this->packetHistoryCurrent].to = mp.to; - this->packetHistory[this->packetHistoryCurrent].channel = mp.channel; - this->packetHistory[this->packetHistoryCurrent].from = mp.from; - this->packetHistory[this->packetHistoryCurrent].payload_size = p.payload.size; - memcpy(this->packetHistory[this->packetHistoryCurrent].payload, p.payload.bytes, meshtastic_Constants_DATA_PAYLOAD_LEN); + if (this->packetHistoryCurrent < this->records) { + this->packetHistory[this->packetHistoryCurrent].time = millis(); + this->packetHistory[this->packetHistoryCurrent].to = mp.to; + this->packetHistory[this->packetHistoryCurrent].channel = mp.channel; + this->packetHistory[this->packetHistoryCurrent].from = mp.from; + this->packetHistory[this->packetHistoryCurrent].payload_size = p.payload.size; + memcpy(this->packetHistory[this->packetHistoryCurrent].payload, p.payload.bytes, meshtastic_Constants_DATA_PAYLOAD_LEN); - this->packetHistoryCurrent++; - this->packetHistoryMax++; + this->packetHistoryCurrent++; + this->packetHistoryMax++; + } else { + // TODO: Overwrite the oldest message in the history buffer when it is full. + LOG_WARN("*** S&F - PSRAM Full. Packet is not added to the history.\n"); + } } meshtastic_MeshPacket *StoreForwardModule::allocReply() @@ -213,10 +214,19 @@ void StoreForwardModule::sendPayload(NodeNum dest, uint32_t packetHistory_index) // TODO: Make this configurable. p->want_ack = false; - p->decoded.payload.size = - this->packetHistoryTXQueue[packetHistory_index].payload_size; // You must specify how many bytes are in the reply - memcpy(p->decoded.payload.bytes, this->packetHistoryTXQueue[packetHistory_index].payload, + meshtastic_StoreAndForward sf = meshtastic_StoreAndForward_init_zero; + sf.which_variant = meshtastic_StoreAndForward_text_tag; + sf.variant.text.size = this->packetHistoryTXQueue[packetHistory_index].payload_size; + memcpy(sf.variant.text.bytes, this->packetHistoryTXQueue[packetHistory_index].payload, this->packetHistoryTXQueue[packetHistory_index].payload_size); + if (this->packetHistoryTXQueue[packetHistory_index].to == NODENUM_BROADCAST) { + sf.rr = meshtastic_StoreAndForward_RequestResponse_ROUTER_TEXT_BROADCAST; + } else { + sf.rr = meshtastic_StoreAndForward_RequestResponse_ROUTER_TEXT_DIRECT; + } + + p->decoded.payload.size = + pb_encode_to_bytes(p->decoded.payload.bytes, sizeof(p->decoded.payload.bytes), &meshtastic_StoreAndForward_msg, &sf); service.sendToMesh(p); } @@ -387,7 +397,9 @@ bool StoreForwardModule::handleReceivedProtobuf(const meshtastic_MeshPacket &mp, LOG_INFO("*** S&F - Busy. Try again shortly.\n"); } else { if ((p->which_variant == meshtastic_StoreAndForward_history_tag) && (p->variant.history.window > 0)) { - storeForwardModule->historySend(p->variant.history.window * 60000, getFrom(&mp)); // window is in minutes + // window is in minutes + storeForwardModule->historySend(p->variant.history.window * 60000, getFrom(&mp), + p->variant.history.last_request); } else { storeForwardModule->historySend(historyReturnWindow * 60000, getFrom(&mp)); // defaults to 4 hours } @@ -406,8 +418,7 @@ bool StoreForwardModule::handleReceivedProtobuf(const meshtastic_MeshPacket &mp, case meshtastic_StoreAndForward_RequestResponse_CLIENT_PONG: if (is_server) { LOG_INFO("*** StoreAndForward_RequestResponse_CLIENT_PONG\n"); - // The Client is alive, update NodeDB - nodeDB.updateFrom(mp); + // NodeDB is already updated } break; @@ -546,9 +557,7 @@ StoreForwardModule::StoreForwardModule() } // Client - } - if ((config.device.role == meshtastic_Config_DeviceConfig_Role_CLIENT) || - (config.device.role == meshtastic_Config_DeviceConfig_Role_ROUTER_CLIENT)) { + } else { is_client = true; LOG_INFO("*** Initializing Store & Forward Module in Client mode\n"); } diff --git a/src/modules/esp32/StoreForwardModule.h b/src/modules/esp32/StoreForwardModule.h index 806f0a836..b04d9ef84 100644 --- a/src/modules/esp32/StoreForwardModule.h +++ b/src/modules/esp32/StoreForwardModule.h @@ -13,7 +13,6 @@ struct PacketHistoryStruct { uint32_t to; uint32_t from; uint8_t channel; - bool ack; uint8_t payload[meshtastic_Constants_DATA_PAYLOAD_LEN]; pb_size_t payload_size; }; @@ -32,7 +31,7 @@ class StoreForwardModule : private concurrency::OSThread, public ProtobufModule< uint32_t packetHistoryTXQueue_size = 0; uint32_t packetHistoryTXQueue_index = 0; - uint32_t packetTimeMax = 5000; + uint32_t packetTimeMax = 5000; // Interval between sending history packets as a server. bool is_client = false; bool is_server = false; @@ -41,7 +40,7 @@ class StoreForwardModule : private concurrency::OSThread, public ProtobufModule< StoreForwardModule(); unsigned long lastHeartbeat = 0; - uint32_t heartbeatInterval = 300; + uint32_t heartbeatInterval = default_broadcast_interval_secs; /** Update our local reference of when we last saw that node. @@ -49,9 +48,9 @@ class StoreForwardModule : private concurrency::OSThread, public ProtobufModule< */ void historyAdd(const meshtastic_MeshPacket &mp); void statsSend(uint32_t to); - void historySend(uint32_t msAgo, uint32_t to); + void historySend(uint32_t msAgo, uint32_t to, uint32_t last_request_index = 0); - uint32_t historyQueueCreate(uint32_t msAgo, uint32_t to); + uint32_t historyQueueCreate(uint32_t msAgo, uint32_t to, uint32_t *last_request_index); /** * Send our payload into the mesh @@ -79,16 +78,16 @@ class StoreForwardModule : private concurrency::OSThread, public ProtobufModule< void populatePSRAM(); // S&F Defaults - uint32_t historyReturnMax = 250; // 250 records - uint32_t historyReturnWindow = 240; // 4 hours + uint32_t historyReturnMax = 25; // Return maximum of 25 records by default. + uint32_t historyReturnWindow = 240; // Return history of last 4 hours by default. uint32_t records = 0; // Calculated bool heartbeat = false; // No heartbeat. // stats - uint32_t requests = 0; - uint32_t requests_history = 0; + uint32_t requests = 0; // Number of times any client sent a request to the S&F. + uint32_t requests_history = 0; // Number of times the history was requested. - uint32_t retry_delay = 0; + uint32_t retry_delay = 0; // If server is busy, retry after this delay (in ms). protected: virtual int32_t runOnce() override; @@ -102,4 +101,4 @@ class StoreForwardModule : private concurrency::OSThread, public ProtobufModule< virtual bool handleReceivedProtobuf(const meshtastic_MeshPacket &mp, meshtastic_StoreAndForward *p); }; -extern StoreForwardModule *storeForwardModule; +extern StoreForwardModule *storeForwardModule; \ No newline at end of file