Merge pull request #1429 from mc-hamster/compression

Compression
This commit is contained in:
Jm Casler 2022-05-24 18:19:11 -07:00 committed by GitHub
commit 5678221ead
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 299 additions and 271 deletions

View File

@ -144,7 +144,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
//#define DISABLE_NTP //#define DISABLE_NTP
// Disable the welcome screen and allow // Disable the welcome screen and allow
// #define DISABLE_WELCOME_UNSET //#define DISABLE_WELCOME_UNSET
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
// OLED & Input // OLED & Input

View File

@ -179,9 +179,6 @@ size_t PhoneAPI::getFromRadio(uint8_t *buf)
// Encapsulate as a FromRadio packet // Encapsulate as a FromRadio packet
fromRadioScratch.which_payloadVariant = FromRadio_packet_tag; fromRadioScratch.which_payloadVariant = FromRadio_packet_tag;
fromRadioScratch.packet = *packetForPhone; fromRadioScratch.packet = *packetForPhone;
// TODO: Remove with compression rework
fromRadioScratch.packet.decoded.which_payloadVariant = Data_payload_tag;
} }
releasePhonePacket(); releasePhonePacket();
break; break;

View File

@ -257,7 +257,7 @@ void printPacket(const char *prefix, const MeshPacket *p)
DEBUG_MSG(" rxSNR=%g", p->rx_snr); DEBUG_MSG(" rxSNR=%g", p->rx_snr);
} }
if (p->rx_rssi != 0) { if (p->rx_rssi != 0) {
DEBUG_MSG(" rxSNR=%g", p->rx_rssi); DEBUG_MSG(" rxRSSI=%g", p->rx_rssi);
} }
if (p->priority != 0) if (p->priority != 0)
DEBUG_MSG(" priority=%d", p->priority); DEBUG_MSG(" priority=%d", p->priority);

View File

