Merge pull request #2074 from meshtastic/nice-threads

a lot of thread housekeeping
This commit is contained in:
Thomas Göttgens 2022-12-30 20:58:09 +01:00 committed by GitHub
commit 9a04aaa811
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 150 additions and 97 deletions

View File

@ -287,8 +287,18 @@ void Power::readPowerStatus()
powerStatus2.getIsCharging(), powerStatus2.getBatteryVoltageMv(), powerStatus2.getBatteryChargePercent());
newStatus.notifyObservers(&powerStatus2);
#ifdef DEBUG_HEAP
if (lastheap != ESP.getFreeHeap()){
LOG_DEBUG("Heap status: %d/%d bytes free (%d), running %d threads\n", ESP.getFreeHeap(), ESP.getHeapSize(), ESP.getFreeHeap() - lastheap , concurrency::mainController.size(false));
if (lastheap != ESP.getFreeHeap()) {
LOG_DEBUG("Threads running:");
int running = 0;
for(int i = 0; i < MAX_THREADS; i++){
auto thread = concurrency::mainController.get(i);
if((thread != nullptr) && (thread->enabled)) {
LOG_DEBUG(" %s", thread->ThreadName.c_str());
running++;
}
}
LOG_DEBUG("\n");
LOG_DEBUG("Heap status: %d/%d bytes free (%d), running %d/%d threads\n", ESP.getFreeHeap(), ESP.getHeapSize(), ESP.getFreeHeap() - lastheap, running, concurrency::mainController.size(false));
lastheap = ESP.getFreeHeap();
}
#endif

View File

@ -74,8 +74,18 @@ bool OSThread::shouldRun(unsigned long time)
void OSThread::run()
{
#ifdef DEBUG_HEAP
auto heap = ESP.getFreeHeap();
#endif
currentThread = this;
auto newDelay = runOnce();
#ifdef DEBUG_HEAP
auto newHeap = ESP.getFreeHeap();
if (newHeap < heap)
LOG_DEBUG("------ Thread %s leaked heap %d -> %d (%d) ------\n", ThreadName.c_str(), heap, newHeap, newHeap - heap);
if (heap < newHeap)
LOG_DEBUG("++++++ Thread %s freed heap %d -> %d (%d) ++++++\n", ThreadName.c_str(), heap, newHeap, newHeap - heap);
#endif
runned();
@ -85,6 +95,14 @@ void OSThread::run()
currentThread = NULL;
}
int32_t OSThread::disable()
{
enabled = false;
setInterval(INT32_MAX);
return INT32_MAX;
}
/**
* This flag is set **only** when setup() starts, to provide a way for us to check for sloppy static constructor calls.
* Call assertIsSetup() to force a crash if someone tries to create an instance too early.

View File

@ -53,6 +53,8 @@ class OSThread : public Thread
static void setup();
int32_t disable();
/**
* Wait a specified number msecs starting from the current time (rather than the last time we were run)
*/

View File

@ -54,7 +54,7 @@ int32_t RotaryEncoderInterruptBase::runOnce()
this->action = ROTARY_ACTION_NONE;
return 30000; // TODO: technically this can be MAX_INT
return INT32_MAX;
}
void RotaryEncoderInterruptBase::intPressHandler()

View File

