#include "MQTT.h" #include "MeshService.h" #include "NodeDB.h" #include "PowerFSM.h" #include "main.h" #include "mesh/Channels.h" #include "mesh/Router.h" #include "mesh/generated/meshtastic/mqtt.pb.h" #include "mesh/generated/meshtastic/telemetry.pb.h" #include "modules/RoutingModule.h" #if defined(ARCH_ESP32) #include "../mesh/generated/meshtastic/paxcount.pb.h" #endif #include "mesh/generated/meshtastic/remote_hardware.pb.h" #include "sleep.h" #if HAS_WIFI #include "mesh/wifi/WiFiAPClient.h" #include #endif #include const int reconnectMax = 5; MQTT *mqtt; static MemoryDynamic staticMqttPool; Allocator &mqttPool = staticMqttPool; void MQTT::mqttCallback(char *topic, byte *payload, unsigned int length) { mqtt->onReceive(topic, payload, length); } void MQTT::onClientProxyReceive(meshtastic_MqttClientProxyMessage msg) { onReceive(msg.topic, msg.payload_variant.data.bytes, msg.payload_variant.data.size); } void MQTT::onReceive(char *topic, byte *payload, size_t length) { meshtastic_ServiceEnvelope e = meshtastic_ServiceEnvelope_init_default; if (moduleConfig.mqtt.json_enabled && (strncmp(topic, jsonTopic.c_str(), jsonTopic.length()) == 0)) { // check if this is a json payload message by comparing the topic start char payloadStr[length + 1]; memcpy(payloadStr, payload, length); payloadStr[length] = 0; // null terminated string JSONValue *json_value = JSON::Parse(payloadStr); if (json_value != NULL) { // check if it is a valid envelope JSONObject json; json = json_value->AsObject(); // parse the channel name from the topic string by looking for "json/" const char *jsonSlash = "json/"; char *ptr = strstr(topic, jsonSlash) + sizeof(jsonSlash) + 1; // set pointer to after "json/" ptr = strtok(ptr, "/") ? strtok(ptr, "/") : ptr; // if another "/" was added, parse string up to that character meshtastic_Channel sendChannel = channels.getByName(ptr); // We allow downlink JSON packets only on a channel named "mqtt" if (strncasecmp(channels.getGlobalId(sendChannel.index), Channels::mqttChannel, strlen(Channels::mqttChannel)) == 0 && sendChannel.settings.downlink_enabled) { if (isValidJsonEnvelope(json)) { // this is a valid envelope if (json["type"]->AsString().compare("sendtext") == 0 && json["payload"]->IsString()) { std::string jsonPayloadStr = json["payload"]->AsString(); LOG_INFO("JSON payload %s, length %u\n", jsonPayloadStr.c_str(), jsonPayloadStr.length()); // construct protobuf data packet using TEXT_MESSAGE, send it to the mesh meshtastic_MeshPacket *p = router->allocForSending(); p->decoded.portnum = meshtastic_PortNum_TEXT_MESSAGE_APP; if (json.find("channel") != json.end() && json["channel"]->IsNumber() && (json["channel"]->AsNumber() < channels.getNumChannels())) p->channel = json["channel"]->AsNumber(); if (json.find("to") != json.end() && json["to"]->IsNumber()) p->to = json["to"]->AsNumber(); if (jsonPayloadStr.length() <= sizeof(p->decoded.payload.bytes)) { memcpy(p->decoded.payload.bytes, jsonPayloadStr.c_str(), jsonPayloadStr.length()); p->decoded.payload.size = jsonPayloadStr.length(); service.sendToMesh(p, RX_SRC_LOCAL); } else { LOG_WARN("Received MQTT json payload too long, dropping\n"); } } else if (json["type"]->AsString().compare("sendposition") == 0 && json["payload"]->IsObject()) { // invent the "sendposition" type for a valid envelope JSONObject posit; posit = json["payload"]->AsObject(); // get nested JSON Position meshtastic_Position pos = meshtastic_Position_init_default; if (posit.find("latitude_i") != posit.end() && posit["latitude_i"]->IsNumber()) pos.latitude_i = posit["latitude_i"]->AsNumber(); if (posit.find("longitude_i") != posit.end() && posit["longitude_i"]->IsNumber()) pos.longitude_i = posit["longitude_i"]->AsNumber(); if (posit.find("altitude") != posit.end() && posit["altitude"]->IsNumber()) pos.altitude = posit["altitude"]->AsNumber(); if (posit.find("time") != posit.end() && posit["time"]->IsNumber()) pos.time = posit["time"]->AsNumber(); // construct protobuf data packet using POSITION, send it to the mesh meshtastic_MeshPacket *p = router->allocForSending(); p->decoded.portnum = meshtastic_PortNum_POSITION_APP; if (json.find("channel") != json.end() && json["channel"]->IsNumber() && (json["channel"]->AsNumber() < channels.getNumChannels())) p->channel = json["channel"]->AsNumber(); if (json.find("to") != json.end() && json["to"]->IsNumber()) p->to = json["to"]->AsNumber(); p->decoded.payload.size = pb_encode_to_bytes(p->decoded.payload.bytes, sizeof(p->decoded.payload.bytes), &meshtastic_Position_msg, &pos); // make the Data protobuf from position service.sendToMesh(p, RX_SRC_LOCAL); } else { LOG_DEBUG("JSON Ignoring downlink message with unsupported type.\n"); } } else { LOG_ERROR("JSON Received payload on MQTT but not a valid envelope.\n"); } } else { LOG_WARN("JSON downlink received on channel not called 'mqtt' or without downlink enabled.\n"); } } else { // no json, this is an invalid payload LOG_ERROR("JSON Received payload on MQTT but not a valid JSON\n"); } delete json_value; } else { if (length == 0) { LOG_WARN("Empty MQTT payload received, topic %s!\n", topic); return; } else if (!pb_decode_from_bytes(payload, length, &meshtastic_ServiceEnvelope_msg, &e)) { LOG_ERROR("Invalid MQTT service envelope, topic %s, len %u!\n", topic, length); return; } else { meshtastic_Channel ch = channels.getByName(e.channel_id); if (strcmp(e.gateway_id, owner.id) == 0) { // Generate an implicit ACK towards ourselves (handled and processed only locally!) for this message. // We do this because packets are not rebroadcasted back into MQTT anymore and we assume that at least one node // receives it when we get our own packet back. Then we'll stop our retransmissions. if (e.packet && getFrom(e.packet) == nodeDB.getNodeNum()) routingModule->sendAckNak(meshtastic_Routing_Error_NONE, getFrom(e.packet), e.packet->id, ch.index); else LOG_INFO("Ignoring downlink message we originally sent.\n"); } else { // Find channel by channel_id and check downlink_enabled if (strcmp(e.channel_id, channels.getGlobalId(ch.index)) == 0 && e.packet && ch.settings.downlink_enabled) { LOG_INFO("Received MQTT topic %s, len=%u\n", topic, length); meshtastic_MeshPacket *p = packetPool.allocCopy(*e.packet); p->via_mqtt = true; // Mark that the packet was received via MQTT if (p->which_payload_variant == meshtastic_MeshPacket_decoded_tag) { p->channel = ch.index; } // ignore messages if we don't have the channel key if (router && perhapsDecode(p)) router->enqueueReceivedMessage(p); else packetPool.release(p); } } } // make sure to free both strings and the MeshPacket (passing in NULL is acceptable) free(e.channel_id); free(e.gateway_id); free(e.packet); } } void mqttInit() { new MQTT(); } #ifdef HAS_NETWORKING MQTT::MQTT() : concurrency::OSThread("mqtt"), pubSub(mqttClient), mqttQueue(MAX_MQTT_QUEUE) #else MQTT::MQTT() : concurrency::OSThread("mqtt"), mqttQueue(MAX_MQTT_QUEUE) #endif { if (moduleConfig.mqtt.enabled) { LOG_DEBUG("Initializing MQTT\n"); assert(!mqtt); mqtt = this; if (*moduleConfig.mqtt.root) { statusTopic = moduleConfig.mqtt.root + statusTopic; cryptTopic = moduleConfig.mqtt.root + cryptTopic; jsonTopic = moduleConfig.mqtt.root + jsonTopic; mapTopic = moduleConfig.mqtt.root + jsonTopic; } else { statusTopic = "msh" + statusTopic; cryptTopic = "msh" + cryptTopic; jsonTopic = "msh" + jsonTopic; mapTopic = "msh" + mapTopic; } if (moduleConfig.mqtt.map_reporting_enabled && moduleConfig.mqtt.has_map_report_settings) { map_position_precision = moduleConfig.mqtt.map_report_settings.position_precision; map_publish_interval_secs = moduleConfig.mqtt.map_report_settings.publish_interval_secs; } #ifdef HAS_NETWORKING if (!moduleConfig.mqtt.proxy_to_client_enabled) pubSub.setCallback(mqttCallback); #endif if (moduleConfig.mqtt.proxy_to_client_enabled) { LOG_INFO("MQTT configured to use client proxy...\n"); enabled = true; runASAP = true; reconnectCount = 0; publishStatus(); } // preflightSleepObserver.observe(&preflightSleep); } else { disable(); } } bool MQTT::isConnectedDirectly() { #ifdef HAS_NETWORKING return pubSub.connected(); #else return false; #endif } bool MQTT::publish(const char *topic, const char *payload, bool retained) { if (moduleConfig.mqtt.proxy_to_client_enabled) { meshtastic_MqttClientProxyMessage *msg = mqttClientProxyMessagePool.allocZeroed(); msg->which_payload_variant = meshtastic_MqttClientProxyMessage_text_tag; strcpy(msg->topic, topic); strcpy(msg->payload_variant.text, payload); msg->retained = retained; service.sendMqttMessageToClientProxy(msg); return true; } #ifdef HAS_NETWORKING else if (isConnectedDirectly()) { return pubSub.publish(topic, payload, retained); } #endif return false; } bool MQTT::publish(const char *topic, const uint8_t *payload, size_t length, bool retained) { if (moduleConfig.mqtt.proxy_to_client_enabled) { meshtastic_MqttClientProxyMessage *msg = mqttClientProxyMessagePool.allocZeroed(); msg->which_payload_variant = meshtastic_MqttClientProxyMessage_data_tag; strcpy(msg->topic, topic); msg->payload_variant.data.size = length; memcpy(msg->payload_variant.data.bytes, payload, length); msg->retained = retained; service.sendMqttMessageToClientProxy(msg); return true; } #ifdef HAS_NETWORKING else if (isConnectedDirectly()) { return pubSub.publish(topic, payload, length, retained); } #endif return false; } void MQTT::reconnect() { if (wantsLink()) { if (moduleConfig.mqtt.proxy_to_client_enabled) { LOG_INFO("MQTT connecting via client proxy instead...\n"); enabled = true; runASAP = true; reconnectCount = 0; publishStatus(); return; // Don't try to connect directly to the server } #ifdef HAS_NETWORKING // Defaults int serverPort = 1883; const char *serverAddr = default_mqtt_address; const char *mqttUsername = default_mqtt_username; const char *mqttPassword = default_mqtt_password; if (*moduleConfig.mqtt.address) { serverAddr = moduleConfig.mqtt.address; mqttUsername = moduleConfig.mqtt.username; mqttPassword = moduleConfig.mqtt.password; } #if HAS_WIFI && !defined(ARCH_PORTDUINO) if (moduleConfig.mqtt.tls_enabled) { // change default for encrypted to 8883 try { serverPort = 8883; wifiSecureClient.setInsecure(); pubSub.setClient(wifiSecureClient); LOG_INFO("Using TLS-encrypted session\n"); } catch (const std::exception &e) { LOG_ERROR("MQTT ERROR: %s\n", e.what()); } } else { LOG_INFO("Using non-TLS-encrypted session\n"); pubSub.setClient(mqttClient); } #elif HAS_NETWORKING pubSub.setClient(mqttClient); #endif String server = String(serverAddr); int delimIndex = server.indexOf(':'); if (delimIndex > 0) { String port = server.substring(delimIndex + 1, server.length()); server[delimIndex] = 0; serverPort = port.toInt(); serverAddr = server.c_str(); } pubSub.setServer(serverAddr, serverPort); pubSub.setBufferSize(512); LOG_INFO("Attempting to connect directly to MQTT server %s, port: %d, username: %s, password: %s\n", serverAddr, serverPort, mqttUsername, mqttPassword); auto myStatus = (statusTopic + owner.id); bool connected = pubSub.connect(owner.id, mqttUsername, mqttPassword, myStatus.c_str(), 1, true, "offline"); if (connected) { LOG_INFO("MQTT connected\n"); enabled = true; // Start running background process again runASAP = true; reconnectCount = 0; publishStatus(); sendSubscriptions(); } else { #if HAS_WIFI && !defined(ARCH_PORTDUINO) reconnectCount++; LOG_ERROR("Failed to contact MQTT server directly (%d/%d)...\n", reconnectCount, reconnectMax); if (reconnectCount >= reconnectMax) { needReconnect = true; wifiReconnect->setIntervalFromNow(0); reconnectCount = 0; } #endif } #endif } } void MQTT::sendSubscriptions() { #ifdef HAS_NETWORKING size_t numChan = channels.getNumChannels(); for (size_t i = 0; i < numChan; i++) { const auto &ch = channels.getByIndex(i); if (ch.settings.downlink_enabled) { std::string topic = cryptTopic + channels.getGlobalId(i) + "/#"; LOG_INFO("Subscribing to %s\n", topic.c_str()); pubSub.subscribe(topic.c_str(), 1); // FIXME, is QOS 1 right? #ifndef ARCH_NRF52 // JSON is not supported on nRF52, see issue #2804 if (moduleConfig.mqtt.json_enabled == true) { std::string topicDecoded = jsonTopic + channels.getGlobalId(i) + "/#"; LOG_INFO("Subscribing to %s\n", topicDecoded.c_str()); pubSub.subscribe(topicDecoded.c_str(), 1); // FIXME, is QOS 1 right? } #endif // ARCH_NRF52 } } #endif } bool MQTT::wantsLink() const { bool hasChannelorMapReport = false; if (moduleConfig.mqtt.enabled) { hasChannelorMapReport = moduleConfig.mqtt.map_reporting_enabled; if (!hasChannelorMapReport) { // No need for link if no channel needed it size_t numChan = channels.getNumChannels(); for (size_t i = 0; i < numChan; i++) { const auto &ch = channels.getByIndex(i); if (ch.settings.uplink_enabled || ch.settings.downlink_enabled) { hasChannelorMapReport = true; break; } } } } if (hasChannelorMapReport && moduleConfig.mqtt.proxy_to_client_enabled) return true; #if HAS_WIFI return hasChannelorMapReport && WiFi.isConnected(); #endif #if HAS_ETHERNET return hasChannelorMapReport && Ethernet.linkStatus() == LinkON; #endif return false; } int32_t MQTT::runOnce() { if (!moduleConfig.mqtt.enabled) return disable(); bool wantConnection = wantsLink(); perhapsReportToMap(); // If connected poll rapidly, otherwise only occasionally check for a wifi connection change and ability to contact server if (moduleConfig.mqtt.proxy_to_client_enabled) { publishQueuedMessages(); return 200; } #ifdef HAS_NETWORKING else if (!pubSub.loop()) { if (!wantConnection) return 5000; // If we don't want connection now, check again in 5 secs else { reconnect(); // If we succeeded, empty the queue one by one and start reading rapidly, else try again in 30 seconds (TCP // connections are EXPENSIVE so try rarely) if (isConnectedDirectly()) { publishQueuedMessages(); return 200; } else return 30000; } } else { // we are connected to server, check often for new requests on the TCP port if (!wantConnection) { LOG_INFO("MQTT link not needed, dropping\n"); pubSub.disconnect(); } powerFSM.trigger(EVENT_CONTACT_FROM_PHONE); // Suppress entering light sleep (because that would turn off bluetooth) return 20; } #endif return 30000; } /// FIXME, include more information in the status text void MQTT::publishStatus() { auto myStatus = (statusTopic + owner.id); bool ok = publish(myStatus.c_str(), "online", true); LOG_INFO("published online=%d\n", ok); } void MQTT::publishQueuedMessages() { if (!mqttQueue.isEmpty()) { LOG_DEBUG("Publishing enqueued MQTT message\n"); // FIXME - this size calculation is super sloppy, but it will go away once we dynamically alloc meshpackets meshtastic_ServiceEnvelope *env = mqttQueue.dequeuePtr(0); static uint8_t bytes[meshtastic_MeshPacket_size + 64]; size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), &meshtastic_ServiceEnvelope_msg, env); std::string topic = cryptTopic + env->channel_id + "/" + owner.id; LOG_INFO("publish %s, %u bytes from queue\n", topic.c_str(), numBytes); publish(topic.c_str(), bytes, numBytes, false); #ifndef ARCH_NRF52 // JSON is not supported on nRF52, see issue #2804 if (moduleConfig.mqtt.json_enabled) { // handle json topic auto jsonString = this->meshPacketToJson(env->packet); if (jsonString.length() != 0) { std::string topicJson = jsonTopic + env->channel_id + "/" + owner.id; LOG_INFO("JSON publish message to %s, %u bytes: %s\n", topicJson.c_str(), jsonString.length(), jsonString.c_str()); publish(topicJson.c_str(), jsonString.c_str(), false); } } #endif // ARCH_NRF52 mqttPool.release(env); } } void MQTT::onSend(const meshtastic_MeshPacket &mp, const meshtastic_MeshPacket &mp_decoded, ChannelIndex chIndex) { if (mp.via_mqtt) return; // Don't send messages that came from MQTT back into MQTT auto &ch = channels.getByIndex(chIndex); if (&mp.decoded && strcmp(moduleConfig.mqtt.address, default_mqtt_address) == 0 && (mp.decoded.portnum == meshtastic_PortNum_RANGE_TEST_APP || mp.decoded.portnum == meshtastic_PortNum_DETECTION_SENSOR_APP)) { LOG_DEBUG("MQTT onSend - Ignoring range test or detection sensor message on public mqtt\n"); return; } if (ch.settings.uplink_enabled) { const char *channelId = channels.getGlobalId(chIndex); // FIXME, for now we just use the human name for the channel meshtastic_ServiceEnvelope *env = mqttPool.allocZeroed(); env->channel_id = (char *)channelId; env->gateway_id = owner.id; LOG_DEBUG("MQTT onSend - Publishing "); if (moduleConfig.mqtt.encryption_enabled) { env->packet = (meshtastic_MeshPacket *)∓ LOG_DEBUG("encrypted message\n"); } else { env->packet = (meshtastic_MeshPacket *)&mp_decoded; LOG_DEBUG("portnum %i message\n", env->packet->decoded.portnum); } if (moduleConfig.mqtt.proxy_to_client_enabled || this->isConnectedDirectly()) { // FIXME - this size calculation is super sloppy, but it will go away once we dynamically alloc meshpackets static uint8_t bytes[meshtastic_MeshPacket_size + 64]; size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), &meshtastic_ServiceEnvelope_msg, env); std::string topic = cryptTopic + channelId + "/" + owner.id; LOG_DEBUG("MQTT Publish %s, %u bytes\n", topic.c_str(), numBytes); publish(topic.c_str(), bytes, numBytes, false); #ifndef ARCH_NRF52 // JSON is not supported on nRF52, see issue #2804 if (moduleConfig.mqtt.json_enabled) { // handle json topic auto jsonString = this->meshPacketToJson((meshtastic_MeshPacket *)&mp_decoded); if (jsonString.length() != 0) { std::string topicJson = jsonTopic + channelId + "/" + owner.id; LOG_INFO("JSON publish message to %s, %u bytes: %s\n", topicJson.c_str(), jsonString.length(), jsonString.c_str()); publish(topicJson.c_str(), jsonString.c_str(), false); } } #endif // ARCH_NRF52 } else { LOG_INFO("MQTT not connected, queueing packet\n"); if (mqttQueue.numFree() == 0) { LOG_WARN("NOTE: MQTT queue is full, discarding oldest\n"); meshtastic_ServiceEnvelope *d = mqttQueue.dequeuePtr(0); if (d) mqttPool.release(d); } // make a copy of serviceEnvelope and queue it meshtastic_ServiceEnvelope *copied = mqttPool.allocCopy(*env); assert(mqttQueue.enqueue(copied, 0)); } mqttPool.release(env); } } void MQTT::perhapsReportToMap() { if (!moduleConfig.mqtt.map_reporting_enabled || !(moduleConfig.mqtt.proxy_to_client_enabled || isConnectedDirectly())) return; if (map_position_precision == 0 || (localPosition.latitude_i == 0 && localPosition.longitude_i == 0)) { LOG_WARN("MQTT Map reporting is enabled, but precision is 0 or no position available.\n"); return; } if (millis() - last_report_to_map < map_publish_interval_secs * 1000) { return; } else { // Allocate ServiceEnvelope and fill it meshtastic_ServiceEnvelope *se = mqttPool.allocZeroed(); se->channel_id = (char *)channels.getGlobalId(channels.getPrimaryIndex()); // Use primary channel as the channel_id se->gateway_id = owner.id; // Allocate MeshPacket and fill it meshtastic_MeshPacket *mp = packetPool.allocZeroed(); mp->which_payload_variant = meshtastic_MeshPacket_decoded_tag; mp->from = nodeDB.getNodeNum(); mp->to = NODENUM_BROADCAST; mp->decoded.portnum = meshtastic_PortNum_MAP_REPORT_APP; // Fill MapReport message meshtastic_MapReport mapReport = meshtastic_MapReport_init_default; memcpy(mapReport.long_name, owner.long_name, sizeof(owner.long_name)); memcpy(mapReport.short_name, owner.short_name, sizeof(owner.short_name)); mapReport.role = config.device.role; mapReport.hw_model = owner.hw_model; strncpy(mapReport.firmware_version, optstr(APP_VERSION), sizeof(mapReport.firmware_version)); mapReport.region = config.lora.region; mapReport.modem_preset = config.lora.modem_preset; mapReport.has_default_channel = channels.hasDefaultChannel(); // Set position with precision (same as in PositionModule) if (map_position_precision < 32 && map_position_precision > 0) { mapReport.latitude_i = localPosition.latitude_i & (UINT32_MAX << (32 - map_position_precision)); mapReport.longitude_i = localPosition.longitude_i & (UINT32_MAX << (32 - map_position_precision)); mapReport.latitude_i += (1 << (31 - map_position_precision)); mapReport.longitude_i += (1 << (31 - map_position_precision)); } else { mapReport.latitude_i = localPosition.latitude_i; mapReport.longitude_i = localPosition.longitude_i; } mapReport.altitude = localPosition.altitude; mapReport.position_precision = map_position_precision; mapReport.num_online_local_nodes = nodeDB.getNumOnlineMeshNodes(true); // Encode MapReport message and set it to MeshPacket in ServiceEnvelope mp->decoded.payload.size = pb_encode_to_bytes(mp->decoded.payload.bytes, sizeof(mp->decoded.payload.bytes), &meshtastic_MapReport_msg, &mapReport); se->packet = mp; // FIXME - this size calculation is super sloppy, but it will go away once we dynamically alloc meshpackets static uint8_t bytes[meshtastic_MeshPacket_size + 64]; size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), &meshtastic_ServiceEnvelope_msg, se); LOG_INFO("MQTT Publish map report to %s\n", mapTopic.c_str()); publish(mapTopic.c_str(), bytes, numBytes, false); // Release the allocated memory for ServiceEnvelope and MeshPacket mqttPool.release(se); packetPool.release(mp); // Update the last report time last_report_to_map = millis(); } } // converts a downstream packet into a json message std::string MQTT::meshPacketToJson(meshtastic_MeshPacket *mp) { // the created jsonObj is immutable after creation, so // we need to do the heavy lifting before assembling it. std::string msgType; JSONObject jsonObj; if (mp->which_payload_variant == meshtastic_MeshPacket_decoded_tag) { JSONObject msgPayload; switch (mp->decoded.portnum) { case meshtastic_PortNum_TEXT_MESSAGE_APP: { msgType = "text"; // convert bytes to string LOG_DEBUG("got text message of size %u\n", mp->decoded.payload.size); char payloadStr[(mp->decoded.payload.size) + 1]; memcpy(payloadStr, mp->decoded.payload.bytes, mp->decoded.payload.size); payloadStr[mp->decoded.payload.size] = 0; // null terminated string // check if this is a JSON payload JSONValue *json_value = JSON::Parse(payloadStr); if (json_value != NULL) { LOG_INFO("text message payload is of type json\n"); // if it is, then we can just use the json object jsonObj["payload"] = json_value; } else { // if it isn't, then we need to create a json object // with the string as the value LOG_INFO("text message payload is of type plaintext\n"); msgPayload["text"] = new JSONValue(payloadStr); jsonObj["payload"] = new JSONValue(msgPayload); } break; } case meshtastic_PortNum_TELEMETRY_APP: { msgType = "telemetry"; meshtastic_Telemetry scratch; meshtastic_Telemetry *decoded = NULL; memset(&scratch, 0, sizeof(scratch)); if (pb_decode_from_bytes(mp->decoded.payload.bytes, mp->decoded.payload.size, &meshtastic_Telemetry_msg, &scratch)) { decoded = &scratch; if (decoded->which_variant == meshtastic_Telemetry_device_metrics_tag) { msgPayload["battery_level"] = new JSONValue((uint)decoded->variant.device_metrics.battery_level); msgPayload["voltage"] = new JSONValue(decoded->variant.device_metrics.voltage); msgPayload["channel_utilization"] = new JSONValue(decoded->variant.device_metrics.channel_utilization); msgPayload["air_util_tx"] = new JSONValue(decoded->variant.device_metrics.air_util_tx); } else if (decoded->which_variant == meshtastic_Telemetry_environment_metrics_tag) { msgPayload["temperature"] = new JSONValue(decoded->variant.environment_metrics.temperature); msgPayload["relative_humidity"] = new JSONValue(decoded->variant.environment_metrics.relative_humidity); msgPayload["barometric_pressure"] = new JSONValue(decoded->variant.environment_metrics.barometric_pressure); msgPayload["gas_resistance"] = new JSONValue(decoded->variant.environment_metrics.gas_resistance); msgPayload["voltage"] = new JSONValue(decoded->variant.environment_metrics.voltage); msgPayload["current"] = new JSONValue(decoded->variant.environment_metrics.current); } else if (decoded->which_variant == meshtastic_Telemetry_power_metrics_tag) { msgPayload["voltage_ch1"] = new JSONValue(decoded->variant.power_metrics.ch1_voltage); msgPayload["current_ch1"] = new JSONValue(decoded->variant.power_metrics.ch1_current); msgPayload["voltage_ch2"] = new JSONValue(decoded->variant.power_metrics.ch2_voltage); msgPayload["current_ch2"] = new JSONValue(decoded->variant.power_metrics.ch2_current); msgPayload["voltage_ch3"] = new JSONValue(decoded->variant.power_metrics.ch3_voltage); msgPayload["current_ch3"] = new JSONValue(decoded->variant.power_metrics.ch3_current); } jsonObj["payload"] = new JSONValue(msgPayload); } else { LOG_ERROR("Error decoding protobuf for telemetry message!\n"); } break; } case meshtastic_PortNum_NODEINFO_APP: { msgType = "nodeinfo"; meshtastic_User scratch; meshtastic_User *decoded = NULL; memset(&scratch, 0, sizeof(scratch)); if (pb_decode_from_bytes(mp->decoded.payload.bytes, mp->decoded.payload.size, &meshtastic_User_msg, &scratch)) { decoded = &scratch; msgPayload["id"] = new JSONValue(decoded->id); msgPayload["longname"] = new JSONValue(decoded->long_name); msgPayload["shortname"] = new JSONValue(decoded->short_name); msgPayload["hardware"] = new JSONValue(decoded->hw_model); jsonObj["payload"] = new JSONValue(msgPayload); } else { LOG_ERROR("Error decoding protobuf for nodeinfo message!\n"); } break; } case meshtastic_PortNum_POSITION_APP: { msgType = "position"; meshtastic_Position scratch; meshtastic_Position *decoded = NULL; memset(&scratch, 0, sizeof(scratch)); if (pb_decode_from_bytes(mp->decoded.payload.bytes, mp->decoded.payload.size, &meshtastic_Position_msg, &scratch)) { decoded = &scratch; if ((int)decoded->time) { msgPayload["time"] = new JSONValue((uint)decoded->time); } if ((int)decoded->timestamp) { msgPayload["timestamp"] = new JSONValue((uint)decoded->timestamp); } msgPayload["latitude_i"] = new JSONValue((int)decoded->latitude_i); msgPayload["longitude_i"] = new JSONValue((int)decoded->longitude_i); if ((int)decoded->altitude) { msgPayload["altitude"] = new JSONValue((int)decoded->altitude); } if ((int)decoded->ground_speed) { msgPayload["ground_speed"] = new JSONValue((uint)decoded->ground_speed); } if (int(decoded->ground_track)) { msgPayload["ground_track"] = new JSONValue((uint)decoded->ground_track); } if (int(decoded->sats_in_view)) { msgPayload["sats_in_view"] = new JSONValue((uint)decoded->sats_in_view); } if ((int)decoded->PDOP) { msgPayload["PDOP"] = new JSONValue((int)decoded->PDOP); } if ((int)decoded->HDOP) { msgPayload["HDOP"] = new JSONValue((int)decoded->HDOP); } if ((int)decoded->VDOP) { msgPayload["VDOP"] = new JSONValue((int)decoded->VDOP); } if ((int)decoded->precision_bits) { msgPayload["precision_bits"] = new JSONValue((int)decoded->precision_bits); } jsonObj["payload"] = new JSONValue(msgPayload); } else { LOG_ERROR("Error decoding protobuf for position message!\n"); } break; } case meshtastic_PortNum_WAYPOINT_APP: { msgType = "position"; meshtastic_Waypoint scratch; meshtastic_Waypoint *decoded = NULL; memset(&scratch, 0, sizeof(scratch)); if (pb_decode_from_bytes(mp->decoded.payload.bytes, mp->decoded.payload.size, &meshtastic_Waypoint_msg, &scratch)) { decoded = &scratch; msgPayload["id"] = new JSONValue((uint)decoded->id); msgPayload["name"] = new JSONValue(decoded->name); msgPayload["description"] = new JSONValue(decoded->description); msgPayload["expire"] = new JSONValue((uint)decoded->expire); msgPayload["locked_to"] = new JSONValue((uint)decoded->locked_to); msgPayload["latitude_i"] = new JSONValue((int)decoded->latitude_i); msgPayload["longitude_i"] = new JSONValue((int)decoded->longitude_i); jsonObj["payload"] = new JSONValue(msgPayload); } else { LOG_ERROR("Error decoding protobuf for position message!\n"); } break; } case meshtastic_PortNum_NEIGHBORINFO_APP: { msgType = "neighborinfo"; meshtastic_NeighborInfo scratch; meshtastic_NeighborInfo *decoded = NULL; memset(&scratch, 0, sizeof(scratch)); if (pb_decode_from_bytes(mp->decoded.payload.bytes, mp->decoded.payload.size, &meshtastic_NeighborInfo_msg, &scratch)) { decoded = &scratch; msgPayload["node_id"] = new JSONValue((uint)decoded->node_id); msgPayload["node_broadcast_interval_secs"] = new JSONValue((uint)decoded->node_broadcast_interval_secs); msgPayload["last_sent_by_id"] = new JSONValue((uint)decoded->last_sent_by_id); msgPayload["neighbors_count"] = new JSONValue(decoded->neighbors_count); JSONArray neighbors; for (uint8_t i = 0; i < decoded->neighbors_count; i++) { JSONObject neighborObj; neighborObj["node_id"] = new JSONValue((uint)decoded->neighbors[i].node_id); neighborObj["snr"] = new JSONValue((int)decoded->neighbors[i].snr); neighbors.push_back(new JSONValue(neighborObj)); } msgPayload["neighbors"] = new JSONValue(neighbors); jsonObj["payload"] = new JSONValue(msgPayload); } else { LOG_ERROR("Error decoding protobuf for neighborinfo message!\n"); } break; } case meshtastic_PortNum_TRACEROUTE_APP: { if (mp->decoded.request_id) { // Only report the traceroute response msgType = "traceroute"; meshtastic_RouteDiscovery scratch; meshtastic_RouteDiscovery *decoded = NULL; memset(&scratch, 0, sizeof(scratch)); if (pb_decode_from_bytes(mp->decoded.payload.bytes, mp->decoded.payload.size, &meshtastic_RouteDiscovery_msg, &scratch)) { decoded = &scratch; JSONArray route; // Route this message took // Lambda function for adding a long name to the route auto addToRoute = [](JSONArray *route, NodeNum num) { char long_name[40] = "Unknown"; meshtastic_NodeInfoLite *node = nodeDB.getMeshNode(num); bool name_known = node ? node->has_user : false; if (name_known) memcpy(long_name, node->user.long_name, sizeof(long_name)); route->push_back(new JSONValue(long_name)); }; addToRoute(&route, mp->to); // Started at the original transmitter (destination of response) for (uint8_t i = 0; i < decoded->route_count; i++) { addToRoute(&route, decoded->route[i]); } addToRoute(&route, mp->from); // Ended at the original destination (source of response) msgPayload["route"] = new JSONValue(route); jsonObj["payload"] = new JSONValue(msgPayload); } else { LOG_ERROR("Error decoding protobuf for traceroute message!\n"); } } break; } case meshtastic_PortNum_DETECTION_SENSOR_APP: { msgType = "detection"; char payloadStr[(mp->decoded.payload.size) + 1]; memcpy(payloadStr, mp->decoded.payload.bytes, mp->decoded.payload.size); payloadStr[mp->decoded.payload.size] = 0; // null terminated string msgPayload["text"] = new JSONValue(payloadStr); jsonObj["payload"] = new JSONValue(msgPayload); break; } #ifdef ARCH_ESP32 case meshtastic_PortNum_PAXCOUNTER_APP: { msgType = "paxcounter"; meshtastic_Paxcount scratch; meshtastic_Paxcount *decoded = NULL; memset(&scratch, 0, sizeof(scratch)); if (pb_decode_from_bytes(mp->decoded.payload.bytes, mp->decoded.payload.size, &meshtastic_Paxcount_msg, &scratch)) { decoded = &scratch; msgPayload["wifi_count"] = new JSONValue((uint)decoded->wifi); msgPayload["ble_count"] = new JSONValue((uint)decoded->ble); msgPayload["uptime"] = new JSONValue((uint)decoded->uptime); jsonObj["payload"] = new JSONValue(msgPayload); } else { LOG_ERROR("Error decoding protobuf for Paxcount message!\n"); } break; } #endif case meshtastic_PortNum_REMOTE_HARDWARE_APP: { meshtastic_HardwareMessage scratch; meshtastic_HardwareMessage *decoded = NULL; memset(&scratch, 0, sizeof(scratch)); if (pb_decode_from_bytes(mp->decoded.payload.bytes, mp->decoded.payload.size, &meshtastic_HardwareMessage_msg, &scratch)) { decoded = &scratch; if (decoded->type == meshtastic_HardwareMessage_Type_GPIOS_CHANGED) { msgType = "gpios_changed"; msgPayload["gpio_value"] = new JSONValue((uint)decoded->gpio_value); jsonObj["payload"] = new JSONValue(msgPayload); } else if (decoded->type == meshtastic_HardwareMessage_Type_READ_GPIOS_REPLY) { msgType = "gpios_read_reply"; msgPayload["gpio_value"] = new JSONValue((uint)decoded->gpio_value); msgPayload["gpio_mask"] = new JSONValue((uint)decoded->gpio_mask); jsonObj["payload"] = new JSONValue(msgPayload); } } else { LOG_ERROR("Error decoding protobuf for RemoteHardware message!\n"); } break; } // add more packet types here if needed default: break; } } else { LOG_WARN("Couldn't convert encrypted payload of MeshPacket to JSON\n"); } jsonObj["id"] = new JSONValue((uint)mp->id); jsonObj["timestamp"] = new JSONValue((uint)mp->rx_time); jsonObj["to"] = new JSONValue((uint)mp->to); jsonObj["from"] = new JSONValue((uint)mp->from); jsonObj["channel"] = new JSONValue((uint)mp->channel); jsonObj["type"] = new JSONValue(msgType.c_str()); jsonObj["sender"] = new JSONValue(owner.id); if (mp->rx_rssi != 0) jsonObj["rssi"] = new JSONValue((int)mp->rx_rssi); if (mp->rx_snr != 0) jsonObj["snr"] = new JSONValue((float)mp->rx_snr); if (mp->hop_start != 0 && mp->hop_limit <= mp->hop_start) jsonObj["hops_away"] = new JSONValue((uint)(mp->hop_start - mp->hop_limit)); // serialize and write it to the stream JSONValue *value = new JSONValue(jsonObj); std::string jsonStr = value->Stringify(); LOG_INFO("serialized json message: %s\n", jsonStr.c_str()); delete value; return jsonStr; } bool MQTT::isValidJsonEnvelope(JSONObject &json) { // if "sender" is provided, avoid processing packets we uplinked return (json.find("sender") != json.end() ? (json["sender"]->AsString().compare(owner.id) != 0) : true) && (json.find("from") != json.end()) && json["from"]->IsNumber() && (json["from"]->AsNumber() == nodeDB.getNodeNum()) && // only accept message if the "from" is us (json.find("type") != json.end()) && json["type"]->IsString() && // should specify a type (json.find("payload") != json.end()); // should have a payload }