@ -93,6 +93,12 @@ bool RadioLibInterface::canSendImmediately()
/// bluetooth comms code. If the txmit queue is empty it might return an error /// bluetooth comms code. If the txmit queue is empty it might return an error
ErrorCode RadioLibInterface::send(MeshPacket *p) ErrorCode RadioLibInterface::send(MeshPacket *p)
{ {
#ifndef DISABLE_WELCOME_UNSET
if (config.lora.region != Config_LoRaConfig_RegionCode_Unset) {
if (disabled || config.lora.tx_disabled) {
if (config.lora.region != Config_LoRaConfig_RegionCode_Unset) { if (config.lora.region != Config_LoRaConfig_RegionCode_Unset) {
if (disabled || config.lora.tx_disabled) { if (disabled || config.lora.tx_disabled) {
DEBUG_MSG("send - lora_tx_disabled\n"); DEBUG_MSG("send - lora_tx_disabled\n");
@ -105,6 +111,18 @@ ErrorCode RadioLibInterface::send(MeshPacket *p)
packetPool.release(p); packetPool.release(p);
return ERRNO_DISABLED; return ERRNO_DISABLED;
} }
}
}
#else
if (disabled || config.lora.tx_disabled) {
DEBUG_MSG("send - lora_tx_disabled\n");
packetPool.release(p);
return ERRNO_DISABLED;
}
#endif
// Sometimes when testing it is useful to be able to never turn on the xmitter // Sometimes when testing it is useful to be able to never turn on the xmitter
#ifndef LORA_DISABLE_SENDING #ifndef LORA_DISABLE_SENDING
@ -128,20 +146,20 @@ ErrorCode RadioLibInterface::send(MeshPacket *p)
packetPool.release(p); packetPool.release(p);
return ERRNO_DISABLED; return ERRNO_DISABLED;
#endif #endif
} }
bool RadioLibInterface::canSleep() bool RadioLibInterface::canSleep()
{ {
bool res = txQueue.empty(); bool res = txQueue.empty();
if (!res) // only print debug messages if we are vetoing sleep if (!res) // only print debug messages if we are vetoing sleep
DEBUG_MSG("radio wait to sleep, txEmpty=%d\n", res); DEBUG_MSG("radio wait to sleep, txEmpty=%d\n", res);
return res; return res;
} }
/** Attempt to cancel a previously sent packet. Returns true if a packet was found we could cancel */ /** Attempt to cancel a previously sent packet. Returns true if a packet was found we could cancel */
bool RadioLibInterface::cancelSending(NodeNum from, PacketId id) bool RadioLibInterface::cancelSending(NodeNum from, PacketId id)
{ {
auto p = txQueue.remove(from, id); auto p = txQueue.remove(from, id);
if (p) if (p)
packetPool.release(p); // free the packet we just removed packetPool.release(p); // free the packet we just removed
@ -149,23 +167,23 @@ bool RadioLibInterface::cancelSending(NodeNum from, PacketId id)
bool result = (p != NULL); bool result = (p != NULL);
DEBUG_MSG("cancelSending id=0x%x, removed=%d\n", id, result); DEBUG_MSG("cancelSending id=0x%x, removed=%d\n", id, result);
return result; return result;
} }
/** radio helper thread callback. /** radio helper thread callback.
We never immediately transmit after any operation (either rx or tx). Instead we should start receiving and We never immediately transmit after any operation (either rx or tx). Instead we should start receiving and
wait a random delay of 100ms to 100ms+shortPacketMsec to make sure we are not stomping on someone else. The 100ms delay at the wait a random delay of 100ms to 100ms+shortPacketMsec to make sure we are not stomping on someone else. The 100ms delay
beginning ensures all possible listeners have had time to finish processing the previous packet and now have their radio in RX at the beginning ensures all possible listeners have had time to finish processing the previous packet and now have their
state. The up to 100ms+shortPacketMsec random delay gives a chance for all possible senders to have high odds of detecting that radio in RX state. The up to 100ms+shortPacketMsec random delay gives a chance for all possible senders to have high odds
someone else started transmitting first and then they will wait until that packet finishes. of detecting that someone else started transmitting first and then they will wait until that packet finishes.
NOTE: the large flood rebroadcast delay might still be needed even with this approach. Because we might not be able to hear other NOTE: the large flood rebroadcast delay might still be needed even with this approach. Because we might not be able to
transmitters that we are potentially stomping on. Requires further thought. hear other transmitters that we are potentially stomping on. Requires further thought.
FIXME, the MIN_TX_WAIT_MSEC and MAX_TX_WAIT_MSEC values should be tuned via logic analyzer later. FIXME, the MIN_TX_WAIT_MSEC and MAX_TX_WAIT_MSEC values should be tuned via logic analyzer later.
*/ */
void RadioLibInterface::onNotify(uint32_t notification) void RadioLibInterface::onNotify(uint32_t notification)
{ {
switch (notification) { switch (notification) {
case ISR_TX: case ISR_TX:
handleTransmitInterrupt(); handleTransmitInterrupt();
@ -210,10 +228,10 @@ void RadioLibInterface::onNotify(uint32_t notification)
default: default:
assert(0); // We expected to receive a valid notification from the ISR assert(0); // We expected to receive a valid notification from the ISR
} }
} }
void RadioLibInterface::setTransmitDelay() void RadioLibInterface::setTransmitDelay()
{ {
MeshPacket *p = txQueue.getFront(); MeshPacket *p = txQueue.getFront();
// We want all sending/receiving to be done by our daemon thread. // We want all sending/receiving to be done by our daemon thread.
// We use a delay here because this packet might have been sent in response to a packet we just received. // We use a delay here because this packet might have been sent in response to a packet we just received.
@ -230,39 +248,39 @@ void RadioLibInterface::setTransmitDelay()
DEBUG_MSG("rx_snr found. hop_limit:%d rx_snr:%f\n", p->hop_limit, p->rx_snr); DEBUG_MSG("rx_snr found. hop_limit:%d rx_snr:%f\n", p->hop_limit, p->rx_snr);
startTransmitTimerSNR(p->rx_snr); startTransmitTimerSNR(p->rx_snr);
} }
} }
void RadioLibInterface::startTransmitTimer(bool withDelay) void RadioLibInterface::startTransmitTimer(bool withDelay)
{ {
// If we have work to do and the timer wasn't already scheduled, schedule it now // If we have work to do and the timer wasn't already scheduled, schedule it now
if (!txQueue.empty()) { if (!txQueue.empty()) {
uint32_t delay = !withDelay ? 1 : getTxDelayMsec(); uint32_t delay = !withDelay ? 1 : getTxDelayMsec();
// DEBUG_MSG("xmit timer %d\n", delay); // DEBUG_MSG("xmit timer %d\n", delay);
notifyLater(delay, TRANSMIT_DELAY_COMPLETED, false); // This will implicitly enable notifyLater(delay, TRANSMIT_DELAY_COMPLETED, false); // This will implicitly enable
} }
} }
void RadioLibInterface::startTransmitTimerSNR(float snr) void RadioLibInterface::startTransmitTimerSNR(float snr)
{ {
// If we have work to do and the timer wasn't already scheduled, schedule it now // If we have work to do and the timer wasn't already scheduled, schedule it now
if (!txQueue.empty()) { if (!txQueue.empty()) {
uint32_t delay = getTxDelayMsecWeighted(snr); uint32_t delay = getTxDelayMsecWeighted(snr);
// DEBUG_MSG("xmit timer %d\n", delay); // DEBUG_MSG("xmit timer %d\n", delay);
notifyLater(delay, TRANSMIT_DELAY_COMPLETED, false); // This will implicitly enable notifyLater(delay, TRANSMIT_DELAY_COMPLETED, false); // This will implicitly enable
} }
} }
void RadioLibInterface::handleTransmitInterrupt() void RadioLibInterface::handleTransmitInterrupt()
{ {
// DEBUG_MSG("handling lora TX interrupt\n"); // DEBUG_MSG("handling lora TX interrupt\n");
// This can be null if we forced the device to enter standby mode. In that case // This can be null if we forced the device to enter standby mode. In that case
// ignore the transmit interrupt // ignore the transmit interrupt
if (sendingPacket) if (sendingPacket)
completeSending(); completeSending();
} }
void RadioLibInterface::completeSending() void RadioLibInterface::completeSending()
{ {
// We are careful to clear sending packet before calling printPacket because // We are careful to clear sending packet before calling printPacket because
// that can take a long time // that can take a long time
auto p = sendingPacket; auto p = sendingPacket;
@ -276,10 +294,10 @@ void RadioLibInterface::completeSending()
packetPool.release(p); packetPool.release(p);
// DEBUG_MSG("Done with send\n"); // DEBUG_MSG("Done with send\n");
} }
} }
void RadioLibInterface::handleReceiveInterrupt() void RadioLibInterface::handleReceiveInterrupt()
{ {
uint32_t xmitMsec; uint32_t xmitMsec;
assert(isReceiving); assert(isReceiving);
isReceiving = false; isReceiving = false;
@ -339,11 +357,11 @@ void RadioLibInterface::handleReceiveInterrupt()
deliverToReceiver(mp); deliverToReceiver(mp);
} }
} }
} }
/** start an immediate transmit */ /** start an immediate transmit */
void RadioLibInterface::startSend(MeshPacket *txp) void RadioLibInterface::startSend(MeshPacket * txp)
{ {
printPacket("Starting low level send", txp); printPacket("Starting low level send", txp);
if (disabled || config.lora.tx_disabled) { if (disabled || config.lora.tx_disabled) {
DEBUG_MSG("startSend is dropping tx packet because we are disabled\n"); DEBUG_MSG("startSend is dropping tx packet because we are disabled\n");
@ -364,7 +382,8 @@ void RadioLibInterface::startSend(MeshPacket *txp)
startReceive(); // Restart receive mode (because startTransmit failed to put us in xmit mode) startReceive(); // Restart receive mode (because startTransmit failed to put us in xmit mode)
} }
// Must be done AFTER, starting transmit, because startTransmit clears (possibly stale) interrupt pending register bits // Must be done AFTER, starting transmit, because startTransmit clears (possibly stale) interrupt pending register
// bits
enableInterrupt(isrTxLevel0); enableInterrupt(isrTxLevel0);
} }
} }