@ -7,7 +7,7 @@ enum RotaryEncoderInterruptBaseStateType { ROTARY_EVENT_OCCURRED, ROTARY_EVENT_C
enum RotaryEncoderInterruptBaseActionType { ROTARY_ACTION_NONE, ROTARY_ACTION_PRESSED, ROTARY_ACTION_CW, ROTARY_ACTION_CCW };
class RotaryEncoderInterruptBase : public Observable<const InputEvent *>, private concurrency::OSThread
class RotaryEncoderInterruptBase : public Observable<const InputEvent *>, public concurrency::OSThread
{
public:
explicit RotaryEncoderInterruptBase(const char *name);

View File

@ -9,6 +9,7 @@ void RotaryEncoderInterruptImpl1::init()
{
if (!moduleConfig.canned_message.rotary1_enabled) {
// Input device is disabled.
disable();
return;
}

View File

@ -12,7 +12,7 @@ void CardKbI2cImpl::init()
{
if (cardkb_found != CARDKB_ADDR)
{
// Input device is not detected.
disable();
return;
}

View File

@ -5,7 +5,7 @@
class KbI2cBase :
public Observable<const InputEvent *>,
private concurrency::OSThread
public concurrency::OSThread
{
public:
explicit KbI2cBase(const char *name);

View File

@ -221,29 +221,32 @@ ErrorCode Router::send(MeshPacket *p)
if (p->which_payload_variant == MeshPacket_decoded_tag) {
ChannelIndex chIndex = p->channel; // keep as a local because we are about to change it
bool shouldActuallyEncrypt = true;
#if HAS_WIFI || HAS_ETHERNET
// check if we should send decrypted packets to mqtt
if(moduleConfig.mqtt.enabled) {
// check if we should send decrypted packets to mqtt
// truth table:
/* mqtt_server mqtt_encryption_enabled should_encrypt
* not set 0 1
* not set 1 1
* set 0 0
* set 1 1
*
* => so we only decrypt mqtt if they have a custom mqtt server AND mqtt_encryption_enabled is FALSE
*/
// truth table:
/* mqtt_server mqtt_encryption_enabled should_encrypt
* not set 0 1
* not set 1 1
* set 0 0
* set 1 1
*
* => so we only decrypt mqtt if they have a custom mqtt server AND mqtt_encryption_enabled is FALSE
*/
bool shouldActuallyEncrypt = true;
if (*moduleConfig.mqtt.address && !moduleConfig.mqtt.encryption_enabled) {
shouldActuallyEncrypt = false;
if (*moduleConfig.mqtt.address && !moduleConfig.mqtt.encryption_enabled) {
shouldActuallyEncrypt = false;
}
LOG_INFO("Should encrypt MQTT?: %d\n", shouldActuallyEncrypt);
// the packet is currently in a decrypted state. send it now if they want decrypted packets
if (mqtt && !shouldActuallyEncrypt)
mqtt->onSend(*p, chIndex);
}
LOG_INFO("Should encrypt MQTT?: %d\n", shouldActuallyEncrypt);
// the packet is currently in a decrypted state. send it now if they want decrypted packets
if (mqtt && !shouldActuallyEncrypt)
mqtt->onSend(*p, chIndex);
#endif
auto encodeResult = perhapsEncode(p);
@ -253,10 +256,12 @@ ErrorCode Router::send(MeshPacket *p)
}
#if HAS_WIFI || HAS_ETHERNET
// the packet is now encrypted.
// check if we should send encrypted packets to mqtt
if (mqtt && shouldActuallyEncrypt)
mqtt->onSend(*p, chIndex);
if (moduleConfig.mqtt.enabled) {
// the packet is now encrypted.
// check if we should send encrypted packets to mqtt
if (mqtt && shouldActuallyEncrypt)
mqtt->onSend(*p, chIndex);
}
#endif
}

View File

@ -165,11 +165,18 @@ void createSSLCert()
WebServerThread *webServerThread;
WebServerThread::WebServerThread() : concurrency::OSThread("WebServerThread") {}
WebServerThread::WebServerThread() : concurrency::OSThread("WebServerThread") {
if (!config.network.wifi_enabled) {
disable();
}
}
int32_t WebServerThread::runOnce()
{
// LOG_DEBUG("WebServerThread::runOnce()\n");
if (!config.network.wifi_enabled) {
disable();
}
handleWebResponse();
if (requestRestart && (millis() / 1000) > requestRestart) {

View File

@ -51,10 +51,14 @@ CannedMessageModule::CannedMessageModule()
if ((this->splitConfiguredMessages() <= 0) && (cardkb_found != CARDKB_ADDR)) {
LOG_INFO("CannedMessageModule: No messages are configured. Module is disabled\n");
this->runState = CANNED_MESSAGE_RUN_STATE_DISABLED;
disable();
} else {
LOG_INFO("CannedMessageModule is enabled\n");
this->inputObserver.observe(inputBroker);
}
} else {
this->runState = CANNED_MESSAGE_RUN_STATE_DISABLED;
disable();
}
}

View File

@ -208,7 +208,7 @@ ExternalNotificationModule::ExternalNotificationModule()
}
} else {
LOG_INFO("External Notification Module Disabled\n");
enabled = false;
disable();
}
}

View File

@ -48,64 +48,66 @@ static uint64_t digitalReads(uint64_t mask)
RemoteHardwareModule::RemoteHardwareModule()
: ProtobufModule("remotehardware", PortNum_REMOTE_HARDWARE_APP, &HardwareMessage_msg), concurrency::OSThread(
"remotehardware")
"RemoteHardwareModule")
{
}
bool RemoteHardwareModule::handleReceivedProtobuf(const MeshPacket &req, HardwareMessage *pptr)
{
auto p = *pptr;
LOG_INFO("Received RemoteHardware typ=%d\n", p.type);
if (moduleConfig.remote_hardware.enabled) {
auto p = *pptr;
LOG_INFO("Received RemoteHardware typ=%d\n", p.type);
switch (p.type) {
case HardwareMessage_Type_WRITE_GPIOS:
// Print notification to LCD screen
screen->print("Write GPIOs\n");
switch (p.type) {
case HardwareMessage_Type_WRITE_GPIOS:
// Print notification to LCD screen
screen->print("Write GPIOs\n");
for (uint8_t i = 0; i < NUM_GPIOS; i++) {
uint64_t mask = 1 << i;
if (p.gpio_mask & mask) {
digitalWrite(i, (p.gpio_value & mask) ? 1 : 0);
for (uint8_t i = 0; i < NUM_GPIOS; i++) {
uint64_t mask = 1 << i;
if (p.gpio_mask & mask) {
digitalWrite(i, (p.gpio_value & mask) ? 1 : 0);
}
}
pinModes(p.gpio_mask, OUTPUT);
break;
case HardwareMessage_Type_READ_GPIOS: {
// Print notification to LCD screen
if (screen)
screen->print("Read GPIOs\n");
uint64_t res = digitalReads(p.gpio_mask);
// Send the reply
HardwareMessage r = HardwareMessage_init_default;
r.type = HardwareMessage_Type_READ_GPIOS_REPLY;
r.gpio_value = res;
r.gpio_mask = p.gpio_mask;
MeshPacket *p2 = allocDataProtobuf(r);
setReplyTo(p2, req);
myReply = p2;
break;
}
pinModes(p.gpio_mask, OUTPUT);
break;
case HardwareMessage_Type_WATCH_GPIOS: {
watchGpios = p.gpio_mask;
lastWatchMsec = 0; // Force a new publish soon
previousWatch = ~watchGpios; // generate a 'previous' value which is guaranteed to not match (to force an initial publish)
enabled = true; // Let our thread run at least once
LOG_INFO("Now watching GPIOs 0x%llx\n", watchGpios);
break;
}
case HardwareMessage_Type_READ_GPIOS: {
// Print notification to LCD screen
if (screen)
screen->print("Read GPIOs\n");
case HardwareMessage_Type_READ_GPIOS_REPLY:
case HardwareMessage_Type_GPIOS_CHANGED:
break; // Ignore - we might see our own replies
uint64_t res = digitalReads(p.gpio_mask);
// Send the reply
HardwareMessage r = HardwareMessage_init_default;
r.type = HardwareMessage_Type_READ_GPIOS_REPLY;
r.gpio_value = res;
r.gpio_mask = p.gpio_mask;
MeshPacket *p2 = allocDataProtobuf(r);
setReplyTo(p2, req);
myReply = p2;
break;
}
case HardwareMessage_Type_WATCH_GPIOS: {
watchGpios = p.gpio_mask;
lastWatchMsec = 0; // Force a new publish soon
previousWatch = ~watchGpios; // generate a 'previous' value which is guaranteed to not match (to force an initial publish)
enabled = true; // Let our thread run at least once
LOG_INFO("Now watching GPIOs 0x%llx\n", watchGpios);
break;
}
case HardwareMessage_Type_READ_GPIOS_REPLY:
case HardwareMessage_Type_GPIOS_CHANGED:
break; // Ignore - we might see our own replies
default:
LOG_ERROR("Hardware operation %d not yet implemented! FIXME\n", p.type);
break;
default:
LOG_ERROR("Hardware operation %d not yet implemented! FIXME\n", p.type);
break;
}
}
return false;
@ -113,7 +115,7 @@ bool RemoteHardwareModule::handleReceivedProtobuf(const MeshPacket &req, Hardwar
int32_t RemoteHardwareModule::runOnce()
{
if (watchGpios) {
if (moduleConfig.remote_hardware.enabled && watchGpios) {
uint32_t now = millis();
if (now - lastWatchMsec >= WATCH_INTERVAL_MSEC) {
@ -133,7 +135,7 @@ int32_t RemoteHardwareModule::runOnce()
}
} else {
// No longer watching anything - stop using CPU
enabled = false;
return disable();
}
return 200; // Poll our GPIOs every 200ms (FIXME, make adjustable via protobuf arg)

View File

@ -220,9 +220,7 @@ int32_t SerialModule::runOnce()
return (10);
} else {
LOG_INFO("Serial Module Disabled\n");
return INT32_MAX;
return disable();
}
}
@ -303,9 +301,6 @@ ProcessMessage SerialModuleRadio::handleReceived(const MeshPacket &mp)
}
}
}
} else {
LOG_INFO("Serial Module Disabled\n");
}
return ProcessMessage::CONTINUE; // Let others look at this message also if they want
}

View File

@ -67,7 +67,7 @@ int32_t EnvironmentTelemetryModule::runOnce()
if (!(moduleConfig.telemetry.environment_measurement_enabled ||
moduleConfig.telemetry.environment_screen_enabled)) {
// If this module is not enabled, and the user doesn't want the display screen don't waste any OSThread time on it
return result;
return disable();
}
if (firstTime) {

View File

@ -139,7 +139,7 @@ AudioModule::AudioModule() : SinglePortModule("AudioModule", PortNum_AUDIO_APP),
LOG_INFO(" using %d frames of %d bytes for a total payload length of %d bytes\n", encode_frame_num, encode_codec_size, encode_frame_size);
xTaskCreate(&run_codec2, "codec2_task", 30000, NULL, 5, &codec2HandlerTask);
} else {
LOG_INFO("Codec2 disabled (AudioModule %d, Region %s, permitted %d)\n", moduleConfig.audio.codec2_enabled, myRegion->name, myRegion->audioPermitted);
disable();
}
}
@ -258,8 +258,7 @@ int32_t AudioModule::runOnce()
}
return 100;
} else {
LOG_INFO("Audio Module Disabled\n");
return INT32_MAX;
return disable();
}
}

