diff --git a/arch/nrf52/nrf52.ini b/arch/nrf52/nrf52.ini index ec290a30e..858dcdc9c 100644 --- a/arch/nrf52/nrf52.ini +++ b/arch/nrf52/nrf52.ini @@ -9,7 +9,7 @@ build_flags = -Isrc/platform/nrf52 build_src_filter = - ${arduino_base.build_src_filter} - - - - - - - - - + ${arduino_base.build_src_filter} - - - - - - - - lib_deps= ${arduino_base.lib_deps} diff --git a/arch/rp2040/rp2040.ini b/arch/rp2040/rp2040.ini index 52fba9cba..b6ac4f171 100644 --- a/arch/rp2040/rp2040.ini +++ b/arch/rp2040/rp2040.ini @@ -12,7 +12,7 @@ build_flags = -D__PLAT_RP2040__ # -D _POSIX_THREADS build_src_filter = - ${arduino_base.build_src_filter} - - - - - - - - - + ${arduino_base.build_src_filter} - - - - - - - - lib_ignore = BluetoothOTA diff --git a/arch/stm32/stm32wl5e.ini b/arch/stm32/stm32wl5e.ini index 819ecc31c..524edd6b9 100644 --- a/arch/stm32/stm32wl5e.ini +++ b/arch/stm32/stm32wl5e.ini @@ -13,7 +13,7 @@ build_flags = -DVECT_TAB_OFFSET=0x08000000 build_src_filter = - ${arduino_base.build_src_filter} - - - - - - - - - - - - - + ${arduino_base.build_src_filter} - - - - - - - - - - - - board_upload.offset_address = 0x08000000 upload_protocol = stlink diff --git a/protobufs b/protobufs index e4396fd49..f2d1ebbd3 160000 --- a/protobufs +++ b/protobufs @@ -1 +1 @@ -Subproject commit e4396fd499769f24c265985ae0ee7be05c18f65a +Subproject commit f2d1ebbd3485f6e4814608da0cfc7a82d97305f1 diff --git a/src/main.cpp b/src/main.cpp index 2ff04475b..c7cbb7680 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -47,13 +47,12 @@ NRF52Bluetooth *nrf52Bluetooth; #if HAS_WIFI #include "mesh/api/WiFiServerAPI.h" -#include "mqtt/MQTT.h" #endif #if HAS_ETHERNET #include "mesh/api/ethServerAPI.h" -#include "mqtt/MQTT.h" #endif +#include "mqtt/MQTT.h" #include "LLCC68Interface.h" #include "RF95Interface.h" @@ -656,9 +655,7 @@ void setup() } } -#if HAS_WIFI || HAS_ETHERNET mqttInit(); -#endif #ifndef ARCH_PORTDUINO // Initialize Wifi diff --git a/src/mesh/MeshService.cpp b/src/mesh/MeshService.cpp index 2ad46a6b7..64741619f 100644 --- a/src/mesh/MeshService.cpp +++ b/src/mesh/MeshService.cpp @@ -52,13 +52,18 @@ FIXME in the initial proof of concept we just skip the entire want/deny flow and MeshService service; +static MemoryDynamic staticMqttClientProxyMessagePool; + static MemoryDynamic staticQueueStatusPool; +Allocator &mqttClientProxyMessagePool = staticMqttClientProxyMessagePool; + Allocator &queueStatusPool = staticQueueStatusPool; #include "Router.h" -MeshService::MeshService() : toPhoneQueue(MAX_RX_TOPHONE), toPhoneQueueStatusQueue(MAX_RX_TOPHONE) +MeshService::MeshService() + : toPhoneQueue(MAX_RX_TOPHONE), toPhoneQueueStatusQueue(MAX_RX_TOPHONE), toPhoneMqttProxyQueue(MAX_RX_TOPHONE) { lastQueueStatus = {0, 0, 16, 0}; } @@ -269,6 +274,20 @@ void MeshService::sendToPhone(meshtastic_MeshPacket *p) fromNum++; } +void MeshService::sendMqttMessageToClientProxy(meshtastic_MqttClientProxyMessage *m) +{ + LOG_DEBUG("Sending mqtt message on topic '%s' to client for proxying to server\n", m->topic); + if (toPhoneMqttProxyQueue.numFree() == 0) { + LOG_WARN("MqttClientProxyMessagePool queue is full, discarding oldest\n"); + meshtastic_MqttClientProxyMessage *d = toPhoneMqttProxyQueue.dequeuePtr(0); + if (d) + releaseMqttClientProxyMessageToPool(d); + } + + assert(toPhoneMqttProxyQueue.enqueue(m, 0)); + fromNum++; +} + meshtastic_NodeInfoLite *MeshService::refreshLocalMeshNode() { meshtastic_NodeInfoLite *node = nodeDB.getMeshNode(nodeDB.getNodeNum()); diff --git a/src/mesh/MeshService.h b/src/mesh/MeshService.h index d14db7139..3cc197a5a 100644 --- a/src/mesh/MeshService.h +++ b/src/mesh/MeshService.h @@ -15,6 +15,7 @@ #endif extern Allocator &queueStatusPool; +extern Allocator &mqttClientProxyMessagePool; /** * Top level app for this service. keeps the mesh, the radio config and the queue of received packets. @@ -34,6 +35,9 @@ class MeshService // keep list of QueueStatus packets to be send to the phone PointerQueue toPhoneQueueStatusQueue; + // keep list of MqttClientProxyMessages to be send to the client for delivery + PointerQueue toPhoneMqttProxyQueue; + // This holds the last QueueStatus send meshtastic_QueueStatus lastQueueStatus; @@ -67,9 +71,15 @@ class MeshService /// Return the next QueueStatus packet destined to the phone. meshtastic_QueueStatus *getQueueStatusForPhone() { return toPhoneQueueStatusQueue.dequeuePtr(0); } + /// Return the next MqttClientProxyMessage packet destined to the phone. + meshtastic_MqttClientProxyMessage *getMqttClientProxyMessageForPhone() { return toPhoneMqttProxyQueue.dequeuePtr(0); } + // Release QueueStatus packet to pool void releaseQueueStatusToPool(meshtastic_QueueStatus *p) { queueStatusPool.release(p); } + // Release MqttClientProxyMessage packet to pool + void releaseMqttClientProxyMessageToPool(meshtastic_MqttClientProxyMessage *p) { mqttClientProxyMessagePool.release(p); } + /** * Given a ToRadio buffer parse it and properly handle it (setup radio, owner or send packet into the mesh) * Called by PhoneAPI.handleToRadio. Note: p is a scratch buffer, this function is allowed to write to it but it can not keep @@ -103,6 +113,9 @@ class MeshService /// Send a packet to the phone void sendToPhone(meshtastic_MeshPacket *p); + /// Send an MQTT message to the phone for client proxying + void sendMqttMessageToClientProxy(meshtastic_MqttClientProxyMessage *m); + bool isToPhoneQueueEmpty(); private: diff --git a/src/mesh/PhoneAPI.cpp b/src/mesh/PhoneAPI.cpp index ebc886301..6c6c70165 100644 --- a/src/mesh/PhoneAPI.cpp +++ b/src/mesh/PhoneAPI.cpp @@ -18,6 +18,8 @@ #error ToRadio is too big #endif +#include "mqtt/MQTT.h" + PhoneAPI::PhoneAPI() { lastContactMsec = millis(); @@ -54,6 +56,7 @@ void PhoneAPI::close() unobserve(&xModem.packetReady); releasePhonePacket(); // Don't leak phone packets on shutdown releaseQueueStatusPhonePacket(); + releaseMqttClientProxyPhonePacket(); onConnectionChanged(false); } @@ -98,6 +101,12 @@ bool PhoneAPI::handleToRadio(const uint8_t *buf, size_t bufLength) LOG_INFO("Got xmodem packet\n"); xModem.handlePacket(toRadioScratch.xmodemPacket); break; + case meshtastic_ToRadio_mqttClientProxyMessage_tag: + LOG_INFO("Got MqttClientProxy message\n"); + if (mqtt && moduleConfig.mqtt.proxy_to_client_enabled) { + mqtt->onClientProxyReceive(toRadioScratch.mqttClientProxyMessage); + } + break; default: // Ignore nop messages // LOG_DEBUG("Error: unexpected ToRadio variant\n"); @@ -295,12 +304,16 @@ size_t PhoneAPI::getFromRadio(uint8_t *buf) break; case STATE_SEND_PACKETS: - // Do we have a message from the mesh? + // Do we have a message from the mesh or packet from the local device? LOG_INFO("getFromRadio=STATE_SEND_PACKETS\n"); if (queueStatusPacketForPhone) { fromRadioScratch.which_payload_variant = meshtastic_FromRadio_queueStatus_tag; fromRadioScratch.queueStatus = *queueStatusPacketForPhone; releaseQueueStatusPhonePacket(); + } else if (mqttClientProxyMessageForPhone) { + fromRadioScratch.which_payload_variant = meshtastic_FromRadio_mqttClientProxyMessage_tag; + fromRadioScratch.mqttClientProxyMessage = *mqttClientProxyMessageForPhone; + releaseMqttClientProxyPhonePacket(); } else if (xmodemPacketForPhone.control != meshtastic_XModem_Control_NUL) { fromRadioScratch.which_payload_variant = meshtastic_FromRadio_xmodemPacket_tag; fromRadioScratch.xmodemPacket = xmodemPacketForPhone; @@ -353,6 +366,14 @@ void PhoneAPI::releaseQueueStatusPhonePacket() } } +void PhoneAPI::releaseMqttClientProxyPhonePacket() +{ + if (mqttClientProxyMessageForPhone) { + service.releaseMqttClientProxyMessageToPool(mqttClientProxyMessageForPhone); + mqttClientProxyMessageForPhone = NULL; + } +} + /** * Return true if we have data available to send to the phone */ @@ -381,7 +402,9 @@ bool PhoneAPI::available() case STATE_SEND_PACKETS: { if (!queueStatusPacketForPhone) queueStatusPacketForPhone = service.getQueueStatusForPhone(); - bool hasPacket = !!queueStatusPacketForPhone; + if (!mqttClientProxyMessageForPhone) + mqttClientProxyMessageForPhone = service.getMqttClientProxyMessageForPhone(); + bool hasPacket = !!queueStatusPacketForPhone || !!mqttClientProxyMessageForPhone; if (hasPacket) return true; diff --git a/src/mesh/PhoneAPI.h b/src/mesh/PhoneAPI.h index 8097ad34b..65a06bc6b 100644 --- a/src/mesh/PhoneAPI.h +++ b/src/mesh/PhoneAPI.h @@ -50,6 +50,9 @@ class PhoneAPI // Keep QueueStatus packet just as packetForPhone meshtastic_QueueStatus *queueStatusPacketForPhone = NULL; + // Keep MqttClientProxyMessage packet just as packetForPhone + meshtastic_MqttClientProxyMessage *mqttClientProxyMessageForPhone = NULL; + /// We temporarily keep the nodeInfo here between the call to available and getFromRadio meshtastic_NodeInfo nodeInfoForPhone = meshtastic_NodeInfo_init_default; @@ -126,6 +129,8 @@ class PhoneAPI void releaseQueueStatusPhonePacket(); + void releaseMqttClientProxyPhonePacket(); + /// begin a new connection void handleStartConfig(); diff --git a/src/mesh/Router.cpp b/src/mesh/Router.cpp index 6f020d739..294b2531f 100644 --- a/src/mesh/Router.cpp +++ b/src/mesh/Router.cpp @@ -12,9 +12,7 @@ extern "C" { #include "mesh/compression/unishox2.h" } -#if HAS_WIFI || HAS_ETHERNET #include "mqtt/MQTT.h" -#endif /** * Router todo @@ -248,7 +246,6 @@ ErrorCode Router::send(meshtastic_MeshPacket *p) bool shouldActuallyEncrypt = true; -#if HAS_WIFI || HAS_ETHERNET if (moduleConfig.mqtt.enabled) { // check if we should send decrypted packets to mqtt @@ -272,7 +269,6 @@ ErrorCode Router::send(meshtastic_MeshPacket *p) if (mqtt && !shouldActuallyEncrypt) mqtt->onSend(*p, chIndex); } -#endif auto encodeResult = perhapsEncode(p); if (encodeResult != meshtastic_Routing_Error_NONE) { @@ -280,14 +276,12 @@ ErrorCode Router::send(meshtastic_MeshPacket *p) return encodeResult; // FIXME - this isn't a valid ErrorCode } -#if HAS_WIFI || HAS_ETHERNET if (moduleConfig.mqtt.enabled) { // the packet is now encrypted. // check if we should send encrypted packets to mqtt if (mqtt && shouldActuallyEncrypt) mqtt->onSend(*p, chIndex); } -#endif } assert(iface); // This should have been detected already in sendLocal (or we just received a packet from outside) diff --git a/src/mesh/eth/ethClient.cpp b/src/mesh/eth/ethClient.cpp index c60e35394..f10c96866 100644 --- a/src/mesh/eth/ethClient.cpp +++ b/src/mesh/eth/ethClient.cpp @@ -68,7 +68,7 @@ static int32_t reconnectETH() } // FIXME this is kinda yucky, instead we should just have an observable for 'wifireconnected' - if (mqtt && !mqtt->connected()) { + if (mqtt && !moduleConfig.mqtt.proxy_to_client_enabled && !mqtt->isConnectedDirectly()) { mqtt->reconnect(); } } @@ -87,7 +87,6 @@ static int32_t reconnectETH() perhapsSetRTC(RTCQualityNTP, &tv); ntp_renew = millis() + 43200 * 1000; // success, refresh every 12 hours - } else { LOG_ERROR("NTP Update failed\n"); ntp_renew = millis() + 300 * 1000; // failure, retry every 5 minutes @@ -170,7 +169,6 @@ bool initEthernet() ethEvent = new Periodic("ethConnect", reconnectETH); return true; - } else { LOG_INFO("Not using Ethernet\n"); return false; diff --git a/src/mesh/generated/meshtastic/deviceonly.pb.h b/src/mesh/generated/meshtastic/deviceonly.pb.h index d0c3b7bd8..a093c9fe2 100644 --- a/src/mesh/generated/meshtastic/deviceonly.pb.h +++ b/src/mesh/generated/meshtastic/deviceonly.pb.h @@ -323,7 +323,7 @@ extern const pb_msgdesc_t meshtastic_NodeRemoteHardwarePin_msg; #define meshtastic_DeviceState_size 35056 #define meshtastic_NodeInfoLite_size 151 #define meshtastic_NodeRemoteHardwarePin_size 29 -#define meshtastic_OEMStore_size 3152 +#define meshtastic_OEMStore_size 3154 #define meshtastic_PositionLite_size 28 #ifdef __cplusplus diff --git a/src/mesh/generated/meshtastic/localonly.pb.h b/src/mesh/generated/meshtastic/localonly.pb.h index d70acc7fc..b7199a001 100644 --- a/src/mesh/generated/meshtastic/localonly.pb.h +++ b/src/mesh/generated/meshtastic/localonly.pb.h @@ -163,7 +163,7 @@ extern const pb_msgdesc_t meshtastic_LocalModuleConfig_msg; /* Maximum encoded size of messages (where known) */ #define meshtastic_LocalConfig_size 461 -#define meshtastic_LocalModuleConfig_size 545 +#define meshtastic_LocalModuleConfig_size 547 #ifdef __cplusplus } /* extern "C" */ diff --git a/src/mesh/generated/meshtastic/mesh.pb.c b/src/mesh/generated/meshtastic/mesh.pb.c index ce7d48b14..790f8be2d 100644 --- a/src/mesh/generated/meshtastic/mesh.pb.c +++ b/src/mesh/generated/meshtastic/mesh.pb.c @@ -24,6 +24,9 @@ PB_BIND(meshtastic_Data, meshtastic_Data, 2) PB_BIND(meshtastic_Waypoint, meshtastic_Waypoint, AUTO) +PB_BIND(meshtastic_MqttClientProxyMessage, meshtastic_MqttClientProxyMessage, 2) + + PB_BIND(meshtastic_MeshPacket, meshtastic_MeshPacket, 2) diff --git a/src/mesh/generated/meshtastic/mesh.pb.h b/src/mesh/generated/meshtastic/mesh.pb.h index 0ed2f8e68..3d4c41cb2 100644 --- a/src/mesh/generated/meshtastic/mesh.pb.h +++ b/src/mesh/generated/meshtastic/mesh.pb.h @@ -462,6 +462,22 @@ typedef struct _meshtastic_Waypoint { uint32_t icon; } meshtastic_Waypoint; +typedef PB_BYTES_ARRAY_T(435) meshtastic_MqttClientProxyMessage_data_t; +/* This message will be proxied over the PhoneAPI for the client to deliver to the MQTT server */ +typedef struct _meshtastic_MqttClientProxyMessage { + /* The MQTT topic this message will be sent /received on */ + char topic[60]; + pb_size_t which_payload_variant; + union { + /* Bytes */ + meshtastic_MqttClientProxyMessage_data_t data; + /* Text */ + char text[435]; + } payload_variant; + /* Whether the message should be retained (or not) */ + bool retained; +} meshtastic_MqttClientProxyMessage; + typedef PB_BYTES_ARRAY_T(256) meshtastic_MeshPacket_encrypted_t; /* A packet envelope sent/received over the mesh only payload_variant is sent in the payload portion of the LORA packet. @@ -683,6 +699,8 @@ typedef struct _meshtastic_ToRadio { (Sending this message is optional for clients) */ bool disconnect; meshtastic_XModem xmodemPacket; + /* MQTT Client Proxy Message */ + meshtastic_MqttClientProxyMessage mqttClientProxyMessage; }; } meshtastic_ToRadio; @@ -780,6 +798,8 @@ typedef struct _meshtastic_FromRadio { meshtastic_XModem xmodemPacket; /* Device metadata message */ meshtastic_DeviceMetadata metadata; + /* MQTT Client Proxy Message */ + meshtastic_MqttClientProxyMessage mqttClientProxyMessage; }; } meshtastic_FromRadio; @@ -836,6 +856,7 @@ extern "C" { #define meshtastic_Data_portnum_ENUMTYPE meshtastic_PortNum + #define meshtastic_MeshPacket_priority_ENUMTYPE meshtastic_MeshPacket_Priority #define meshtastic_MeshPacket_delayed_ENUMTYPE meshtastic_MeshPacket_Delayed @@ -862,6 +883,7 @@ extern "C" { #define meshtastic_Routing_init_default {0, {meshtastic_RouteDiscovery_init_default}} #define meshtastic_Data_init_default {_meshtastic_PortNum_MIN, {0, {0}}, 0, 0, 0, 0, 0, 0} #define meshtastic_Waypoint_init_default {0, 0, 0, 0, 0, "", "", 0} +#define meshtastic_MqttClientProxyMessage_init_default {"", 0, {{0, {0}}}, 0} #define meshtastic_MeshPacket_init_default {0, 0, 0, 0, {meshtastic_Data_init_default}, 0, 0, 0, 0, 0, _meshtastic_MeshPacket_Priority_MIN, 0, _meshtastic_MeshPacket_Delayed_MIN} #define meshtastic_NodeInfo_init_default {0, false, meshtastic_User_init_default, false, meshtastic_Position_init_default, 0, 0, false, meshtastic_DeviceMetrics_init_default, 0} #define meshtastic_MyNodeInfo_init_default {0, 0, 0, "", _meshtastic_CriticalErrorCode_MIN, 0, 0, 0, 0, 0, 0, 0, {0, 0, 0, 0, 0, 0, 0, 0}, 0, {0, 0, 0, 0, 0, 0, 0, 0}, 0, 0, 0} @@ -879,6 +901,7 @@ extern "C" { #define meshtastic_Routing_init_zero {0, {meshtastic_RouteDiscovery_init_zero}} #define meshtastic_Data_init_zero {_meshtastic_PortNum_MIN, {0, {0}}, 0, 0, 0, 0, 0, 0} #define meshtastic_Waypoint_init_zero {0, 0, 0, 0, 0, "", "", 0} +#define meshtastic_MqttClientProxyMessage_init_zero {"", 0, {{0, {0}}}, 0} #define meshtastic_MeshPacket_init_zero {0, 0, 0, 0, {meshtastic_Data_init_zero}, 0, 0, 0, 0, 0, _meshtastic_MeshPacket_Priority_MIN, 0, _meshtastic_MeshPacket_Delayed_MIN} #define meshtastic_NodeInfo_init_zero {0, false, meshtastic_User_init_zero, false, meshtastic_Position_init_zero, 0, 0, false, meshtastic_DeviceMetrics_init_zero, 0} #define meshtastic_MyNodeInfo_init_zero {0, 0, 0, "", _meshtastic_CriticalErrorCode_MIN, 0, 0, 0, 0, 0, 0, 0, {0, 0, 0, 0, 0, 0, 0, 0}, 0, {0, 0, 0, 0, 0, 0, 0, 0}, 0, 0, 0} @@ -940,6 +963,10 @@ extern "C" { #define meshtastic_Waypoint_name_tag 6 #define meshtastic_Waypoint_description_tag 7 #define meshtastic_Waypoint_icon_tag 8 +#define meshtastic_MqttClientProxyMessage_topic_tag 1 +#define meshtastic_MqttClientProxyMessage_data_tag 2 +#define meshtastic_MqttClientProxyMessage_text_tag 3 +#define meshtastic_MqttClientProxyMessage_retained_tag 4 #define meshtastic_MeshPacket_from_tag 1 #define meshtastic_MeshPacket_to_tag 2 #define meshtastic_MeshPacket_channel_tag 3 @@ -988,6 +1015,7 @@ extern "C" { #define meshtastic_ToRadio_want_config_id_tag 3 #define meshtastic_ToRadio_disconnect_tag 4 #define meshtastic_ToRadio_xmodemPacket_tag 5 +#define meshtastic_ToRadio_mqttClientProxyMessage_tag 6 #define meshtastic_Compressed_portnum_tag 1 #define meshtastic_Compressed_data_tag 2 #define meshtastic_Neighbor_node_id_tag 1 @@ -1018,6 +1046,7 @@ extern "C" { #define meshtastic_FromRadio_queueStatus_tag 11 #define meshtastic_FromRadio_xmodemPacket_tag 12 #define meshtastic_FromRadio_metadata_tag 13 +#define meshtastic_FromRadio_mqttClientProxyMessage_tag 14 /* Struct field encoding specification for nanopb */ #define meshtastic_Position_FIELDLIST(X, a) \ @@ -1094,6 +1123,14 @@ X(a, STATIC, SINGULAR, FIXED32, icon, 8) #define meshtastic_Waypoint_CALLBACK NULL #define meshtastic_Waypoint_DEFAULT NULL +#define meshtastic_MqttClientProxyMessage_FIELDLIST(X, a) \ +X(a, STATIC, SINGULAR, STRING, topic, 1) \ +X(a, STATIC, ONEOF, BYTES, (payload_variant,data,payload_variant.data), 2) \ +X(a, STATIC, ONEOF, STRING, (payload_variant,text,payload_variant.text), 3) \ +X(a, STATIC, SINGULAR, BOOL, retained, 4) +#define meshtastic_MqttClientProxyMessage_CALLBACK NULL +#define meshtastic_MqttClientProxyMessage_DEFAULT NULL + #define meshtastic_MeshPacket_FIELDLIST(X, a) \ X(a, STATIC, SINGULAR, FIXED32, from, 1) \ X(a, STATIC, SINGULAR, FIXED32, to, 2) \ @@ -1175,7 +1212,8 @@ X(a, STATIC, ONEOF, MESSAGE, (payload_variant,moduleConfig,moduleConfig), X(a, STATIC, ONEOF, MESSAGE, (payload_variant,channel,channel), 10) \ X(a, STATIC, ONEOF, MESSAGE, (payload_variant,queueStatus,queueStatus), 11) \ X(a, STATIC, ONEOF, MESSAGE, (payload_variant,xmodemPacket,xmodemPacket), 12) \ -X(a, STATIC, ONEOF, MESSAGE, (payload_variant,metadata,metadata), 13) +X(a, STATIC, ONEOF, MESSAGE, (payload_variant,metadata,metadata), 13) \ +X(a, STATIC, ONEOF, MESSAGE, (payload_variant,mqttClientProxyMessage,mqttClientProxyMessage), 14) #define meshtastic_FromRadio_CALLBACK NULL #define meshtastic_FromRadio_DEFAULT NULL #define meshtastic_FromRadio_payload_variant_packet_MSGTYPE meshtastic_MeshPacket @@ -1188,16 +1226,19 @@ X(a, STATIC, ONEOF, MESSAGE, (payload_variant,metadata,metadata), 13) #define meshtastic_FromRadio_payload_variant_queueStatus_MSGTYPE meshtastic_QueueStatus #define meshtastic_FromRadio_payload_variant_xmodemPacket_MSGTYPE meshtastic_XModem #define meshtastic_FromRadio_payload_variant_metadata_MSGTYPE meshtastic_DeviceMetadata +#define meshtastic_FromRadio_payload_variant_mqttClientProxyMessage_MSGTYPE meshtastic_MqttClientProxyMessage #define meshtastic_ToRadio_FIELDLIST(X, a) \ X(a, STATIC, ONEOF, MESSAGE, (payload_variant,packet,packet), 1) \ X(a, STATIC, ONEOF, UINT32, (payload_variant,want_config_id,want_config_id), 3) \ X(a, STATIC, ONEOF, BOOL, (payload_variant,disconnect,disconnect), 4) \ -X(a, STATIC, ONEOF, MESSAGE, (payload_variant,xmodemPacket,xmodemPacket), 5) +X(a, STATIC, ONEOF, MESSAGE, (payload_variant,xmodemPacket,xmodemPacket), 5) \ +X(a, STATIC, ONEOF, MESSAGE, (payload_variant,mqttClientProxyMessage,mqttClientProxyMessage), 6) #define meshtastic_ToRadio_CALLBACK NULL #define meshtastic_ToRadio_DEFAULT NULL #define meshtastic_ToRadio_payload_variant_packet_MSGTYPE meshtastic_MeshPacket #define meshtastic_ToRadio_payload_variant_xmodemPacket_MSGTYPE meshtastic_XModem +#define meshtastic_ToRadio_payload_variant_mqttClientProxyMessage_MSGTYPE meshtastic_MqttClientProxyMessage #define meshtastic_Compressed_FIELDLIST(X, a) \ X(a, STATIC, SINGULAR, UENUM, portnum, 1) \ @@ -1239,6 +1280,7 @@ extern const pb_msgdesc_t meshtastic_RouteDiscovery_msg; extern const pb_msgdesc_t meshtastic_Routing_msg; extern const pb_msgdesc_t meshtastic_Data_msg; extern const pb_msgdesc_t meshtastic_Waypoint_msg; +extern const pb_msgdesc_t meshtastic_MqttClientProxyMessage_msg; extern const pb_msgdesc_t meshtastic_MeshPacket_msg; extern const pb_msgdesc_t meshtastic_NodeInfo_msg; extern const pb_msgdesc_t meshtastic_MyNodeInfo_msg; @@ -1258,6 +1300,7 @@ extern const pb_msgdesc_t meshtastic_DeviceMetadata_msg; #define meshtastic_Routing_fields &meshtastic_Routing_msg #define meshtastic_Data_fields &meshtastic_Data_msg #define meshtastic_Waypoint_fields &meshtastic_Waypoint_msg +#define meshtastic_MqttClientProxyMessage_fields &meshtastic_MqttClientProxyMessage_msg #define meshtastic_MeshPacket_fields &meshtastic_MeshPacket_msg #define meshtastic_NodeInfo_fields &meshtastic_NodeInfo_msg #define meshtastic_MyNodeInfo_fields &meshtastic_MyNodeInfo_msg @@ -1274,9 +1317,10 @@ extern const pb_msgdesc_t meshtastic_DeviceMetadata_msg; #define meshtastic_Compressed_size 243 #define meshtastic_Data_size 270 #define meshtastic_DeviceMetadata_size 46 -#define meshtastic_FromRadio_size 330 +#define meshtastic_FromRadio_size 510 #define meshtastic_LogRecord_size 81 #define meshtastic_MeshPacket_size 321 +#define meshtastic_MqttClientProxyMessage_size 501 #define meshtastic_MyNodeInfo_size 179 #define meshtastic_NeighborInfo_size 142 #define meshtastic_Neighbor_size 11 @@ -1285,7 +1329,7 @@ extern const pb_msgdesc_t meshtastic_DeviceMetadata_msg; #define meshtastic_QueueStatus_size 23 #define meshtastic_RouteDiscovery_size 40 #define meshtastic_Routing_size 42 -#define meshtastic_ToRadio_size 324 +#define meshtastic_ToRadio_size 504 #define meshtastic_User_size 77 #define meshtastic_Waypoint_size 165 diff --git a/src/mesh/generated/meshtastic/module_config.pb.c b/src/mesh/generated/meshtastic/module_config.pb.c index 9352feb17..86614d18c 100644 --- a/src/mesh/generated/meshtastic/module_config.pb.c +++ b/src/mesh/generated/meshtastic/module_config.pb.c @@ -39,6 +39,9 @@ PB_BIND(meshtastic_ModuleConfig_TelemetryConfig, meshtastic_ModuleConfig_Telemet PB_BIND(meshtastic_ModuleConfig_CannedMessageConfig, meshtastic_ModuleConfig_CannedMessageConfig, AUTO) +PB_BIND(meshtastic_ModuleConfig_AmbientLightingConfig, meshtastic_ModuleConfig_AmbientLightingConfig, AUTO) + + PB_BIND(meshtastic_RemoteHardwarePin, meshtastic_RemoteHardwarePin, AUTO) diff --git a/src/mesh/generated/meshtastic/module_config.pb.h b/src/mesh/generated/meshtastic/module_config.pb.h index 6273a89ae..43b330b04 100644 --- a/src/mesh/generated/meshtastic/module_config.pb.h +++ b/src/mesh/generated/meshtastic/module_config.pb.h @@ -112,6 +112,8 @@ typedef struct _meshtastic_ModuleConfig_MQTTConfig { /* The root topic to use for MQTT messages. Default is "msh". This is useful if you want to use a single MQTT server for multiple meshtastic networks and separate them via ACLs */ char root[16]; + /* If true, we can use the connected phone / client to proxy messages to MQTT instead of a direct connection */ + bool proxy_to_client_enabled; } meshtastic_ModuleConfig_MQTTConfig; /* NeighborInfoModule Config */ @@ -279,6 +281,20 @@ typedef struct _meshtastic_ModuleConfig_CannedMessageConfig { bool send_bell; } meshtastic_ModuleConfig_CannedMessageConfig; +/* Ambient Lighting Module - Settings for control of onboard LEDs to allow users to adjust the brightness levels and respective color levels. +Initially created for the RAK14001 RGB LED module. */ +typedef struct _meshtastic_ModuleConfig_AmbientLightingConfig { + /* Sets LED to on or off. */ + bool led_state; + /* Sets the overall current for the LED, firmware side range for the RAK14001 is 1-31, but users should be given a range of 0-100% */ + uint8_t current; + uint8_t red; /* Red level */ + /* Sets the green level of the LED, firmware side values are 0-255, but users should be given a range of 0-100% */ + uint8_t green; /* Green level */ + /* Sets the blue level of the LED, firmware side values are 0-255, but users should be given a range of 0-100% */ + uint8_t blue; /* Blue level */ +} meshtastic_ModuleConfig_AmbientLightingConfig; + /* A GPIO pin definition for remote hardware module */ typedef struct _meshtastic_RemoteHardwarePin { /* GPIO Pin number (must match Arduino) */ @@ -324,6 +340,8 @@ typedef struct _meshtastic_ModuleConfig { meshtastic_ModuleConfig_RemoteHardwareConfig remote_hardware; /* TODO: REPLACE */ meshtastic_ModuleConfig_NeighborInfoConfig neighbor_info; + /* TODO: REPLACE */ + meshtastic_ModuleConfig_AmbientLightingConfig ambient_lighting; } payload_variant; } meshtastic_ModuleConfig; @@ -370,12 +388,13 @@ extern "C" { #define meshtastic_ModuleConfig_CannedMessageConfig_inputbroker_event_ccw_ENUMTYPE meshtastic_ModuleConfig_CannedMessageConfig_InputEventChar #define meshtastic_ModuleConfig_CannedMessageConfig_inputbroker_event_press_ENUMTYPE meshtastic_ModuleConfig_CannedMessageConfig_InputEventChar + #define meshtastic_RemoteHardwarePin_type_ENUMTYPE meshtastic_RemoteHardwarePinType /* Initializer values for message structs */ #define meshtastic_ModuleConfig_init_default {0, {meshtastic_ModuleConfig_MQTTConfig_init_default}} -#define meshtastic_ModuleConfig_MQTTConfig_init_default {0, "", "", "", 0, 0, 0, ""} +#define meshtastic_ModuleConfig_MQTTConfig_init_default {0, "", "", "", 0, 0, 0, "", 0} #define meshtastic_ModuleConfig_RemoteHardwareConfig_init_default {0, 0, 0, {meshtastic_RemoteHardwarePin_init_default, meshtastic_RemoteHardwarePin_init_default, meshtastic_RemoteHardwarePin_init_default, meshtastic_RemoteHardwarePin_init_default}} #define meshtastic_ModuleConfig_NeighborInfoConfig_init_default {0, 0} #define meshtastic_ModuleConfig_AudioConfig_init_default {0, 0, _meshtastic_ModuleConfig_AudioConfig_Audio_Baud_MIN, 0, 0, 0, 0} @@ -385,9 +404,10 @@ extern "C" { #define meshtastic_ModuleConfig_RangeTestConfig_init_default {0, 0, 0} #define meshtastic_ModuleConfig_TelemetryConfig_init_default {0, 0, 0, 0, 0, 0, 0} #define meshtastic_ModuleConfig_CannedMessageConfig_init_default {0, 0, 0, 0, _meshtastic_ModuleConfig_CannedMessageConfig_InputEventChar_MIN, _meshtastic_ModuleConfig_CannedMessageConfig_InputEventChar_MIN, _meshtastic_ModuleConfig_CannedMessageConfig_InputEventChar_MIN, 0, 0, "", 0} +#define meshtastic_ModuleConfig_AmbientLightingConfig_init_default {0, 0, 0, 0, 0} #define meshtastic_RemoteHardwarePin_init_default {0, "", _meshtastic_RemoteHardwarePinType_MIN} #define meshtastic_ModuleConfig_init_zero {0, {meshtastic_ModuleConfig_MQTTConfig_init_zero}} -#define meshtastic_ModuleConfig_MQTTConfig_init_zero {0, "", "", "", 0, 0, 0, ""} +#define meshtastic_ModuleConfig_MQTTConfig_init_zero {0, "", "", "", 0, 0, 0, "", 0} #define meshtastic_ModuleConfig_RemoteHardwareConfig_init_zero {0, 0, 0, {meshtastic_RemoteHardwarePin_init_zero, meshtastic_RemoteHardwarePin_init_zero, meshtastic_RemoteHardwarePin_init_zero, meshtastic_RemoteHardwarePin_init_zero}} #define meshtastic_ModuleConfig_NeighborInfoConfig_init_zero {0, 0} #define meshtastic_ModuleConfig_AudioConfig_init_zero {0, 0, _meshtastic_ModuleConfig_AudioConfig_Audio_Baud_MIN, 0, 0, 0, 0} @@ -397,6 +417,7 @@ extern "C" { #define meshtastic_ModuleConfig_RangeTestConfig_init_zero {0, 0, 0} #define meshtastic_ModuleConfig_TelemetryConfig_init_zero {0, 0, 0, 0, 0, 0, 0} #define meshtastic_ModuleConfig_CannedMessageConfig_init_zero {0, 0, 0, 0, _meshtastic_ModuleConfig_CannedMessageConfig_InputEventChar_MIN, _meshtastic_ModuleConfig_CannedMessageConfig_InputEventChar_MIN, _meshtastic_ModuleConfig_CannedMessageConfig_InputEventChar_MIN, 0, 0, "", 0} +#define meshtastic_ModuleConfig_AmbientLightingConfig_init_zero {0, 0, 0, 0, 0} #define meshtastic_RemoteHardwarePin_init_zero {0, "", _meshtastic_RemoteHardwarePinType_MIN} /* Field tags (for use in manual encoding/decoding) */ @@ -408,6 +429,7 @@ extern "C" { #define meshtastic_ModuleConfig_MQTTConfig_json_enabled_tag 6 #define meshtastic_ModuleConfig_MQTTConfig_tls_enabled_tag 7 #define meshtastic_ModuleConfig_MQTTConfig_root_tag 8 +#define meshtastic_ModuleConfig_MQTTConfig_proxy_to_client_enabled_tag 9 #define meshtastic_ModuleConfig_NeighborInfoConfig_enabled_tag 1 #define meshtastic_ModuleConfig_NeighborInfoConfig_update_interval_tag 2 #define meshtastic_ModuleConfig_AudioConfig_codec2_enabled_tag 1 @@ -465,6 +487,11 @@ extern "C" { #define meshtastic_ModuleConfig_CannedMessageConfig_enabled_tag 9 #define meshtastic_ModuleConfig_CannedMessageConfig_allow_input_source_tag 10 #define meshtastic_ModuleConfig_CannedMessageConfig_send_bell_tag 11 +#define meshtastic_ModuleConfig_AmbientLightingConfig_led_state_tag 1 +#define meshtastic_ModuleConfig_AmbientLightingConfig_current_tag 2 +#define meshtastic_ModuleConfig_AmbientLightingConfig_red_tag 3 +#define meshtastic_ModuleConfig_AmbientLightingConfig_green_tag 4 +#define meshtastic_ModuleConfig_AmbientLightingConfig_blue_tag 5 #define meshtastic_RemoteHardwarePin_gpio_pin_tag 1 #define meshtastic_RemoteHardwarePin_name_tag 2 #define meshtastic_RemoteHardwarePin_type_tag 3 @@ -481,6 +508,7 @@ extern "C" { #define meshtastic_ModuleConfig_audio_tag 8 #define meshtastic_ModuleConfig_remote_hardware_tag 9 #define meshtastic_ModuleConfig_neighbor_info_tag 10 +#define meshtastic_ModuleConfig_ambient_lighting_tag 11 /* Struct field encoding specification for nanopb */ #define meshtastic_ModuleConfig_FIELDLIST(X, a) \ @@ -493,7 +521,8 @@ X(a, STATIC, ONEOF, MESSAGE, (payload_variant,telemetry,payload_variant.te X(a, STATIC, ONEOF, MESSAGE, (payload_variant,canned_message,payload_variant.canned_message), 7) \ X(a, STATIC, ONEOF, MESSAGE, (payload_variant,audio,payload_variant.audio), 8) \ X(a, STATIC, ONEOF, MESSAGE, (payload_variant,remote_hardware,payload_variant.remote_hardware), 9) \ -X(a, STATIC, ONEOF, MESSAGE, (payload_variant,neighbor_info,payload_variant.neighbor_info), 10) +X(a, STATIC, ONEOF, MESSAGE, (payload_variant,neighbor_info,payload_variant.neighbor_info), 10) \ +X(a, STATIC, ONEOF, MESSAGE, (payload_variant,ambient_lighting,payload_variant.ambient_lighting), 11) #define meshtastic_ModuleConfig_CALLBACK NULL #define meshtastic_ModuleConfig_DEFAULT NULL #define meshtastic_ModuleConfig_payload_variant_mqtt_MSGTYPE meshtastic_ModuleConfig_MQTTConfig @@ -506,6 +535,7 @@ X(a, STATIC, ONEOF, MESSAGE, (payload_variant,neighbor_info,payload_varian #define meshtastic_ModuleConfig_payload_variant_audio_MSGTYPE meshtastic_ModuleConfig_AudioConfig #define meshtastic_ModuleConfig_payload_variant_remote_hardware_MSGTYPE meshtastic_ModuleConfig_RemoteHardwareConfig #define meshtastic_ModuleConfig_payload_variant_neighbor_info_MSGTYPE meshtastic_ModuleConfig_NeighborInfoConfig +#define meshtastic_ModuleConfig_payload_variant_ambient_lighting_MSGTYPE meshtastic_ModuleConfig_AmbientLightingConfig #define meshtastic_ModuleConfig_MQTTConfig_FIELDLIST(X, a) \ X(a, STATIC, SINGULAR, BOOL, enabled, 1) \ @@ -515,7 +545,8 @@ X(a, STATIC, SINGULAR, STRING, password, 4) \ X(a, STATIC, SINGULAR, BOOL, encryption_enabled, 5) \ X(a, STATIC, SINGULAR, BOOL, json_enabled, 6) \ X(a, STATIC, SINGULAR, BOOL, tls_enabled, 7) \ -X(a, STATIC, SINGULAR, STRING, root, 8) +X(a, STATIC, SINGULAR, STRING, root, 8) \ +X(a, STATIC, SINGULAR, BOOL, proxy_to_client_enabled, 9) #define meshtastic_ModuleConfig_MQTTConfig_CALLBACK NULL #define meshtastic_ModuleConfig_MQTTConfig_DEFAULT NULL @@ -616,6 +647,15 @@ X(a, STATIC, SINGULAR, BOOL, send_bell, 11) #define meshtastic_ModuleConfig_CannedMessageConfig_CALLBACK NULL #define meshtastic_ModuleConfig_CannedMessageConfig_DEFAULT NULL +#define meshtastic_ModuleConfig_AmbientLightingConfig_FIELDLIST(X, a) \ +X(a, STATIC, SINGULAR, BOOL, led_state, 1) \ +X(a, STATIC, SINGULAR, UINT32, current, 2) \ +X(a, STATIC, SINGULAR, UINT32, red, 3) \ +X(a, STATIC, SINGULAR, UINT32, green, 4) \ +X(a, STATIC, SINGULAR, UINT32, blue, 5) +#define meshtastic_ModuleConfig_AmbientLightingConfig_CALLBACK NULL +#define meshtastic_ModuleConfig_AmbientLightingConfig_DEFAULT NULL + #define meshtastic_RemoteHardwarePin_FIELDLIST(X, a) \ X(a, STATIC, SINGULAR, UINT32, gpio_pin, 1) \ X(a, STATIC, SINGULAR, STRING, name, 2) \ @@ -634,6 +674,7 @@ extern const pb_msgdesc_t meshtastic_ModuleConfig_StoreForwardConfig_msg; extern const pb_msgdesc_t meshtastic_ModuleConfig_RangeTestConfig_msg; extern const pb_msgdesc_t meshtastic_ModuleConfig_TelemetryConfig_msg; extern const pb_msgdesc_t meshtastic_ModuleConfig_CannedMessageConfig_msg; +extern const pb_msgdesc_t meshtastic_ModuleConfig_AmbientLightingConfig_msg; extern const pb_msgdesc_t meshtastic_RemoteHardwarePin_msg; /* Defines for backwards compatibility with code written before nanopb-0.4.0 */ @@ -648,20 +689,22 @@ extern const pb_msgdesc_t meshtastic_RemoteHardwarePin_msg; #define meshtastic_ModuleConfig_RangeTestConfig_fields &meshtastic_ModuleConfig_RangeTestConfig_msg #define meshtastic_ModuleConfig_TelemetryConfig_fields &meshtastic_ModuleConfig_TelemetryConfig_msg #define meshtastic_ModuleConfig_CannedMessageConfig_fields &meshtastic_ModuleConfig_CannedMessageConfig_msg +#define meshtastic_ModuleConfig_AmbientLightingConfig_fields &meshtastic_ModuleConfig_AmbientLightingConfig_msg #define meshtastic_RemoteHardwarePin_fields &meshtastic_RemoteHardwarePin_msg /* Maximum encoded size of messages (where known) */ +#define meshtastic_ModuleConfig_AmbientLightingConfig_size 14 #define meshtastic_ModuleConfig_AudioConfig_size 19 #define meshtastic_ModuleConfig_CannedMessageConfig_size 49 #define meshtastic_ModuleConfig_ExternalNotificationConfig_size 40 -#define meshtastic_ModuleConfig_MQTTConfig_size 220 +#define meshtastic_ModuleConfig_MQTTConfig_size 222 #define meshtastic_ModuleConfig_NeighborInfoConfig_size 8 #define meshtastic_ModuleConfig_RangeTestConfig_size 10 #define meshtastic_ModuleConfig_RemoteHardwareConfig_size 96 #define meshtastic_ModuleConfig_SerialConfig_size 28 #define meshtastic_ModuleConfig_StoreForwardConfig_size 22 #define meshtastic_ModuleConfig_TelemetryConfig_size 26 -#define meshtastic_ModuleConfig_size 223 +#define meshtastic_ModuleConfig_size 225 #define meshtastic_RemoteHardwarePin_size 21 #ifdef __cplusplus diff --git a/src/mesh/generated/meshtastic/portnums.pb.h b/src/mesh/generated/meshtastic/portnums.pb.h index 089d7b59f..e4aaeeb96 100644 --- a/src/mesh/generated/meshtastic/portnums.pb.h +++ b/src/mesh/generated/meshtastic/portnums.pb.h @@ -54,6 +54,8 @@ typedef enum _meshtastic_PortNum { /* Audio Payloads. Encapsulated codec2 packets. On 2.4 GHZ Bandwidths only for now */ meshtastic_PortNum_AUDIO_APP = 9, + /* Payloads for clients with a network connection proxying MQTT pub/sub to the device */ + meshtastic_PortNum_MQTT_CLIENT_PROXY_APP = 10, /* Provides a 'ping' service that replies to any packet it receives. Also serves as a small example module. */ meshtastic_PortNum_REPLY_APP = 32, diff --git a/src/modules/AdminModule.cpp b/src/modules/AdminModule.cpp index ae254e7f7..be76f62a5 100644 --- a/src/modules/AdminModule.cpp +++ b/src/modules/AdminModule.cpp @@ -16,9 +16,7 @@ #include "unistd.h" #endif -#if HAS_WIFI || HAS_ETHERNET #include "mqtt/MQTT.h" -#endif #define DEFAULT_REBOOT_SECONDS 7 @@ -567,7 +565,7 @@ void AdminModule::handleGetDeviceConnectionStatus(const meshtastic_MeshPacket &r if (conn.wifi.status.is_connected) { conn.wifi.rssi = WiFi.RSSI(); conn.wifi.status.ip_address = WiFi.localIP(); - conn.wifi.status.is_mqtt_connected = mqtt && mqtt->connected(); + conn.wifi.status.is_mqtt_connected = mqtt && mqtt->isConnectedDirectly(); conn.wifi.status.is_syslog_connected = false; // FIXME wire this up } #endif @@ -578,7 +576,7 @@ void AdminModule::handleGetDeviceConnectionStatus(const meshtastic_MeshPacket &r if (Ethernet.linkStatus() == LinkON) { conn.ethernet.status.is_connected = true; conn.ethernet.status.ip_address = Ethernet.localIP(); - conn.ethernet.status.is_mqtt_connected = mqtt && mqtt->connected(); + conn.ethernet.status.is_mqtt_connected = mqtt && mqtt->isConnectedDirectly(); conn.ethernet.status.is_syslog_connected = false; // FIXME wire this up } else { conn.ethernet.status.is_connected = false; diff --git a/src/mqtt/MQTT.cpp b/src/mqtt/MQTT.cpp index 532a2d125..c10f9182d 100644 --- a/src/mqtt/MQTT.cpp +++ b/src/mqtt/MQTT.cpp @@ -25,12 +25,16 @@ Allocator &mqttPool = staticMqttPool; void MQTT::mqttCallback(char *topic, byte *payload, unsigned int length) { - mqtt->onPublish(topic, payload, length); + mqtt->onReceive(topic, payload, length); } -void MQTT::onPublish(char *topic, byte *payload, unsigned int length) +void MQTT::onClientProxyReceive(meshtastic_MqttClientProxyMessage msg) +{ + onReceive(msg.topic, msg.payload_variant.data.bytes, msg.payload_variant.data.size); +} + +void MQTT::onReceive(char *topic, byte *payload, size_t length) { - // parsing ServiceEnvelope meshtastic_ServiceEnvelope e = meshtastic_ServiceEnvelope_init_default; if (moduleConfig.mqtt.json_enabled && (strncmp(topic, jsonTopic.c_str(), jsonTopic.length()) == 0)) { @@ -153,10 +157,13 @@ void mqttInit() new MQTT(); } +#ifdef HAS_NETWORKING MQTT::MQTT() : concurrency::OSThread("mqtt"), pubSub(mqttClient), mqttQueue(MAX_MQTT_QUEUE) +#else +MQTT::MQTT() : concurrency::OSThread("mqtt"), mqttQueue(MAX_MQTT_QUEUE) +#endif { if (moduleConfig.mqtt.enabled) { - assert(!mqtt); mqtt = this; @@ -170,22 +177,77 @@ MQTT::MQTT() : concurrency::OSThread("mqtt"), pubSub(mqttClient), mqttQueue(MAX_ jsonTopic = "msh" + jsonTopic; } - pubSub.setCallback(mqttCallback); - +#ifdef HAS_NETWORKING + if (!moduleConfig.mqtt.proxy_to_client_enabled) + pubSub.setCallback(mqttCallback); +#endif // preflightSleepObserver.observe(&preflightSleep); } else { disable(); } } -bool MQTT::connected() +bool MQTT::isConnectedDirectly() { +#ifdef HAS_NETWORKING return pubSub.connected(); +#else + return false; +#endif +} + +bool MQTT::publish(const char *topic, const char *payload, bool retained) +{ + if (moduleConfig.mqtt.proxy_to_client_enabled) { + meshtastic_MqttClientProxyMessage *msg = mqttClientProxyMessagePool.allocZeroed(); + msg->which_payload_variant = meshtastic_MqttClientProxyMessage_text_tag; + strcpy(msg->topic, topic); + strcpy(msg->payload_variant.text, payload); + msg->retained = retained; + service.sendMqttMessageToClientProxy(msg); + return true; + } +#ifdef HAS_NETWORKING + else if (isConnectedDirectly()) { + return pubSub.publish(topic, payload, retained); + } +#endif + return false; +} + +bool MQTT::publish(const char *topic, const uint8_t *payload, size_t length, bool retained) +{ + if (moduleConfig.mqtt.proxy_to_client_enabled) { + meshtastic_MqttClientProxyMessage *msg = mqttClientProxyMessagePool.allocZeroed(); + msg->which_payload_variant = meshtastic_MqttClientProxyMessage_data_tag; + strcpy(msg->topic, topic); + msg->payload_variant.data.size = length; + memcpy(msg->payload_variant.data.bytes, payload, length); + msg->retained = retained; + service.sendMqttMessageToClientProxy(msg); + return true; + } +#ifdef HAS_NETWORKING + else if (isConnectedDirectly()) { + return pubSub.publish(topic, payload, length, retained); + } +#endif + return false; } void MQTT::reconnect() { if (wantsLink()) { + if (moduleConfig.mqtt.proxy_to_client_enabled) { + LOG_INFO("MQTT connecting via client proxy instead...\n"); + enabled = true; + runASAP = true; + reconnectCount = 0; + + publishStatus(); + return; // Don't try to connect directly to the server + } +#ifdef HAS_NETWORKING // Defaults int serverPort = 1883; const char *serverAddr = default_mqtt_address; @@ -197,7 +259,6 @@ void MQTT::reconnect() mqttUsername = moduleConfig.mqtt.username; mqttPassword = moduleConfig.mqtt.password; } - #if HAS_WIFI && !defined(ARCH_PORTDUINO) if (moduleConfig.mqtt.tls_enabled) { // change default for encrypted to 8883 @@ -214,7 +275,7 @@ void MQTT::reconnect() LOG_INFO("Using non-TLS-encrypted session\n"); pubSub.setClient(mqttClient); } -#else +#elif HAS_NETWORKING pubSub.setClient(mqttClient); #endif @@ -229,8 +290,9 @@ void MQTT::reconnect() pubSub.setServer(serverAddr, serverPort); pubSub.setBufferSize(512); - LOG_INFO("Connecting to MQTT server %s, port: %d, username: %s, password: %s\n", serverAddr, serverPort, mqttUsername, - mqttPassword); + LOG_INFO("Attempting to connnect directly to MQTT server %s, port: %d, username: %s, password: %s\n", serverAddr, + serverPort, mqttUsername, mqttPassword); + auto myStatus = (statusTopic + owner.id); bool connected = pubSub.connect(owner.id, mqttUsername, mqttPassword, myStatus.c_str(), 1, true, "offline"); if (connected) { @@ -239,15 +301,12 @@ void MQTT::reconnect() runASAP = true; reconnectCount = 0; - /// FIXME, include more information in the status text - bool ok = pubSub.publish(myStatus.c_str(), "online", true); - LOG_INFO("published %d\n", ok); - + publishStatus(); sendSubscriptions(); } else { #if HAS_WIFI && !defined(ARCH_PORTDUINO) reconnectCount++; - LOG_ERROR("Failed to contact MQTT server (%d/%d)...\n", reconnectCount, reconnectMax); + LOG_ERROR("Failed to contact MQTT server directly (%d/%d)...\n", reconnectCount, reconnectMax); if (reconnectCount >= reconnectMax) { needReconnect = true; wifiReconnect->setIntervalFromNow(0); @@ -255,11 +314,13 @@ void MQTT::reconnect() } #endif } +#endif } } void MQTT::sendSubscriptions() { +#ifdef HAS_NETWORKING size_t numChan = channels.getNumChannels(); for (size_t i = 0; i < numChan; i++) { auto &ch = channels.getByIndex(i); @@ -274,6 +335,7 @@ void MQTT::sendSubscriptions() } } } +#endif } bool MQTT::wantsLink() const @@ -291,60 +353,44 @@ bool MQTT::wantsLink() const } } } + if (hasChannel && moduleConfig.mqtt.proxy_to_client_enabled) + return true; #if HAS_WIFI return hasChannel && WiFi.isConnected(); #endif #if HAS_ETHERNET - return hasChannel && (Ethernet.linkStatus() == LinkON); + return hasChannel && Ethernet.linkStatus() == LinkON; #endif return false; } int32_t MQTT::runOnce() { - if (!moduleConfig.mqtt.enabled) { + if (!moduleConfig.mqtt.enabled) return disable(); - } + bool wantConnection = wantsLink(); // If connected poll rapidly, otherwise only occasionally check for a wifi connection change and ability to contact server - if (!pubSub.loop()) { - if (wantConnection) { + if (moduleConfig.mqtt.proxy_to_client_enabled) { + publishQueuedMessages(); + return 200; + } +#ifdef HAS_NETWORKING + else if (!pubSub.loop()) { + if (!wantConnection) + return 5000; // If we don't want connection now, check again in 5 secs + else { reconnect(); - // If we succeeded, empty the queue one by one and start reading rapidly, else try again in 30 seconds (TCP // connections are EXPENSIVE so try rarely) - if (pubSub.connected()) { - if (!mqttQueue.isEmpty()) { - // FIXME - this size calculation is super sloppy, but it will go away once we dynamically alloc meshpackets - meshtastic_ServiceEnvelope *env = mqttQueue.dequeuePtr(0); - static uint8_t bytes[meshtastic_MeshPacket_size + 64]; - size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), &meshtastic_ServiceEnvelope_msg, env); - - std::string topic = cryptTopic + env->channel_id + "/" + owner.id; - LOG_INFO("publish %s, %u bytes from queue\n", topic.c_str(), numBytes); - - pubSub.publish(topic.c_str(), bytes, numBytes, false); - - if (moduleConfig.mqtt.json_enabled) { - // handle json topic - auto jsonString = this->downstreamPacketToJson(env->packet); - if (jsonString.length() != 0) { - std::string topicJson = jsonTopic + env->channel_id + "/" + owner.id; - LOG_INFO("JSON publish message to %s, %u bytes: %s\n", topicJson.c_str(), jsonString.length(), - jsonString.c_str()); - pubSub.publish(topicJson.c_str(), jsonString.c_str(), false); - } - } - mqttPool.release(env); - } + if (isConnectedDirectly()) { + publishQueuedMessages(); return 200; - } else { + } else return 30000; - } - } else - return 5000; // If we don't want connection now, check again in 5 secs + } } else { // we are connected to server, check often for new requests on the TCP port if (!wantConnection) { @@ -355,6 +401,44 @@ int32_t MQTT::runOnce() powerFSM.trigger(EVENT_CONTACT_FROM_PHONE); // Suppress entering light sleep (because that would turn off bluetooth) return 20; } +#endif + return 30000; +} + +/// FIXME, include more information in the status text +void MQTT::publishStatus() +{ + auto myStatus = (statusTopic + owner.id); + bool ok = publish(myStatus.c_str(), "online", true); + LOG_INFO("published online=%d\n", ok); +} + +void MQTT::publishQueuedMessages() +{ + if (!mqttQueue.isEmpty()) { + LOG_DEBUG("Publishing enqueued MQTT message\n"); + // FIXME - this size calculation is super sloppy, but it will go away once we dynamically alloc meshpackets + meshtastic_ServiceEnvelope *env = mqttQueue.dequeuePtr(0); + static uint8_t bytes[meshtastic_MeshPacket_size + 64]; + size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), &meshtastic_ServiceEnvelope_msg, env); + + std::string topic = cryptTopic + env->channel_id + "/" + owner.id; + LOG_INFO("publish %s, %u bytes from queue\n", topic.c_str(), numBytes); + + publish(topic.c_str(), bytes, numBytes, false); + + if (moduleConfig.mqtt.json_enabled) { + // handle json topic + auto jsonString = this->meshPacketToJson(env->packet); + if (jsonString.length() != 0) { + std::string topicJson = jsonTopic + env->channel_id + "/" + owner.id; + LOG_INFO("JSON publish message to %s, %u bytes: %s\n", topicJson.c_str(), jsonString.length(), + jsonString.c_str()); + publish(topicJson.c_str(), jsonString.c_str(), false); + } + } + mqttPool.release(env); + } } void MQTT::onSend(const meshtastic_MeshPacket &mp, ChannelIndex chIndex) @@ -368,27 +452,26 @@ void MQTT::onSend(const meshtastic_MeshPacket &mp, ChannelIndex chIndex) env->channel_id = (char *)channelId; env->gateway_id = owner.id; env->packet = (meshtastic_MeshPacket *)∓ + LOG_DEBUG("MQTT onSend - Publishing portnum %i message\n", env->packet->decoded.portnum); - // don't bother sending if not connected... - if (pubSub.connected()) { - + if (moduleConfig.mqtt.proxy_to_client_enabled || this->isConnectedDirectly()) { // FIXME - this size calculation is super sloppy, but it will go away once we dynamically alloc meshpackets static uint8_t bytes[meshtastic_MeshPacket_size + 64]; size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), &meshtastic_ServiceEnvelope_msg, env); std::string topic = cryptTopic + channelId + "/" + owner.id; - LOG_DEBUG("publish %s, %u bytes\n", topic.c_str(), numBytes); + LOG_DEBUG("MQTT Publish %s, %u bytes\n", topic.c_str(), numBytes); - pubSub.publish(topic.c_str(), bytes, numBytes, false); + publish(topic.c_str(), bytes, numBytes, false); if (moduleConfig.mqtt.json_enabled) { // handle json topic - auto jsonString = this->downstreamPacketToJson((meshtastic_MeshPacket *)&mp); + auto jsonString = this->meshPacketToJson((meshtastic_MeshPacket *)&mp); if (jsonString.length() != 0) { std::string topicJson = jsonTopic + channelId + "/" + owner.id; LOG_INFO("JSON publish message to %s, %u bytes: %s\n", topicJson.c_str(), jsonString.length(), jsonString.c_str()); - pubSub.publish(topicJson.c_str(), jsonString.c_str(), false); + publish(topicJson.c_str(), jsonString.c_str(), false); } } } else { @@ -408,7 +491,7 @@ void MQTT::onSend(const meshtastic_MeshPacket &mp, ChannelIndex chIndex) } // converts a downstream packet into a json message -std::string MQTT::downstreamPacketToJson(meshtastic_MeshPacket *mp) +std::string MQTT::meshPacketToJson(meshtastic_MeshPacket *mp) { // the created jsonObj is immutable after creation, so // we need to do the heavy lifting before assembling it. diff --git a/src/mqtt/MQTT.h b/src/mqtt/MQTT.h index 1e626c3e0..fc436c22e 100644 --- a/src/mqtt/MQTT.h +++ b/src/mqtt/MQTT.h @@ -5,15 +5,20 @@ #include "concurrency/OSThread.h" #include "mesh/Channels.h" #include "mesh/generated/meshtastic/mqtt.pb.h" -#include #if HAS_WIFI #include +#define HAS_NETWORKING 1 #if !defined(ARCH_PORTDUINO) #include #endif #endif #if HAS_ETHERNET #include +#define HAS_NETWORKING 1 +#endif + +#ifdef HAS_NETWORKING +#include #endif #define MAX_MQTT_QUEUE 16 @@ -35,12 +40,9 @@ class MQTT : private concurrency::OSThread #if HAS_ETHERNET EthernetClient mqttClient; #endif -#if !defined(DEBUG_HEAP_MQTT) - PubSubClient pubSub; public: -#else - public: +#ifdef HAS_NETWORKING PubSubClient pubSub; #endif MQTT(); @@ -59,7 +61,13 @@ class MQTT : private concurrency::OSThread */ void reconnect(); - bool connected(); + bool isConnectedDirectly(); + + bool publish(const char *topic, const char *payload, bool retained); + + bool publish(const char *topic, const uint8_t *payload, size_t length, const bool retained); + + void onClientProxyReceive(meshtastic_MqttClientProxyMessage msg); protected: PointerQueue mqttQueue; @@ -80,14 +88,17 @@ class MQTT : private concurrency::OSThread */ void sendSubscriptions(); - /// Just C glue to call onPublish + /// Callback for direct mqtt subscription messages static void mqttCallback(char *topic, byte *payload, unsigned int length); /// Called when a new publish arrives from the MQTT server - void onPublish(char *topic, byte *payload, unsigned int length); + void onReceive(char *topic, byte *payload, size_t length); /// Called when a new publish arrives from the MQTT server - std::string downstreamPacketToJson(meshtastic_MeshPacket *mp); + std::string meshPacketToJson(meshtastic_MeshPacket *mp); + + void publishStatus(); + void publishQueuedMessages(); /// Return 0 if sleep is okay, veto sleep if we are connected to pubsub server // int preflightSleepCb(void *unused = NULL) { return pubSub.connected() ? 1 : 0; }