Let StoreForward server send history to phoneAPI (#4282)

* Send StoreForward history of the server to a connected client
To extend the ToPhoneQueue

* Add delay after sending history info

* Don't allow history request over LoRa on default channel

---------

Co-authored-by: Ben Meadors <benmmeadors@gmail.com>
This commit is contained in:
GUVWAF 2024-07-13 19:36:44 +02:00 committed by GitHub
parent 141ae296b7
commit 9e4ce86c2a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 259 additions and 183 deletions

View File

@ -277,6 +277,17 @@ const char *Channels::getName(size_t chIndex)
return channelName;
}
bool Channels::isDefaultChannel(const meshtastic_Channel &ch)
{
if (ch.settings.psk.size == 1 && ch.settings.psk.bytes[0] == 1) {
const char *presetName = DisplayFormatters::getModemPresetDisplayName(config.lora.modem_preset, false);
// Check if the name is the default derived from the modem preset
if (strcmp(ch.settings.name, presetName) == 0)
return true;
}
return false;
}
bool Channels::hasDefaultChannel()
{
// If we don't use a preset or the default frequency slot, or we override the frequency, we don't have a default channel
@ -285,13 +296,8 @@ bool Channels::hasDefaultChannel()
// Check if any of the channels are using the default name and PSK
for (size_t i = 0; i < getNumChannels(); i++) {
const auto &ch = getByIndex(i);
if (ch.settings.psk.size == 1 && ch.settings.psk.bytes[0] == 1) {
const char *name = getName(i);
const char *presetName = DisplayFormatters::getModemPresetDisplayName(config.lora.modem_preset, false);
// Check if the name is the default derived from the modem preset
if (strcmp(name, presetName) == 0)
return true;
}
if (isDefaultChannel(ch))
return true;
}
return false;
}
@ -324,4 +330,4 @@ bool Channels::decryptForHash(ChannelIndex chIndex, ChannelHash channelHash)
int16_t Channels::setActiveByIndex(ChannelIndex channelIndex)
{
return setCrypto(channelIndex);
}
}

View File

@ -83,6 +83,9 @@ class Channels
*/
int16_t setActiveByIndex(ChannelIndex channelIndex);
// Returns true if the channel has the default name and PSK
bool isDefaultChannel(const meshtastic_Channel &ch);
// Returns true if we can be reached via a channel with the default settings given a region and modem preset
bool hasDefaultChannel();
@ -126,4 +129,4 @@ class Channels
};
/// Singleton channel table
extern Channels channels;
extern Channels channels;

View File