View File

@ -82,7 +82,7 @@ int32_t RangeTestModule::runOnce()
return (senderHeartbeat);
} else {
return (INT32_MAX);
return disable();
// This thread does not need to run as a receiver
}
@ -93,7 +93,7 @@ int32_t RangeTestModule::runOnce()
}
#endif
return (INT32_MAX);
return disable();
}
MeshPacket *RangeTestModuleRadio::allocReply()

View File

@ -52,7 +52,7 @@ int32_t StoreForwardModule::runOnce()
return (this->packetTimeMax);
}
#endif
return (INT32_MAX);
return disable();
}
/*
@ -458,6 +458,8 @@ StoreForwardModule::StoreForwardModule()
is_client = true;
LOG_INFO("*** Initializing Store & Forward Module in Client mode\n");
}
} else {
disable();
}
#endif
}

View File

@ -128,12 +128,17 @@ void mqttInit()
MQTT::MQTT() : concurrency::OSThread("mqtt"), pubSub(mqttClient), mqttQueue(MAX_MQTT_QUEUE)
{
assert(!mqtt);
mqtt = this;
if(moduleConfig.mqtt.enabled) {
pubSub.setCallback(mqttCallback);
assert(!mqtt);
mqtt = this;
// preflightSleepObserver.observe(&preflightSleep);
pubSub.setCallback(mqttCallback);
// preflightSleepObserver.observe(&preflightSleep);
} else {
disable();
}
}
bool MQTT::connected()
@ -239,6 +244,9 @@ bool MQTT::wantsLink() const
int32_t MQTT::runOnce()
{
if(!moduleConfig.mqtt.enabled) {
return disable();
}
bool wantConnection = wantsLink();
// If connected poll rapidly, otherwise only occasionally check for a wifi connection change and ability to contact server