View File

@ -274,6 +274,9 @@ void Router::sniffReceived(const MeshPacket *p, const Routing *c)
bool perhapsDecode(MeshPacket *p) bool perhapsDecode(MeshPacket *p)
{ {
// DEBUG_MSG("\n\n** perhapsDecode payloadVariant - %d\n\n", p->which_payloadVariant);
if (p->which_payloadVariant == MeshPacket_decoded_tag) if (p->which_payloadVariant == MeshPacket_decoded_tag)
return true; // If packet was already decoded just return return true; // If packet was already decoded just return
@ -304,9 +307,31 @@ bool perhapsDecode(MeshPacket *p)
p->which_payloadVariant = MeshPacket_decoded_tag; // change type to decoded p->which_payloadVariant = MeshPacket_decoded_tag; // change type to decoded
p->channel = chIndex; // change to store the index instead of the hash p->channel = chIndex; // change to store the index instead of the hash
/*
if (p->decoded.portnum == PortNum_TEXT_MESSAGE_APP) {
DEBUG_MSG("\n\n** TEXT_MESSAGE_APP\n");
} else if (p->decoded.portnum == PortNum_TEXT_MESSAGE_COMPRESSED_APP) {
DEBUG_MSG("\n\n** PortNum_TEXT_MESSAGE_COMPRESSED_APP\n");
}
*/
// Decompress if needed. jm // Decompress if needed. jm
if (p->decoded.which_payloadVariant == Data_payload_compressed_tag) { if (p->decoded.portnum == PortNum_TEXT_MESSAGE_COMPRESSED_APP) {
// Decompress the file // Decompress the payload
char compressed_in[Constants_DATA_PAYLOAD_LEN] = {};
char decompressed_out[Constants_DATA_PAYLOAD_LEN] = {};
int decompressed_len;
memcpy(compressed_in, p->decoded.payload.bytes, p->decoded.payload.size);
decompressed_len = unishox2_decompress_simple(compressed_in, p->decoded.payload.size, decompressed_out);
// DEBUG_MSG("\n\n**\n\nDecompressed length - %d \n", decompressed_len);
memcpy(p->decoded.payload.bytes, decompressed_out, decompressed_len);
// Switch the port from PortNum_TEXT_MESSAGE_COMPRESSED_APP to PortNum_TEXT_MESSAGE_APP
p->decoded.portnum = PortNum_TEXT_MESSAGE_APP;
} }
printPacket("decoded message", p); printPacket("decoded message", p);
@ -339,41 +364,28 @@ Routing_Error perhapsEncode(MeshPacket *p)
char compressed_out[Constants_DATA_PAYLOAD_LEN] = {0}; char compressed_out[Constants_DATA_PAYLOAD_LEN] = {0};
int compressed_len; int compressed_len;
// compressed_len = unishox2_compress_simple(original_payload, p->decoded.payload.size, compressed_out); compressed_len = unishox2_compress_simple(original_payload, p->decoded.payload.size, compressed_out);
Serial.print("Original length - "); DEBUG_MSG("Original length - %d \n", p->decoded.payload.size);
Serial.println(p->decoded.payload.size); DEBUG_MSG("Compressed length - %d \n", compressed_len);
DEBUG_MSG("Original message - %s \n", p->decoded.payload.bytes);
Serial.print("Compressed length - ");
Serial.println(compressed_len);
// Serial.println(compressed_out);
// If the compressed length is greater than or equal to the original size, don't use the compressed form // If the compressed length is greater than or equal to the original size, don't use the compressed form
if (compressed_len >= p->decoded.payload.size) { if (compressed_len >= p->decoded.payload.size) {
DEBUG_MSG("Not compressing message. Not enough benefit from doing so.\n"); DEBUG_MSG("Not using compressing message.\n");
// Set the uncompressed payload varient anyway. Shouldn't hurt? // Set the uncompressed payload varient anyway. Shouldn't hurt?
p->decoded.which_payloadVariant = Data_payload_tag; // p->decoded.which_payloadVariant = Data_payload_tag;
// Otherwise we use the compressor // Otherwise we use the compressor
} else { } else {
DEBUG_MSG("Compressing message.\n"); DEBUG_MSG("Using compressed message.\n");
// Copy the compressed data into the meshpacket // Copy the compressed data into the meshpacket
// p->decoded.payload_compressed.size = compressed_len;
// memcpy(p->decoded.payload_compressed.bytes, compressed_out, compressed_len);
// p->decoded.which_payloadVariant = Data_payload_compressed_tag; p->decoded.payload.size = compressed_len;
} memcpy(p->decoded.payload.bytes, compressed_out, compressed_len);
if (0) { p->decoded.portnum = PortNum_TEXT_MESSAGE_COMPRESSED_APP;
char decompressed_out[Constants_DATA_PAYLOAD_LEN] = {};
int decompressed_len;
// decompressed_len = unishox2_decompress_simple(compressed_out, compressed_len, decompressed_out);
Serial.print("Decompressed length - ");
Serial.println(decompressed_len);
Serial.println(decompressed_out);
} }
} }

View File

@ -36,8 +36,8 @@ int32_t RangeTestModule::runOnce()
without having to configure it from the PythonAPI or WebUI. without having to configure it from the PythonAPI or WebUI.
*/ */
// moduleConfig.range_test.enabled = 1; //moduleConfig.range_test.enabled = 1;
// moduleConfig.range_test.sender = 45; //moduleConfig.range_test.sender = 30;
// moduleConfig.range_test.save = 1; // moduleConfig.range_test.save = 1;
// Fixed position is useful when testing indoors. // Fixed position is useful when testing indoors.
@ -115,7 +115,7 @@ void RangeTestModuleRadio::sendPayload(NodeNum dest, bool wantReplies)
packetSequence++; packetSequence++;
static char heartbeatString[20]; static char heartbeatString[MAX_RHPACKETLEN];
snprintf(heartbeatString, sizeof(heartbeatString), "seq %u", packetSequence); snprintf(heartbeatString, sizeof(heartbeatString), "seq %u", packetSequence);
p->decoded.payload.size = strlen(heartbeatString); // You must specify how many bytes are in the reply p->decoded.payload.size = strlen(heartbeatString); // You must specify how many bytes are in the reply