mirror of
https://github.com/meshtastic/firmware.git
synced 2025-04-23 17:13:38 +00:00
JSON MQTT Integration (#1283)
* Added downstream JSON MQTT handling. * Added uplink JSON messaging * Fixed shadow variable. * Added missing dependency. * Fixes Environment -> Telemetry. * Fixed native issue. * Added json11 pio reg dependency. * Fixed json11 library dependency. Co-authored-by: Sacha Weatherstone <sachaw100@hotmail.com>
This commit is contained in:
parent
f33cd4081e
commit
d4e6dd32c5
@ -91,6 +91,7 @@ lib_deps =
|
|||||||
https://github.com/geeksville/ArduinoThread.git#72921ac222eed6f526ba1682023cee290d9aa1b3
|
https://github.com/geeksville/ArduinoThread.git#72921ac222eed6f526ba1682023cee290d9aa1b3
|
||||||
PubSubClient
|
PubSubClient
|
||||||
nanopb/Nanopb@^0.4.6
|
nanopb/Nanopb@^0.4.6
|
||||||
|
meshtastic/json11@^1.0.2
|
||||||
|
|
||||||
; Used for the code analysis in PIO Home / Inspect
|
; Used for the code analysis in PIO Home / Inspect
|
||||||
check_tool = cppcheck
|
check_tool = cppcheck
|
||||||
@ -119,7 +120,7 @@ lib_deps =
|
|||||||
adafruit/Adafruit BME280 Library@^2.2.2
|
adafruit/Adafruit BME280 Library@^2.2.2
|
||||||
adafruit/Adafruit BME680 Library@^2.0.1
|
adafruit/Adafruit BME680 Library@^2.0.1
|
||||||
adafruit/Adafruit MCP9808 Library@^2.0.0
|
adafruit/Adafruit MCP9808 Library@^2.0.0
|
||||||
|
|
||||||
; Common settings for ESP targes, mixin with extends = esp32_base
|
; Common settings for ESP targes, mixin with extends = esp32_base
|
||||||
[esp32_base]
|
[esp32_base]
|
||||||
extends = arduino_base
|
extends = arduino_base
|
||||||
|
@ -1,19 +1,22 @@
|
|||||||
#include "MQTT.h"
|
#include "MQTT.h"
|
||||||
|
#include "MeshService.h"
|
||||||
#include "NodeDB.h"
|
#include "NodeDB.h"
|
||||||
#include "PowerFSM.h"
|
#include "PowerFSM.h"
|
||||||
#include "main.h"
|
#include "main.h"
|
||||||
#include "mesh/Channels.h"
|
#include "mesh/Channels.h"
|
||||||
#include "mesh/Router.h"
|
#include "mesh/Router.h"
|
||||||
#include "mesh/generated/mqtt.pb.h"
|
#include "mesh/generated/mqtt.pb.h"
|
||||||
|
#include "mesh/generated/telemetry.pb.h"
|
||||||
#include "sleep.h"
|
#include "sleep.h"
|
||||||
#include <WiFi.h>
|
#include <WiFi.h>
|
||||||
#include <assert.h>
|
#include <assert.h>
|
||||||
|
#include <json11.hpp>
|
||||||
|
|
||||||
MQTT *mqtt;
|
MQTT *mqtt;
|
||||||
|
|
||||||
String statusTopic = "msh/1/stat/";
|
String statusTopic = "msh/1/stat/";
|
||||||
String cryptTopic = "msh/1/c/"; // msh/1/c/CHANNELID/NODEID
|
String cryptTopic = "msh/1/c/"; // msh/1/c/CHANNELID/NODEID
|
||||||
String txtTopic = "msh/1/txt/"; // msh/1/txt/CHANNELID/NODEID
|
String jsonTopic = "msh/1/json/"; // msh/1/json/CHANNELID/NODEID
|
||||||
|
|
||||||
void MQTT::mqttCallback(char *topic, byte *payload, unsigned int length)
|
void MQTT::mqttCallback(char *topic, byte *payload, unsigned int length)
|
||||||
{
|
{
|
||||||
@ -25,7 +28,43 @@ void MQTT::onPublish(char *topic, byte *payload, unsigned int length)
|
|||||||
// parsing ServiceEnvelope
|
// parsing ServiceEnvelope
|
||||||
ServiceEnvelope e = ServiceEnvelope_init_default;
|
ServiceEnvelope e = ServiceEnvelope_init_default;
|
||||||
if (!pb_decode_from_bytes(payload, length, ServiceEnvelope_fields, &e)) {
|
if (!pb_decode_from_bytes(payload, length, ServiceEnvelope_fields, &e)) {
|
||||||
DEBUG_MSG("Invalid MQTT service envelope, topic %s, len %u!\n", topic, length);
|
|
||||||
|
// check if this is a json payload message
|
||||||
|
using namespace json11;
|
||||||
|
char payloadStr[length + 1];
|
||||||
|
memcpy(payloadStr, payload, length);
|
||||||
|
payloadStr[length] = 0; // null terminated string
|
||||||
|
std::string err;
|
||||||
|
auto json = Json::parse(payloadStr, err);
|
||||||
|
if (err.empty()) {
|
||||||
|
DEBUG_MSG("Received json payload on MQTT, parsing..\n");
|
||||||
|
// check if it is a valid envelope
|
||||||
|
if (json.object_items().count("sender") != 0 && json.object_items().count("payload") != 0) {
|
||||||
|
// this is a valid envelope
|
||||||
|
if (json["sender"].string_value().compare(owner.id) != 0) {
|
||||||
|
std::string jsonPayloadStr = json["payload"].dump();
|
||||||
|
DEBUG_MSG("Received json payload %s, length %u\n", jsonPayloadStr.c_str(), jsonPayloadStr.length());
|
||||||
|
// construct protobuf data packet using TEXT_MESSAGE, send it to the mesh
|
||||||
|
MeshPacket *p = router->allocForSending();
|
||||||
|
p->decoded.portnum = PortNum_TEXT_MESSAGE_APP;
|
||||||
|
if (jsonPayloadStr.length() <= sizeof(p->decoded.payload.bytes)) {
|
||||||
|
memcpy(p->decoded.payload.bytes, jsonPayloadStr.c_str(), jsonPayloadStr.length());
|
||||||
|
p->decoded.payload.size = jsonPayloadStr.length();
|
||||||
|
MeshPacket *packet = packetPool.allocCopy(*p);
|
||||||
|
service.sendToMesh(packet, RX_SRC_LOCAL);
|
||||||
|
} else {
|
||||||
|
DEBUG_MSG("Received MQTT json payload too long, dropping\n");
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
DEBUG_MSG("Ignoring downlink message we originally sent.\n");
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
DEBUG_MSG("Received json payload on MQTT but not a valid envelope\n");
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// no json, this is an invalid payload
|
||||||
|
DEBUG_MSG("Invalid MQTT service envelope, topic %s, len %u!\n", topic, length);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
if (strcmp(e.gateway_id, owner.id) == 0)
|
if (strcmp(e.gateway_id, owner.id) == 0)
|
||||||
DEBUG_MSG("Ignoring downlink message we originally sent.\n");
|
DEBUG_MSG("Ignoring downlink message we originally sent.\n");
|
||||||
@ -73,11 +112,11 @@ void MQTT::reconnect()
|
|||||||
const char *mqttPassword = "large4cats";
|
const char *mqttPassword = "large4cats";
|
||||||
|
|
||||||
if (*radioConfig.preferences.mqtt_server) {
|
if (*radioConfig.preferences.mqtt_server) {
|
||||||
serverAddr = radioConfig.preferences.mqtt_server; // Override the default
|
serverAddr = radioConfig.preferences.mqtt_server; // Override the default
|
||||||
mqttUsername = radioConfig.preferences.mqtt_username; //do not use the hardcoded credentials for a custom mqtt server
|
mqttUsername = radioConfig.preferences.mqtt_username; // do not use the hardcoded credentials for a custom mqtt server
|
||||||
mqttPassword = radioConfig.preferences.mqtt_password;
|
mqttPassword = radioConfig.preferences.mqtt_password;
|
||||||
} else {
|
} else {
|
||||||
//we are using the default server. Use the hardcoded credentials by default, but allow overriding
|
// we are using the default server. Use the hardcoded credentials by default, but allow overriding
|
||||||
if (*radioConfig.preferences.mqtt_username && radioConfig.preferences.mqtt_username[0] != '\0') {
|
if (*radioConfig.preferences.mqtt_username && radioConfig.preferences.mqtt_username[0] != '\0') {
|
||||||
mqttUsername = radioConfig.preferences.mqtt_username;
|
mqttUsername = radioConfig.preferences.mqtt_username;
|
||||||
}
|
}
|
||||||
@ -96,7 +135,8 @@ void MQTT::reconnect()
|
|||||||
}
|
}
|
||||||
pubSub.setServer(serverAddr, serverPort);
|
pubSub.setServer(serverAddr, serverPort);
|
||||||
|
|
||||||
DEBUG_MSG("Connecting to MQTT server %s, port: %d, username: %s, password: %s\n", serverAddr, serverPort, mqttUsername, mqttPassword);
|
DEBUG_MSG("Connecting to MQTT server %s, port: %d, username: %s, password: %s\n", serverAddr, serverPort, mqttUsername,
|
||||||
|
mqttPassword);
|
||||||
auto myStatus = (statusTopic + owner.id);
|
auto myStatus = (statusTopic + owner.id);
|
||||||
bool connected = pubSub.connect(owner.id, mqttUsername, mqttPassword, myStatus.c_str(), 1, true, "offline");
|
bool connected = pubSub.connect(owner.id, mqttUsername, mqttPassword, myStatus.c_str(), 1, true, "offline");
|
||||||
if (connected) {
|
if (connected) {
|
||||||
@ -123,6 +163,9 @@ void MQTT::sendSubscriptions()
|
|||||||
String topic = cryptTopic + channels.getGlobalId(i) + "/#";
|
String topic = cryptTopic + channels.getGlobalId(i) + "/#";
|
||||||
DEBUG_MSG("Subscribing to %s\n", topic.c_str());
|
DEBUG_MSG("Subscribing to %s\n", topic.c_str());
|
||||||
pubSub.subscribe(topic.c_str(), 1); // FIXME, is QOS 1 right?
|
pubSub.subscribe(topic.c_str(), 1); // FIXME, is QOS 1 right?
|
||||||
|
String topicDecoded = jsonTopic + channels.getGlobalId(i) + "/#";
|
||||||
|
DEBUG_MSG("Subscribing to %s\n", topicDecoded.c_str());
|
||||||
|
pubSub.subscribe(topicDecoded.c_str(), 1); // FIXME, is QOS 1 right?
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -195,12 +238,124 @@ void MQTT::onSend(const MeshPacket &mp, ChannelIndex chIndex)
|
|||||||
|
|
||||||
pubSub.publish(topic.c_str(), bytes, numBytes, false);
|
pubSub.publish(topic.c_str(), bytes, numBytes, false);
|
||||||
|
|
||||||
// publish to txt topic for messages of type TEXT_MESSAGE_APP
|
// handle json topic
|
||||||
if (mp.decoded.portnum == PortNum_TEXT_MESSAGE_APP) {
|
using namespace json11;
|
||||||
String plaintextTopic = txtTopic + channelId + "/" + owner.id;
|
auto jsonString = this->downstreamPacketToJson((MeshPacket *)&mp);
|
||||||
DEBUG_MSG("publish txt %s, %u bytes\n", topic.c_str(), numBytes);
|
if (jsonString.length() != 0) {
|
||||||
auto &p = mp.decoded;
|
String topicJson = jsonTopic + channelId + "/" + owner.id;
|
||||||
pubSub.publish(plaintextTopic.c_str(), p.payload.bytes, p.payload.size, false);
|
DEBUG_MSG("publish json message to %s, %u bytes: %s\n", topicJson.c_str(), jsonString.length(), jsonString.c_str());
|
||||||
|
pubSub.publish(topicJson.c_str(), jsonString.c_str(), false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// converts a downstream packet into a json message
|
||||||
|
String MQTT::downstreamPacketToJson(MeshPacket *mp)
|
||||||
|
{
|
||||||
|
using namespace json11;
|
||||||
|
|
||||||
|
// the created jsonObj is immutable after creation, so
|
||||||
|
// we need to do the heavy lifting before assembling it.
|
||||||
|
String msgType;
|
||||||
|
Json msgPayload;
|
||||||
|
|
||||||
|
switch (mp->decoded.portnum) {
|
||||||
|
case PortNum_TEXT_MESSAGE_APP: {
|
||||||
|
msgType = "text";
|
||||||
|
// convert bytes to string
|
||||||
|
DEBUG_MSG("got text message of size %u\n", mp->decoded.payload.size);
|
||||||
|
char payloadStr[(mp->decoded.payload.size) + 1];
|
||||||
|
memcpy(payloadStr, mp->decoded.payload.bytes, mp->decoded.payload.size);
|
||||||
|
payloadStr[mp->decoded.payload.size] = 0; // null terminated string
|
||||||
|
// check if this is a JSON payload
|
||||||
|
std::string err;
|
||||||
|
auto json = Json::parse(payloadStr, err);
|
||||||
|
if (err.empty()) {
|
||||||
|
DEBUG_MSG("text message payload is of type json\n");
|
||||||
|
// if it is, then we can just use the json object
|
||||||
|
msgPayload = json;
|
||||||
|
} else {
|
||||||
|
// if it isn't, then we need to create a json object
|
||||||
|
// with the string as the value
|
||||||
|
DEBUG_MSG("text message payload is of type plaintext\n");
|
||||||
|
msgPayload = Json::object({{"text", payloadStr}});
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case PortNum_TELEMETRY_APP: {
|
||||||
|
msgType = "telemetry";
|
||||||
|
Telemetry scratch;
|
||||||
|
Telemetry *decoded = NULL;
|
||||||
|
if (mp->which_payloadVariant == MeshPacket_decoded_tag) {
|
||||||
|
memset(&scratch, 0, sizeof(scratch));
|
||||||
|
if (pb_decode_from_bytes(mp->decoded.payload.bytes, mp->decoded.payload.size, &Telemetry_msg,
|
||||||
|
&scratch)) {
|
||||||
|
decoded = &scratch;
|
||||||
|
msgPayload = Json::object{
|
||||||
|
{"temperature", decoded->temperature},
|
||||||
|
{"relative_humidity", decoded->relative_humidity},
|
||||||
|
{"barometric_pressure", decoded->barometric_pressure},
|
||||||
|
{"gas_resistance", decoded->gas_resistance},
|
||||||
|
{"voltage", decoded->voltage},
|
||||||
|
{"current", decoded->current},
|
||||||
|
};
|
||||||
|
} else
|
||||||
|
DEBUG_MSG("Error decoding protobuf for telemetry message!\n");
|
||||||
|
};
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case PortNum_NODEINFO_APP: {
|
||||||
|
msgType = "nodeinfo";
|
||||||
|
User scratch;
|
||||||
|
User *decoded = NULL;
|
||||||
|
if (mp->which_payloadVariant == MeshPacket_decoded_tag) {
|
||||||
|
memset(&scratch, 0, sizeof(scratch));
|
||||||
|
if (pb_decode_from_bytes(mp->decoded.payload.bytes, mp->decoded.payload.size, &User_msg, &scratch)) {
|
||||||
|
decoded = &scratch;
|
||||||
|
msgPayload = Json::object{{"id", decoded->id},
|
||||||
|
{"longname", decoded->long_name},
|
||||||
|
{"shortname", decoded->short_name},
|
||||||
|
{"hardware", decoded->hw_model}};
|
||||||
|
|
||||||
|
} else
|
||||||
|
DEBUG_MSG("Error decoding protobuf for nodeinfo message!\n");
|
||||||
|
};
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case PortNum_POSITION_APP: {
|
||||||
|
msgType = "position";
|
||||||
|
Position scratch;
|
||||||
|
Position *decoded = NULL;
|
||||||
|
if (mp->which_payloadVariant == MeshPacket_decoded_tag) {
|
||||||
|
memset(&scratch, 0, sizeof(scratch));
|
||||||
|
if (pb_decode_from_bytes(mp->decoded.payload.bytes, mp->decoded.payload.size, &Position_msg, &scratch)) {
|
||||||
|
decoded = &scratch;
|
||||||
|
msgPayload = Json::object{
|
||||||
|
{"latitude_i", decoded->latitude_i}, {"longitude_i", decoded->longitude_i}, {"altitude", decoded->altitude}};
|
||||||
|
} else {
|
||||||
|
DEBUG_MSG("Error decoding protobuf for position message!\n");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
// add more packet types here if needed
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// assemble the final jsonObj
|
||||||
|
Json jsonObj = Json::object{{"id", Json((int)mp->id)},
|
||||||
|
{"timestamp", Json((int)mp->rx_time)},
|
||||||
|
{"to", Json((int)mp->to)},
|
||||||
|
{"from", Json((int)mp->from)},
|
||||||
|
{"channel", Json((int)mp->channel)},
|
||||||
|
{"type", msgType.c_str()},
|
||||||
|
{"sender", owner.id},
|
||||||
|
{"payload", msgPayload}};
|
||||||
|
|
||||||
|
// serialize and return it
|
||||||
|
std::string jsonStr = jsonObj.dump();
|
||||||
|
DEBUG_MSG("serialized json message: %s\n", jsonStr.c_str());
|
||||||
|
|
||||||
|
return jsonStr.c_str();
|
||||||
|
}
|
@ -57,6 +57,9 @@ class MQTT : private concurrency::OSThread
|
|||||||
/// Called when a new publish arrives from the MQTT server
|
/// Called when a new publish arrives from the MQTT server
|
||||||
void onPublish(char *topic, byte *payload, unsigned int length);
|
void onPublish(char *topic, byte *payload, unsigned int length);
|
||||||
|
|
||||||
|
/// Called when a new publish arrives from the MQTT server
|
||||||
|
String downstreamPacketToJson(MeshPacket *mp);
|
||||||
|
|
||||||
/// Return 0 if sleep is okay, veto sleep if we are connected to pubsub server
|
/// Return 0 if sleep is okay, veto sleep if we are connected to pubsub server
|
||||||
// int preflightSleepCb(void *unused = NULL) { return pubSub.connected() ? 1 : 0; }
|
// int preflightSleepCb(void *unused = NULL) { return pubSub.connected() ? 1 : 0; }
|
||||||
};
|
};
|
||||||
|
Loading…
Reference in New Issue
Block a user