@ -293,6 +293,17 @@ void MeshService::sendToPhone(meshtastic_MeshPacket *p)
{
perhapsDecode(p);
#ifdef ARCH_ESP32
#if !MESHTASTIC_EXCLUDE_STOREFORWARD
if (moduleConfig.store_forward.enabled && storeForwardModule->isServer() &&
p->decoded.portnum == meshtastic_PortNum_TEXT_MESSAGE_APP) {
releaseToPool(p); // Copy is already stored in StoreForward history
fromNum++; // Notify observers for packet from radio
return;
}
#endif
#endif
if (toPhoneQueue.numFree() == 0) {
if (p->decoded.portnum == meshtastic_PortNum_TEXT_MESSAGE_APP ||
p->decoded.portnum == meshtastic_PortNum_RANGE_TEST_APP) {

View File

@ -13,6 +13,11 @@
#if defined(ARCH_PORTDUINO) && !HAS_RADIO
#include "../platform/portduino/SimRadio.h"
#endif
#ifdef ARCH_ESP32
#if !MESHTASTIC_EXCLUDE_STOREFORWARD
#include "modules/esp32/StoreForwardModule.h"
#endif
#endif
extern Allocator<meshtastic_QueueStatus> &queueStatusPool;
extern Allocator<meshtastic_MqttClientProxyMessage> &mqttClientProxyMessagePool;

View File

@ -503,6 +503,14 @@ bool PhoneAPI::available()
return true;
}
#ifdef ARCH_ESP32
#if !MESHTASTIC_EXCLUDE_STOREFORWARD
// Check if StoreForward has packets stored for us.
if (!packetForPhone && storeForwardModule)
packetForPhone = storeForwardModule->getForPhone();
#endif
#endif
if (!packetForPhone)
packetForPhone = service.getForPhone();
hasPacket = !!packetForPhone;

View File

@ -392,6 +392,11 @@ void AdminModule::handleSetConfig(const meshtastic_Config &c)
// Router Client is deprecated; Set it to client
if (c.payload_variant.device.role == meshtastic_Config_DeviceConfig_Role_ROUTER_CLIENT) {
config.device.role = meshtastic_Config_DeviceConfig_Role_CLIENT;
if (moduleConfig.store_forward.enabled && !moduleConfig.store_forward.is_server) {
moduleConfig.store_forward.is_server = true;
changes |= SEGMENT_MODULECONFIG;
requiresReboot = true;
}
}
break;
case meshtastic_Config_position_tag:

View File

@ -35,13 +35,10 @@ int32_t StoreForwardModule::runOnce()
if (moduleConfig.store_forward.enabled && is_server) {
// Send out the message queue.
if (this->busy) {
// Only send packets if the channel is less than 25% utilized.
if (airTime->isTxAllowedChannelUtil(true)) {
storeForwardModule->sendPayload(this->busyTo, this->packetHistoryTXQueue_index);
if (this->packetHistoryTXQueue_index < packetHistoryTXQueue_size - 1) {
this->packetHistoryTXQueue_index++;
} else {
this->packetHistoryTXQueue_index = 0;
// Only send packets if the channel is less than 25% utilized and until historyReturnMax
if (airTime->isTxAllowedChannelUtil(true) && this->requestCount < this->historyReturnMax) {
if (!storeForwardModule->sendPayload(this->busyTo, this->last_time)) {
this->requestCount = 0;
this->busy = false;
}
}
@ -75,9 +72,6 @@ void StoreForwardModule::populatePSRAM()
LOG_DEBUG("*** Before PSRAM initialization: heap %d/%d PSRAM %d/%d\n", memGet.getFreeHeap(), memGet.getHeapSize(),
memGet.getFreePsram(), memGet.getPsramSize());
this->packetHistoryTXQueue =
static_cast<PacketHistoryStruct *>(ps_calloc(this->historyReturnMax, sizeof(PacketHistoryStruct)));
/* Use a maximum of 2/3 the available PSRAM unless otherwise specified.
Note: This needs to be done after every thing that would use PSRAM
*/
@ -95,13 +89,15 @@ void StoreForwardModule::populatePSRAM()
/**
* Sends messages from the message history to the specified recipient.
*
* @param msAgo The number of milliseconds ago from which to start sending messages.
* @param sAgo The number of seconds ago from which to start sending messages.
* @param to The recipient ID to send the messages to.
*/
void StoreForwardModule::historySend(uint32_t msAgo, uint32_t to)
void StoreForwardModule::historySend(uint32_t secAgo, uint32_t to)
{
uint32_t lastIndex = lastRequest.find(to) != lastRequest.end() ? lastRequest[to] : 0;
uint32_t queueSize = storeForwardModule->historyQueueCreate(msAgo, to, &lastIndex);
this->last_time = getTime() < secAgo ? 0 : getTime() - secAgo;
uint32_t queueSize = getNumAvailablePackets(to, last_time);
if (queueSize > this->historyReturnMax)
queueSize = this->historyReturnMax;
if (queueSize) {
LOG_INFO("*** S&F - Sending %u message(s)\n", queueSize);
@ -114,62 +110,66 @@ void StoreForwardModule::historySend(uint32_t msAgo, uint32_t to)
sf.rr = meshtastic_StoreAndForward_RequestResponse_ROUTER_HISTORY;
sf.which_variant = meshtastic_StoreAndForward_history_tag;
sf.variant.history.history_messages = queueSize;
sf.variant.history.window = msAgo;
sf.variant.history.last_request = lastIndex;
lastRequest[to] = lastIndex;
sf.variant.history.window = secAgo * 1000;
sf.variant.history.last_request = lastRequest[to];
storeForwardModule->sendMessage(to, sf);
setIntervalFromNow(this->packetTimeMax); // Delay start of sending payloads
}
/**
* Creates a new history queue with messages that were received within the specified time frame.
* Returns the number of available packets in the message history for a specified destination node.
*
* @param msAgo The number of milliseconds ago to start the history queue.
* @param to The NodeNum of the recipient.
* @param last_request_index The index in the packet history of the last request from this node.
* @return The ID of the newly created history queue.
* @param dest The destination node number.
* @param last_time The relative time to start counting messages from.
* @return The number of available packets in the message history.
*/
uint32_t StoreForwardModule::historyQueueCreate(uint32_t msAgo, uint32_t to, uint32_t *last_request_index)
uint32_t StoreForwardModule::getNumAvailablePackets(NodeNum dest, uint32_t last_time)
{
this->packetHistoryTXQueue_size = 0;
// If our history was cleared, ignore the last request index
uint32_t last_index = *last_request_index > this->packetHistoryCurrent ? 0 : *last_request_index;
for (uint32_t i = last_index; i < this->packetHistoryCurrent; i++) {
/*
LOG_DEBUG("SF historyQueueCreate\n");
LOG_DEBUG("SF historyQueueCreate - time %d\n", this->packetHistory[i].time);
LOG_DEBUG("SF historyQueueCreate - millis %d\n", millis());
LOG_DEBUG("SF historyQueueCreate - math %d\n", (millis() - msAgo));
*/
if (this->packetHistoryTXQueue_size < this->historyReturnMax) {
if (this->packetHistory[i].time && (this->packetHistory[i].time < (millis() - msAgo))) {
/* Copy the messages that were received by the router in the last msAgo
to the packetHistoryTXQueue structure.
Client not interested in packets from itself and only in broadcast packets or packets towards it. */
if (this->packetHistory[i].from != to &&
(this->packetHistory[i].to == NODENUM_BROADCAST || this->packetHistory[i].to == to)) {
this->packetHistoryTXQueue[this->packetHistoryTXQueue_size].time = this->packetHistory[i].time;
this->packetHistoryTXQueue[this->packetHistoryTXQueue_size].to = this->packetHistory[i].to;
this->packetHistoryTXQueue[this->packetHistoryTXQueue_size].from = this->packetHistory[i].from;
this->packetHistoryTXQueue[this->packetHistoryTXQueue_size].channel = this->packetHistory[i].channel;
this->packetHistoryTXQueue[this->packetHistoryTXQueue_size].payload_size =
this->packetHistory[i].payload_size;
memcpy(this->packetHistoryTXQueue[this->packetHistoryTXQueue_size].payload, this->packetHistory[i].payload,
meshtastic_Constants_DATA_PAYLOAD_LEN);
this->packetHistoryTXQueue_size++;
*last_request_index = i + 1; // Set to one higher such that we don't send the same message again
LOG_DEBUG("*** PacketHistoryStruct time=%d, msg=%s\n", this->packetHistory[i].time,
this->packetHistory[i].payload);
}
uint32_t count = 0;
if (lastRequest.find(dest) == lastRequest.end()) {
lastRequest[dest] = 0;
}
for (uint32_t i = lastRequest[dest]; i < this->packetHistoryTotalCount; i++) {
if (this->packetHistory[i].time && (this->packetHistory[i].time > last_time)) {
// Client is only interested in packets not from itself and only in broadcast packets or packets towards it.
if (this->packetHistory[i].from != dest &&
(this->packetHistory[i].to == NODENUM_BROADCAST || this->packetHistory[i].to == dest)) {
count++;
}
} else {
LOG_WARN("*** S&F - Maximum history return reached.\n");
return this->packetHistoryTXQueue_size;
}
}
return this->packetHistoryTXQueue_size;
return count;
}
/**
* Allocates a mesh packet for sending to the phone.
*
* @return A pointer to the allocated mesh packet or nullptr if none is available.
*/
meshtastic_MeshPacket *StoreForwardModule::getForPhone()
{
if (moduleConfig.store_forward.enabled && is_server) {
NodeNum to = nodeDB->getNodeNum();
if (!this->busy) {
// Get number of packets we're going to send in this loop
uint32_t histSize = getNumAvailablePackets(to, 0); // No time limit
if (histSize) {
this->busy = true;
this->busyTo = to;
} else {
return nullptr;
}
}
// We're busy with sending to us until no payload is available anymore
if (this->busy && this->busyTo == to) {
meshtastic_MeshPacket *p = preparePayload(to, 0, true); // No time limit
if (!p) // No more messages to send
this->busy = false;
return p;
}
}
return nullptr;
}
/**
@ -181,66 +181,97 @@ void StoreForwardModule::historyAdd(const meshtastic_MeshPacket &mp)
{
const auto &p = mp.decoded;
if (this->packetHistoryCurrent == this->records) {
if (this->packetHistoryTotalCount == this->records) {
LOG_WARN("*** S&F - PSRAM Full. Starting overwrite now.\n");
this->packetHistoryCurrent = 0;
this->packetHistoryMax = 0;
this->packetHistoryTotalCount = 0;
for (auto &i : lastRequest) {
i.second = 0; // Clear the last request index for each client device
}
}
this->packetHistory[this->packetHistoryCurrent].time = millis();
this->packetHistory[this->packetHistoryCurrent].to = mp.to;
this->packetHistory[this->packetHistoryCurrent].channel = mp.channel;
this->packetHistory[this->packetHistoryCurrent].from = mp.from;
this->packetHistory[this->packetHistoryCurrent].payload_size = p.payload.size;
memcpy(this->packetHistory[this->packetHistoryCurrent].payload, p.payload.bytes, meshtastic_Constants_DATA_PAYLOAD_LEN);
this->packetHistory[this->packetHistoryTotalCount].time = getTime();
this->packetHistory[this->packetHistoryTotalCount].to = mp.to;
this->packetHistory[this->packetHistoryTotalCount].channel = mp.channel;
this->packetHistory[this->packetHistoryTotalCount].from = getFrom(&mp);
this->packetHistory[this->packetHistoryTotalCount].payload_size = p.payload.size;
memcpy(this->packetHistory[this->packetHistoryTotalCount].payload, p.payload.bytes, meshtastic_Constants_DATA_PAYLOAD_LEN);
this->packetHistoryCurrent++;
this->packetHistoryMax++;
}
meshtastic_MeshPacket *StoreForwardModule::allocReply()
{
auto reply = allocDataPacket(); // Allocate a packet for sending
return reply;
this->packetHistoryTotalCount++;
}
/**
* Sends a payload to a specified destination node using the store and forward mechanism.
*
* @param dest The destination node number.
* @param packetHistory_index The index of the packet in the packet history buffer.
* @param last_time The relative time to start sending messages from.
* @return True if a packet was successfully sent, false otherwise.
*/
void StoreForwardModule::sendPayload(NodeNum dest, uint32_t packetHistory_index)
bool StoreForwardModule::sendPayload(NodeNum dest, uint32_t last_time)
{
LOG_INFO("*** Sending S&F Payload\n");
meshtastic_MeshPacket *p = allocReply();
p->to = dest;
p->from = this->packetHistoryTXQueue[packetHistory_index].from;
p->channel = this->packetHistoryTXQueue[packetHistory_index].channel;
// Let's assume that if the router received the S&F request that the client is in range.
// TODO: Make this configurable.
p->want_ack = false;
meshtastic_StoreAndForward sf = meshtastic_StoreAndForward_init_zero;
sf.which_variant = meshtastic_StoreAndForward_text_tag;
sf.variant.text.size = this->packetHistoryTXQueue[packetHistory_index].payload_size;
memcpy(sf.variant.text.bytes, this->packetHistoryTXQueue[packetHistory_index].payload,
this->packetHistoryTXQueue[packetHistory_index].payload_size);
if (this->packetHistoryTXQueue[packetHistory_index].to == NODENUM_BROADCAST) {
sf.rr = meshtastic_StoreAndForward_RequestResponse_ROUTER_TEXT_BROADCAST;
} else {
sf.rr = meshtastic_StoreAndForward_RequestResponse_ROUTER_TEXT_DIRECT;
meshtastic_MeshPacket *p = preparePayload(dest, last_time);
if (p) {
LOG_INFO("*** Sending S&F Payload\n");
service.sendToMesh(p);
this->requestCount++;
return true;
}
return false;
}
p->decoded.payload.size =
pb_encode_to_bytes(p->decoded.payload.bytes, sizeof(p->decoded.payload.bytes), &meshtastic_StoreAndForward_msg, &sf);
/**
* Prepares a payload to be sent to a specified destination node from the S&F packet history.
*
* @param dest The destination node number.
* @param last_time The relative time to start sending messages from.
* @return A pointer to the prepared mesh packet or nullptr if none is available.
*/
meshtastic_MeshPacket *StoreForwardModule::preparePayload(NodeNum dest, uint32_t last_time, bool local)
{
for (uint32_t i = lastRequest[dest]; i < this->packetHistoryTotalCount; i++) {
if (this->packetHistory[i].time && (this->packetHistory[i].time > last_time)) {
/* Copy the messages that were received by the server in the last msAgo
to the packetHistoryTXQueue structure.
Client not interested in packets from itself and only in broadcast packets or packets towards it. */
if (this->packetHistory[i].from != dest &&
(this->packetHistory[i].to == NODENUM_BROADCAST || this->packetHistory[i].to == dest)) {
service.sendToMesh(p);
meshtastic_MeshPacket *p = allocDataPacket();
p->to = local ? this->packetHistory[i].to : dest; // PhoneAPI can handle original `to`
p->from = this->packetHistory[i].from;
p->channel = this->packetHistory[i].channel;
p->rx_time = this->packetHistory[i].time;
// Let's assume that if the server received the S&F request that the client is in range.
// TODO: Make this configurable.
p->want_ack = false;
if (local) { // PhoneAPI gets normal TEXT_MESSAGE_APP
p->decoded.portnum = meshtastic_PortNum_TEXT_MESSAGE_APP;
memcpy(p->decoded.payload.bytes, this->packetHistory[i].payload, this->packetHistory[i].payload_size);
p->decoded.payload.size = this->packetHistory[i].payload_size;
} else {
meshtastic_StoreAndForward sf = meshtastic_StoreAndForward_init_zero;
sf.which_variant = meshtastic_StoreAndForward_text_tag;
sf.variant.text.size = this->packetHistory[i].payload_size;
memcpy(sf.variant.text.bytes, this->packetHistory[i].payload, this->packetHistory[i].payload_size);
if (this->packetHistory[i].to == NODENUM_BROADCAST) {
sf.rr = meshtastic_StoreAndForward_RequestResponse_ROUTER_TEXT_BROADCAST;
} else {
sf.rr = meshtastic_StoreAndForward_RequestResponse_ROUTER_TEXT_DIRECT;
}
p->decoded.payload.size = pb_encode_to_bytes(p->decoded.payload.bytes, sizeof(p->decoded.payload.bytes),
&meshtastic_StoreAndForward_msg, &sf);
}
lastRequest[dest] = i + 1; // Update the last request index for the client device
return p;
}
}
}
return nullptr;
}
/**
@ -257,11 +288,7 @@ void StoreForwardModule::sendMessage(NodeNum dest, const meshtastic_StoreAndForw
p->priority = meshtastic_MeshPacket_Priority_BACKGROUND;
// FIXME - Determine if the delayed packet is broadcast or delayed. For now, assume
// everything is broadcast.
p->delayed = meshtastic_MeshPacket_Delayed_DELAYED_BROADCAST;
// Let's assume that if the router received the S&F request that the client is in range.
// Let's assume that if the server received the S&F request that the client is in range.
// TODO: Make this configurable.
p->want_ack = false;
p->decoded.want_response = false;
@ -283,6 +310,35 @@ void StoreForwardModule::sendMessage(NodeNum dest, meshtastic_StoreAndForward_Re
storeForwardModule->sendMessage(dest, sf);
}
/**
* Sends a text message with an error (busy or channel not available) to the specified destination node.
*
* @param dest The destination node number.
* @param want_response True if the original message requested a response, false otherwise.
*/
void StoreForwardModule::sendErrorTextMessage(NodeNum dest, bool want_response)
{
meshtastic_MeshPacket *pr = allocDataPacket();
pr->to = dest;
pr->priority = meshtastic_MeshPacket_Priority_BACKGROUND;
pr->want_ack = false;
pr->decoded.want_response = false;
pr->decoded.portnum = meshtastic_PortNum_TEXT_MESSAGE_APP;
const char *str;
if (this->busy) {
str = "** S&F - Busy. Try again shortly.";
} else {
str = "** S&F - Not available on this channel.";
}
LOG_WARN("%s\n", str);
memcpy(pr->decoded.payload.bytes, str, strlen(str));
pr->decoded.payload.size = strlen(str);
if (want_response) {
ignoreRequest = true; // This text message counts as response.
}
service.sendToMesh(pr);
}
/**
* Sends statistics about the store and forward module to the specified node.
*
@ -294,8 +350,8 @@ void StoreForwardModule::statsSend(uint32_t to)
sf.rr = meshtastic_StoreAndForward_RequestResponse_ROUTER_STATS;
sf.which_variant = meshtastic_StoreAndForward_stats_tag;
sf.variant.stats.messages_total = this->packetHistoryMax;
sf.variant.stats.messages_saved = this->packetHistoryCurrent;
sf.variant.stats.messages_total = this->records;
sf.variant.stats.messages_saved = this->packetHistoryTotalCount;
sf.variant.stats.messages_max = this->records;
sf.variant.stats.up_time = millis() / 1000;
sf.variant.stats.requests = this->requests;
@ -319,51 +375,37 @@ ProcessMessage StoreForwardModule::handleReceived(const meshtastic_MeshPacket &m
#ifdef ARCH_ESP32
if (moduleConfig.store_forward.enabled) {
// The router node should not be sending messages as a client
if ((getFrom(&mp) != nodeDB->getNodeNum())) {
if ((mp.decoded.portnum == meshtastic_PortNum_TEXT_MESSAGE_APP) && is_server) {
auto &p = mp.decoded;
if (mp.to == nodeDB->getNodeNum() && (p.payload.bytes[0] == 'S') && (p.payload.bytes[1] == 'F') &&
(p.payload.bytes[2] == 0x00)) {
LOG_DEBUG("*** Legacy Request to send\n");
if ((mp.decoded.portnum == meshtastic_PortNum_TEXT_MESSAGE_APP) && is_server) {
auto &p = mp.decoded;
if (mp.to == nodeDB->getNodeNum() && (p.payload.bytes[0] == 'S') && (p.payload.bytes[1] == 'F') &&
(p.payload.bytes[2] == 0x00)) {
LOG_DEBUG("*** Legacy Request to send\n");
// Send the last 60 minutes of messages.
if (this->busy) {
storeForwardModule->sendMessage(getFrom(&mp), meshtastic_StoreAndForward_RequestResponse_ROUTER_BUSY);
LOG_INFO("*** S&F - Busy. Try again shortly.\n");
meshtastic_MeshPacket *pr = allocReply();
pr->to = getFrom(&mp);
pr->priority = meshtastic_MeshPacket_Priority_BACKGROUND;
pr->want_ack = false;
pr->decoded.want_response = false;
pr->decoded.portnum = meshtastic_PortNum_TEXT_MESSAGE_APP;
memcpy(pr->decoded.payload.bytes, "** S&F - Busy. Try again shortly.", 34);
pr->decoded.payload.size = 34;
service.sendToMesh(pr);
} else {
storeForwardModule->historySend(historyReturnWindow * 60000, getFrom(&mp));
}
// Send the last 60 minutes of messages.
if (this->busy || channels.isDefaultChannel(channels.getByIndex(mp.channel))) {
sendErrorTextMessage(getFrom(&mp), mp.decoded.want_response);
} else {
storeForwardModule->historyAdd(mp);
LOG_INFO("*** S&F stored. Message history contains %u records now.\n", this->packetHistoryCurrent);
storeForwardModule->historySend(historyReturnWindow * 60, getFrom(&mp));
}
} else if (mp.decoded.portnum == meshtastic_PortNum_STORE_FORWARD_APP) {
auto &p = mp.decoded;
meshtastic_StoreAndForward scratch;
meshtastic_StoreAndForward *decoded = NULL;
if (mp.which_payload_variant == meshtastic_MeshPacket_decoded_tag) {
if (pb_decode_from_bytes(p.payload.bytes, p.payload.size, &meshtastic_StoreAndForward_msg, &scratch)) {
decoded = &scratch;
} else {
LOG_ERROR("Error decoding protobuf module!\n");
// if we can't decode it, nobody can process it!
return ProcessMessage::STOP;
}
return handleReceivedProtobuf(mp, decoded) ? ProcessMessage::STOP : ProcessMessage::CONTINUE;
} else {
storeForwardModule->historyAdd(mp);
LOG_INFO("*** S&F stored. Message history contains %u records now.\n", this->packetHistoryTotalCount);
}
} else if (getFrom(&mp) != nodeDB->getNodeNum() && mp.decoded.portnum == meshtastic_PortNum_STORE_FORWARD_APP) {
auto &p = mp.decoded;
meshtastic_StoreAndForward scratch;
meshtastic_StoreAndForward *decoded = NULL;
if (mp.which_payload_variant == meshtastic_MeshPacket_decoded_tag) {
if (pb_decode_from_bytes(p.payload.bytes, p.payload.size, &meshtastic_StoreAndForward_msg, &scratch)) {
decoded = &scratch;
} else {
LOG_ERROR("Error decoding protobuf module!\n");
// if we can't decode it, nobody can process it!
return ProcessMessage::STOP;
}
} // all others are irrelevant
}
return handleReceivedProtobuf(mp, decoded) ? ProcessMessage::STOP : ProcessMessage::CONTINUE;
}
} // all others are irrelevant
}
#endif
@ -394,7 +436,7 @@ bool StoreForwardModule::handleReceivedProtobuf(const meshtastic_MeshPacket &mp,
// stop sending stuff, the client wants to abort or has another error
if ((this->busy) && (this->busyTo == getFrom(&mp))) {
LOG_ERROR("*** Client in ERROR or ABORT requested\n");
this->packetHistoryTXQueue_index = 0;
this->requestCount = 0;
this->busy = false;
}
}
@ -405,15 +447,14 @@ bool StoreForwardModule::handleReceivedProtobuf(const meshtastic_MeshPacket &mp,
requests_history++;
LOG_INFO("*** Client Request to send HISTORY\n");
// Send the last 60 minutes of messages.
if (this->busy) {
storeForwardModule->sendMessage(getFrom(&mp), meshtastic_StoreAndForward_RequestResponse_ROUTER_BUSY);
LOG_INFO("*** S&F - Busy. Try again shortly.\n");
if (this->busy || channels.isDefaultChannel(channels.getByIndex(mp.channel))) {
sendErrorTextMessage(getFrom(&mp), mp.decoded.want_response);
} else {
if ((p->which_variant == meshtastic_StoreAndForward_history_tag) && (p->variant.history.window > 0)) {
// window is in minutes
storeForwardModule->historySend(p->variant.history.window * 60000, getFrom(&mp));
storeForwardModule->historySend(p->variant.history.window * 60, getFrom(&mp));
} else {
storeForwardModule->historySend(historyReturnWindow * 60000, getFrom(&mp)); // defaults to 4 hours
storeForwardModule->historySend(historyReturnWindow * 60, getFrom(&mp)); // defaults to 4 hours
}
}
}
@ -451,7 +492,7 @@ bool StoreForwardModule::handleReceivedProtobuf(const meshtastic_MeshPacket &mp,
if (is_client) {
LOG_DEBUG("*** StoreAndForward_RequestResponse_ROUTER_BUSY\n");
// retry in messages_saved * packetTimeMax ms
retry_delay = millis() + packetHistoryCurrent * packetTimeMax *
retry_delay = millis() + getNumAvailablePackets(this->busyTo, this->last_time) * packetTimeMax *
(meshtastic_StoreAndForward_RequestResponse_ROUTER_ERROR ? 2 : 1);
}
break;
@ -482,8 +523,6 @@ bool StoreForwardModule::handleReceivedProtobuf(const meshtastic_MeshPacket &mp,
LOG_DEBUG("*** Router Response STATS\n");
// These fields only have informational purpose on a client. Fill them to consume later.
if (p->which_variant == meshtastic_StoreAndForward_stats_tag) {
this->packetHistoryMax = p->variant.stats.messages_total;
this->packetHistoryCurrent = p->variant.stats.messages_saved;
this->records = p->variant.stats.messages_max;
this->requests = p->variant.stats.requests;
this->requests_history = p->variant.stats.requests_history;
@ -508,7 +547,7 @@ bool StoreForwardModule::handleReceivedProtobuf(const meshtastic_MeshPacket &mp,
default:
break; // no need to do anything
}
return true; // There's no need for others to look at this message.
return false; // RoutingModule sends it to the phone
}
StoreForwardModule::StoreForwardModule()
@ -532,9 +571,8 @@ StoreForwardModule::StoreForwardModule()
if (moduleConfig.store_forward.enabled) {
// Router
if ((config.device.role == meshtastic_Config_DeviceConfig_Role_ROUTER) ||
(config.device.role == meshtastic_Config_DeviceConfig_Role_ROUTER_CLIENT)) {
LOG_INFO("*** Initializing Store & Forward Module in Router mode\n");
if ((config.device.role == meshtastic_Config_DeviceConfig_Role_ROUTER || moduleConfig.store_forward.is_server)) {
LOG_INFO("*** Initializing Store & Forward Module in Server mode\n");
if (memGet.getPsramSize() > 0) {
if (memGet.getFreePsram() >= 1024 * 1024) {

View File

@ -25,12 +25,9 @@ class StoreForwardModule : private concurrency::OSThread, public ProtobufModule<
char routerMessage[meshtastic_Constants_DATA_PAYLOAD_LEN] = {0};
PacketHistoryStruct *packetHistory = 0;
uint32_t packetHistoryCurrent = 0;
uint32_t packetHistoryMax = 0;
PacketHistoryStruct *packetHistoryTXQueue = 0;
uint32_t packetHistoryTXQueue_size = 0;
uint32_t packetHistoryTXQueue_index = 0;
uint32_t packetHistoryTotalCount = 0;
uint32_t last_time = 0;
uint32_t requestCount = 0;
uint32_t packetTimeMax = 5000; // Interval between sending history packets as a server.
@ -52,18 +49,21 @@ class StoreForwardModule : private concurrency::OSThread, public ProtobufModule<
*/
void historyAdd(const meshtastic_MeshPacket &mp);
void statsSend(uint32_t to);
void historySend(uint32_t msAgo, uint32_t to);
uint32_t historyQueueCreate(uint32_t msAgo, uint32_t to, uint32_t *last_request_index);
void historySend(uint32_t secAgo, uint32_t to);
uint32_t getNumAvailablePackets(NodeNum dest, uint32_t last_time);
/**
* Send our payload into the mesh
*/
void sendPayload(NodeNum dest = NODENUM_BROADCAST, uint32_t packetHistory_index = 0);
bool sendPayload(NodeNum dest = NODENUM_BROADCAST, uint32_t packetHistory_index = 0);
meshtastic_MeshPacket *preparePayload(NodeNum dest, uint32_t packetHistory_index, bool local = false);
void sendMessage(NodeNum dest, const meshtastic_StoreAndForward &payload);
void sendMessage(NodeNum dest, meshtastic_StoreAndForward_RequestResponse rr);
void sendErrorTextMessage(NodeNum dest, bool want_response);
meshtastic_MeshPacket *getForPhone();
// Returns true if we are configured as server AND we could allocate PSRAM.
bool isServer() { return is_server; }
virtual meshtastic_MeshPacket *allocReply() override;
/*
-Override the wantPacket method.
*/