move streamapi into a thread, saves power and increases responsiveness

This commit is contained in:
Kevin Hester 2021-03-25 06:15:15 +08:00
parent e17fe7e075
commit b20930c111
13 changed files with 121 additions and 114 deletions

View File

@ -6,25 +6,28 @@
#define Port Serial #define Port Serial
SerialConsole console; SerialConsole *console;
void consoleInit()
{
new SerialConsole(); // Must be dynamically allocated because we are now inheriting from thread
}
void consolePrintf(const char *format, ...) void consolePrintf(const char *format, ...)
{ {
va_list arg; va_list arg;
va_start(arg, format); va_start(arg, format);
console.vprintf(format, arg); console->vprintf(format, arg);
va_end(arg); va_end(arg);
} }
SerialConsole::SerialConsole() : StreamAPI(&Port), RedirectablePrint(&Port) SerialConsole::SerialConsole() : StreamAPI(&Port), RedirectablePrint(&Port)
{ {
assert(!console);
console = this;
canWrite = false; // We don't send packets to our port until it has talked to us first canWrite = false; // We don't send packets to our port until it has talked to us first
// setDestination(&noopPrint); for testing, try turning off 'all' debug output and see what leaks // setDestination(&noopPrint); for testing, try turning off 'all' debug output and see what leaks
}
/// Do late init that can't happen at constructor time
void SerialConsole::init()
{
Port.begin(SERIAL_BAUD); Port.begin(SERIAL_BAUD);
StreamAPI::init(); StreamAPI::init();
emitRebooted(); emitRebooted();
@ -34,14 +37,14 @@ void SerialConsole::init()
* we override this to notice when we've received a protobuf over the serial * we override this to notice when we've received a protobuf over the serial
* stream. Then we shunt off debug serial output. * stream. Then we shunt off debug serial output.
*/ */
void SerialConsole::handleToRadio(const uint8_t *buf, size_t len) bool SerialConsole::handleToRadio(const uint8_t *buf, size_t len)
{ {
// Turn off debug serial printing once the API is activated, because other threads could print and corrupt packets // Turn off debug serial printing once the API is activated, because other threads could print and corrupt packets
if (!radioConfig.preferences.debug_log_enabled) if (!radioConfig.preferences.debug_log_enabled)
setDestination(&noopPrint); setDestination(&noopPrint);
canWrite = true; canWrite = true;
StreamAPI::handleToRadio(buf, len); return StreamAPI::handleToRadio(buf, len);
} }
/// Hookable to find out when connection changes /// Hookable to find out when connection changes

View File

@ -11,14 +11,11 @@ class SerialConsole : public StreamAPI, public RedirectablePrint
public: public:
SerialConsole(); SerialConsole();
/// Do late init that can't happen at constructor time
virtual void init();
/** /**
* we override this to notice when we've received a protobuf over the serial stream. Then we shunt off * we override this to notice when we've received a protobuf over the serial stream. Then we shunt off
* debug serial output. * debug serial output.
*/ */
virtual void handleToRadio(const uint8_t *buf, size_t len); virtual bool handleToRadio(const uint8_t *buf, size_t len);
virtual size_t write(uint8_t c) virtual size_t write(uint8_t c)
{ {
@ -34,5 +31,6 @@ class SerialConsole : public StreamAPI, public RedirectablePrint
// A simple wrapper to allow non class aware code write to the console // A simple wrapper to allow non class aware code write to the console
void consolePrintf(const char *format, ...); void consolePrintf(const char *format, ...);
void consoleInit();
extern SerialConsole console; extern SerialConsole *console;

View File

@ -1,6 +1,5 @@
#pragma once #pragma once
#include "configuration.h"
#include "../freertosinc.h" #include "../freertosinc.h"
namespace concurrency namespace concurrency
@ -28,4 +27,4 @@ class BinarySemaphoreFreeRTOS
#endif #endif
} } // namespace concurrency

View File

@ -457,7 +457,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
#include "SerialConsole.h" #include "SerialConsole.h"
#define DEBUG_PORT console // Serial debug port #define DEBUG_PORT (*console) // Serial debug port
// What platforms should use SEGGER? // What platforms should use SEGGER?
#ifdef NRF52_SERIES #ifdef NRF52_SERIES

View File

@ -177,6 +177,7 @@ class ButtonThread : public OSThread
OneButton userButtonAlt; OneButton userButtonAlt;
#endif #endif
static bool shutdown_on_long_stop; static bool shutdown_on_long_stop;
public: public:
static uint32_t longPressTime; static uint32_t longPressTime;
@ -250,15 +251,15 @@ class ButtonThread : public OSThread
power->shutdown(); power->shutdown();
} }
#elif NRF52_SERIES #elif NRF52_SERIES
// Do actual shutdown when button released, otherwise the button release // Do actual shutdown when button released, otherwise the button release
// may wake the board immediatedly. // may wake the board immediatedly.
if (!shutdown_on_long_stop) { if (!shutdown_on_long_stop) {
DEBUG_MSG("Shutdown from long press"); DEBUG_MSG("Shutdown from long press");
playBeep(); playBeep();
ledOff(PIN_LED1); ledOff(PIN_LED1);
ledOff(PIN_LED2); ledOff(PIN_LED2);
shutdown_on_long_stop = true; shutdown_on_long_stop = true;
} }
#endif #endif
} else { } else {
// DEBUG_MSG("Long press %u\n", (millis() - longPressTime)); // DEBUG_MSG("Long press %u\n", (millis() - longPressTime));
@ -315,9 +316,8 @@ void setup()
SEGGER_RTT_ConfigUpBuffer(0, NULL, NULL, 0, SEGGER_RTT_MODE_NO_BLOCK_TRIM); SEGGER_RTT_ConfigUpBuffer(0, NULL, NULL, 0, SEGGER_RTT_MODE_NO_BLOCK_TRIM);
#endif #endif
// Debug
#ifdef DEBUG_PORT #ifdef DEBUG_PORT
DEBUG_PORT.init(); // Set serial baud rate and init our mesh console consoleInit(); // Set serial baud rate and init our mesh console
#endif #endif
initDeepSleep(); initDeepSleep();
@ -580,10 +580,6 @@ void loop()
{ {
// axpDebugOutput.loop(); // axpDebugOutput.loop();
#ifdef DEBUG_PORT
DEBUG_PORT.loop(); // Send/receive protobufs over the serial port
#endif
// heap_caps_check_integrity_all(true); // FIXME - disable this expensive check // heap_caps_check_integrity_all(true); // FIXME - disable this expensive check
#ifndef NO_ESP32 #ifndef NO_ESP32

View File

@ -114,7 +114,7 @@ bool MeshService::reloadConfig()
void MeshService::reloadOwner() void MeshService::reloadOwner()
{ {
assert(nodeInfoPlugin); assert(nodeInfoPlugin);
if(nodeInfoPlugin) if (nodeInfoPlugin)
nodeInfoPlugin->sendOurNodeInfo(); nodeInfoPlugin->sendOurNodeInfo();
nodeDB.saveToDisk(); nodeDB.saveToDisk();
} }
@ -172,13 +172,12 @@ void MeshService::sendNetworkPing(NodeNum dest, bool wantReplies)
assert(node); assert(node);
if (node->has_position) { if (node->has_position) {
if(positionPlugin) { if (positionPlugin) {
DEBUG_MSG("Sending position ping to 0x%x, wantReplies=%d\n", dest, wantReplies); DEBUG_MSG("Sending position ping to 0x%x, wantReplies=%d\n", dest, wantReplies);
positionPlugin->sendOurPosition(dest, wantReplies); positionPlugin->sendOurPosition(dest, wantReplies);
} }
} } else {
else { if (nodeInfoPlugin) {
if(nodeInfoPlugin) {
DEBUG_MSG("Sending nodeinfo ping to 0x%x, wantReplies=%d\n", dest, wantReplies); DEBUG_MSG("Sending nodeinfo ping to 0x%x, wantReplies=%d\n", dest, wantReplies);
nodeInfoPlugin->sendOurNodeInfo(dest, wantReplies); nodeInfoPlugin->sendOurNodeInfo(dest, wantReplies);
} }

View File

@ -84,13 +84,12 @@ class MeshService
NodeInfo *refreshMyNodeInfo(); NodeInfo *refreshMyNodeInfo();
private: private:
/// Called when our gps position has changed - updates nodedb and sends Location message out into the mesh /// Called when our gps position has changed - updates nodedb and sends Location message out into the mesh
/// returns 0 to allow futher processing /// returns 0 to allow futher processing
int onGPSChanged(const meshtastic::GPSStatus *arg); int onGPSChanged(const meshtastic::GPSStatus *arg);
/// Handle a packet that just arrived from the radio. This method does _ReliableRouternot_ free the provided packet. If it needs /// Handle a packet that just arrived from the radio. This method does _ReliableRouternot_ free the provided packet. If it
/// to keep the packet around it makes a copy /// needs to keep the packet around it makes a copy
int handleFromRadio(const MeshPacket *p); int handleFromRadio(const MeshPacket *p);
friend class RoutingPlugin; friend class RoutingPlugin;
}; };

View File

@ -1,10 +1,10 @@
#include "PhoneAPI.h" #include "PhoneAPI.h"
#include "Channels.h"
#include "GPS.h" #include "GPS.h"
#include "MeshService.h" #include "MeshService.h"
#include "NodeDB.h" #include "NodeDB.h"
#include "PowerFSM.h" #include "PowerFSM.h"
#include "RadioInterface.h" #include "RadioInterface.h"
#include "Channels.h"
#include <assert.h> #include <assert.h>
#if FromRadio_size > MAX_TO_FROM_RADIO_SIZE #if FromRadio_size > MAX_TO_FROM_RADIO_SIZE
@ -22,16 +22,18 @@ void PhoneAPI::init()
observe(&service.fromNumChanged); observe(&service.fromNumChanged);
} }
PhoneAPI::~PhoneAPI() { PhoneAPI::~PhoneAPI()
{
close(); close();
} }
void PhoneAPI::close() { void PhoneAPI::close()
{
unobserve(); unobserve();
state = STATE_SEND_NOTHING; state = STATE_SEND_NOTHING;
bool oldConnected = isConnected; bool oldConnected = isConnected;
isConnected = false; isConnected = false;
if(oldConnected != isConnected) if (oldConnected != isConnected)
onConnectionChanged(isConnected); onConnectionChanged(isConnected);
} }
@ -49,7 +51,7 @@ void PhoneAPI::checkConnectionTimeout()
/** /**
* Handle a ToRadio protobuf * Handle a ToRadio protobuf
*/ */
void PhoneAPI::handleToRadio(const uint8_t *buf, size_t bufLength) bool PhoneAPI::handleToRadio(const uint8_t *buf, size_t bufLength)
{ {
powerFSM.trigger(EVENT_CONTACT_FROM_PHONE); // As long as the phone keeps talking to us, don't let the radio go to sleep powerFSM.trigger(EVENT_CONTACT_FROM_PHONE); // As long as the phone keeps talking to us, don't let the radio go to sleep
lastContactMsec = millis(); lastContactMsec = millis();
@ -62,12 +64,9 @@ void PhoneAPI::handleToRadio(const uint8_t *buf, size_t bufLength)
memset(&toRadioScratch, 0, sizeof(toRadioScratch)); memset(&toRadioScratch, 0, sizeof(toRadioScratch));
if (pb_decode_from_bytes(buf, bufLength, ToRadio_fields, &toRadioScratch)) { if (pb_decode_from_bytes(buf, bufLength, ToRadio_fields, &toRadioScratch)) {
switch (toRadioScratch.which_payloadVariant) { switch (toRadioScratch.which_payloadVariant) {
case ToRadio_packet_tag: { case ToRadio_packet_tag:
MeshPacket &p = toRadioScratch.packet; return handleToRadioPacket(toRadioScratch.packet);
printPacket("PACKET FROM PHONE", &p);
service.handleToRadio(p);
break;
}
case ToRadio_want_config_id_tag: case ToRadio_want_config_id_tag:
config_nonce = toRadioScratch.want_config_id; config_nonce = toRadioScratch.want_config_id;
DEBUG_MSG("Client wants config, nonce=%u\n", config_nonce); DEBUG_MSG("Client wants config, nonce=%u\n", config_nonce);
@ -86,6 +85,8 @@ void PhoneAPI::handleToRadio(const uint8_t *buf, size_t bufLength)
} else { } else {
DEBUG_MSG("Error: ignoring malformed toradio\n"); DEBUG_MSG("Error: ignoring malformed toradio\n");
} }
return false;
} }
/** /**
@ -127,7 +128,7 @@ size_t PhoneAPI::getFromRadio(uint8_t *buf)
fromRadioScratch.my_info = myNodeInfo; fromRadioScratch.my_info = myNodeInfo;
state = STATE_SEND_NODEINFO; state = STATE_SEND_NODEINFO;
service.refreshMyNodeInfo(); // Update my NodeInfo because the client will be asking for it soon. service.refreshMyNodeInfo(); // Update my NodeInfo because the client will be asking for it soon.
break; break;
case STATE_SEND_NODEINFO: { case STATE_SEND_NODEINFO: {
@ -226,7 +227,13 @@ bool PhoneAPI::available()
/** /**
* Handle a packet that the phone wants us to send. It is our responsibility to free the packet to the pool * Handle a packet that the phone wants us to send. It is our responsibility to free the packet to the pool
*/ */
void PhoneAPI::handleToRadioPacket(MeshPacket *p) {} bool PhoneAPI::handleToRadioPacket(MeshPacket &p)
{
printPacket("PACKET FROM PHONE", &p);
service.handleToRadio(p);
return true;
}
/// If the mesh service tells us fromNum has changed, tell the phone /// If the mesh service tells us fromNum has changed, tell the phone
int PhoneAPI::onNotify(uint32_t newValue) int PhoneAPI::onNotify(uint32_t newValue)

View File

@ -67,8 +67,9 @@ class PhoneAPI
/** /**
* Handle a ToRadio protobuf * Handle a ToRadio protobuf
* @return true true if a packet was queued for sending (so that caller can yield)
*/ */
virtual void handleToRadio(const uint8_t *buf, size_t len); virtual bool handleToRadio(const uint8_t *buf, size_t len);
/** /**
* Get the next packet we want to send to the phone * Get the next packet we want to send to the phone
@ -93,7 +94,7 @@ class PhoneAPI
/// Hookable to find out when connection changes /// Hookable to find out when connection changes
virtual void onConnectionChanged(bool connected) {} virtual void onConnectionChanged(bool connected) {}
/// If we haven't heard from the other side in a while then say not connected /// If we haven't heard from the other side in a while then say not connected
void checkConnectionTimeout(); void checkConnectionTimeout();
/** /**
@ -103,9 +104,10 @@ class PhoneAPI
private: private:
/** /**
* Handle a packet that the phone wants us to send. It is our responsibility to free the packet to the pool * Handle a packet that the phone wants us to send. We can write to it but can not keep a reference to it
* @return true true if a packet was queued for sending
*/ */
void handleToRadioPacket(MeshPacket *p); bool handleToRadioPacket(MeshPacket &p);
/// If the mesh service tells us fromNum has changed, tell the phone /// If the mesh service tells us fromNum has changed, tell the phone
virtual int onNotify(uint32_t newValue); virtual int onNotify(uint32_t newValue);

View File

@ -5,48 +5,62 @@
#define START2 0xc3 #define START2 0xc3
#define HEADER_LEN 4 #define HEADER_LEN 4
void StreamAPI::loop() int32_t StreamAPI::runOnce()
{ {
auto result = readStream();
writeStream(); writeStream();
readStream();
checkConnectionTimeout(); checkConnectionTimeout();
return result;
} }
/** /**
* Read any rx chars from the link and call handleToRadio * Read any rx chars from the link and call handleToRadio
*/ */
void StreamAPI::readStream() int32_t StreamAPI::readStream()
{ {
while (stream->available()) { // Currently we never want to block uint32_t now = millis();
uint8_t c = stream->read(); if (!stream->available()) {
// Nothing available this time, if the computer has talked to us recently, poll often, otherwise let CPU sleep a long time
bool recentRx = (now - lastRxMsec) < 2000;
return recentRx ? 5 : 250;
} else {
while (stream->available()) { // Currently we never want to block
uint8_t c = stream->read();
// Use the read pointer for a little state machine, first look for framing, then length bytes, then payload // Use the read pointer for a little state machine, first look for framing, then length bytes, then payload
size_t ptr = rxPtr++; // assume we will probably advance the rxPtr size_t ptr = rxPtr++; // assume we will probably advance the rxPtr
rxBuf[ptr] = c; // store all bytes (including framing) rxBuf[ptr] = c; // store all bytes (including framing)
if (ptr == 0) { // looking for START1 if (ptr == 0) { // looking for START1
if (c != START1) if (c != START1)
rxPtr = 0; // failed to find framing rxPtr = 0; // failed to find framing
} else if (ptr == 1) { // looking for START2 } else if (ptr == 1) { // looking for START2
if (c != START2) if (c != START2)
rxPtr = 0; // failed to find framing rxPtr = 0; // failed to find framing
} else if (ptr >= HEADER_LEN) { // we have at least read our 4 byte framing } else if (ptr >= HEADER_LEN) { // we have at least read our 4 byte framing
uint32_t len = (rxBuf[2] << 8) + rxBuf[3]; // big endian 16 bit length follows framing uint32_t len = (rxBuf[2] << 8) + rxBuf[3]; // big endian 16 bit length follows framing
if (ptr == HEADER_LEN) { if (ptr == HEADER_LEN) {
// we _just_ finished our 4 byte header, validate length now (note: a length of zero is a valid // we _just_ finished our 4 byte header, validate length now (note: a length of zero is a valid
// protobuf also) // protobuf also)
if (len > MAX_TO_FROM_RADIO_SIZE) if (len > MAX_TO_FROM_RADIO_SIZE)
rxPtr = 0; // length is bogus, restart search for framing rxPtr = 0; // length is bogus, restart search for framing
} }
if (rxPtr != 0 && ptr + 1 == len + HEADER_LEN) { if (rxPtr != 0 && ptr + 1 == len + HEADER_LEN) {
// If we didn't just fail the packet and we now have the right # of bytes, parse it rxPtr = 0; // start over again on the next packet
handleToRadio(rxBuf + HEADER_LEN, len);
rxPtr = 0; // start over again // If we didn't just fail the packet and we now have the right # of bytes, parse it
if (handleToRadio(rxBuf + HEADER_LEN, len))
return 0; // we want to be called again ASAP because we still have more work to do
}
} }
} }
// we had packets available this time, so assume we might have them next time also
lastRxMsec = now;
return 0;
} }
} }
@ -71,7 +85,7 @@ void StreamAPI::writeStream()
void StreamAPI::emitTxBuffer(size_t len) void StreamAPI::emitTxBuffer(size_t len)
{ {
if (len != 0) { if (len != 0) {
DEBUG_MSG("emit tx %d\n", len); // DEBUG_MSG("emit tx %d\n", len);
txBuf[0] = START1; txBuf[0] = START1;
txBuf[1] = START2; txBuf[1] = START2;
txBuf[2] = (len >> 8) & 0xff; txBuf[2] = (len >> 8) & 0xff;
@ -93,6 +107,6 @@ void StreamAPI::emitRebooted()
fromRadioScratch.which_payloadVariant = FromRadio_rebooted_tag; fromRadioScratch.which_payloadVariant = FromRadio_rebooted_tag;
fromRadioScratch.rebooted = true; fromRadioScratch.rebooted = true;
DEBUG_MSG("Emitting reboot packet for serial shell\n"); // DEBUG_MSG("Emitting reboot packet for serial shell\n");
emitTxBuffer(pb_encode_to_bytes(txBuf + HEADER_LEN, FromRadio_size, FromRadio_fields, &fromRadioScratch)); emitTxBuffer(pb_encode_to_bytes(txBuf + HEADER_LEN, FromRadio_size, FromRadio_fields, &fromRadioScratch));
} }

View File

@ -2,6 +2,7 @@
#include "PhoneAPI.h" #include "PhoneAPI.h"
#include "Stream.h" #include "Stream.h"
#include "concurrency/OSThread.h"
// A To/FromRadio packet + our 32 bit header // A To/FromRadio packet + our 32 bit header
#define MAX_STREAM_BUF_SIZE (MAX_TO_FROM_RADIO_SIZE + sizeof(uint32_t)) #define MAX_STREAM_BUF_SIZE (MAX_TO_FROM_RADIO_SIZE + sizeof(uint32_t))
@ -27,7 +28,7 @@ valid utf8 encoding. This makes it a bit easier to start a device outputting reg
after it has received a valid packet from the PC, turn off unencoded debug printing and switch to this packet encoding. after it has received a valid packet from the PC, turn off unencoded debug printing and switch to this packet encoding.
*/ */
class StreamAPI : public PhoneAPI class StreamAPI : public PhoneAPI, protected concurrency::OSThread
{ {
/** /**
* The stream we read/write from * The stream we read/write from
@ -37,21 +38,23 @@ class StreamAPI : public PhoneAPI
uint8_t rxBuf[MAX_STREAM_BUF_SIZE]; uint8_t rxBuf[MAX_STREAM_BUF_SIZE];
size_t rxPtr = 0; size_t rxPtr = 0;
/// time of last rx, used, to slow down our polling if we haven't heard from anyone
uint32_t lastRxMsec = 0;
public: public:
StreamAPI(Stream *_stream) : stream(_stream) {} StreamAPI(Stream *_stream) : concurrency::OSThread("StreamAPI"), stream(_stream) {}
/** /**
* Currently we require frequent invocation from loop() to check for arrived serial packets and to send new packets to the * Currently we require frequent invocation from loop() to check for arrived serial packets and to send new packets to the
* phone. * phone.
* FIXME, to support better power behavior instead move to a thread and block on serial reads.
*/ */
void loop(); virtual int32_t runOnce();
private: private:
/** /**
* Read any rx chars from the link and call handleToRadio * Read any rx chars from the link and call handleToRadio
*/ */
void readStream(); int32_t readStream();
/** /**
* call getFromRadio() and deliver encapsulated packets to the Stream * call getFromRadio() and deliver encapsulated packets to the Stream
@ -63,7 +66,7 @@ class StreamAPI : public PhoneAPI
* Send a FromRadio.rebooted = true packet to the phone * Send a FromRadio.rebooted = true packet to the phone
*/ */
void emitRebooted(); void emitRebooted();
/** /**
* Send the current txBuffer over our stream * Send the current txBuffer over our stream
*/ */

View File

@ -40,18 +40,20 @@ void WiFiServerAPI::onConnectionChanged(bool connected)
} }
/// override close to also shutdown the TCP link /// override close to also shutdown the TCP link
void WiFiServerAPI::close() { void WiFiServerAPI::close()
{
client.stop(); // drop tcp connection client.stop(); // drop tcp connection
StreamAPI::close(); StreamAPI::close();
} }
bool WiFiServerAPI::loop() int32_t WiFiServerAPI::runOnce()
{ {
if (client.connected()) { if (client.connected()) {
StreamAPI::loop(); return StreamAPI::runOnce();
return true;
} else { } else {
return false; DEBUG_MSG("Client dropped connection, suspending API service\n");
enabled = false; // we no longer need to run
return 0;
} }
} }
@ -78,18 +80,5 @@ int32_t WiFiServerPort::runOnce()
openAPI = new WiFiServerAPI(client); openAPI = new WiFiServerAPI(client);
} }
if (openAPI) { return 100; // only check occasionally for incoming connections
// Allow idle processing so the API can read from its incoming stream
if(!openAPI->loop()) {
// If our API link was up, shut it down
DEBUG_MSG("Client dropped connection, closing API client\n");
// Note: we can't call delete here because this object includes other state
// besides the stream API. Instead kill it later when we start a new instance
delete openAPI;
openAPI = NULL;
}
return 0; // run fast while our API server is running
} else
return 100; // only check occasionally for incoming connections
} }

View File

@ -1,7 +1,6 @@
#pragma once #pragma once
#include "StreamAPI.h" #include "StreamAPI.h"
#include "concurrency/OSThread.h"
#include <WiFi.h> #include <WiFi.h>
/** /**
@ -18,15 +17,14 @@ class WiFiServerAPI : public StreamAPI
virtual ~WiFiServerAPI(); virtual ~WiFiServerAPI();
/// @return true if we want to keep running, or false if we are ready to be destroyed
virtual bool loop(); // Check for dropped client connections
/// override close to also shutdown the TCP link /// override close to also shutdown the TCP link
virtual void close(); virtual void close();
protected: protected:
/// Hookable to find out when connection changes /// Hookable to find out when connection changes
virtual void onConnectionChanged(bool connected); virtual void onConnectionChanged(bool connected);
virtual int32_t runOnce(); // Check for dropped client connections
}; };
/** /**