2021-04-03 06:54:10 +00:00
|
|
|
#include "MQTT.h"
|
|
|
|
#include "NodeDB.h"
|
2021-04-03 14:27:06 +00:00
|
|
|
#include "main.h"
|
2021-04-04 01:20:37 +00:00
|
|
|
#include "mesh/Channels.h"
|
2021-04-05 01:24:00 +00:00
|
|
|
#include "mesh/Router.h"
|
2021-04-03 08:06:40 +00:00
|
|
|
#include "mesh/generated/mqtt.pb.h"
|
2021-08-03 00:42:44 +00:00
|
|
|
#include "sleep.h"
|
2021-04-03 06:54:10 +00:00
|
|
|
#include <WiFi.h>
|
|
|
|
#include <assert.h>
|
|
|
|
|
|
|
|
// Global pointer to the single MQTT instance; the MQTT constructor asserts it is unset and then stores `this` here.
MQTT *mqtt;
|
|
|
|
|
2021-04-10 02:55:56 +00:00
|
|
|
// Topic prefix for node status; reconnect() appends owner.id to build this node's status topic.
String statusTopic = "msh/1/stat/";
|
|
|
|
// Topic prefix for ServiceEnvelope traffic: onSend() publishes under it and sendSubscriptions() subscribes under it.
String cryptTopic = "msh/1/c/"; // msh/1/c/CHANNELID/NODEID
|
2021-04-03 06:54:10 +00:00
|
|
|
|
2021-04-05 00:42:52 +00:00
|
|
|
void MQTT::mqttCallback(char *topic, byte *payload, unsigned int length)
|
2021-04-03 06:54:10 +00:00
|
|
|
{
|
2021-04-05 00:42:52 +00:00
|
|
|
mqtt->onPublish(topic, payload, length);
|
|
|
|
}
|
|
|
|
|
|
|
|
void MQTT::onPublish(char *topic, byte *payload, unsigned int length)
|
|
|
|
{
|
|
|
|
// parsing ServiceEnvelope
|
|
|
|
ServiceEnvelope e = ServiceEnvelope_init_default;
|
|
|
|
if (!pb_decode_from_bytes(payload, length, ServiceEnvelope_fields, &e)) {
|
|
|
|
DEBUG_MSG("Invalid MQTT service envelope, topic %s, len %u!\n", topic, length);
|
|
|
|
} else {
|
2021-04-11 01:23:34 +00:00
|
|
|
if (strcmp(e.gateway_id, owner.id) == 0)
|
|
|
|
DEBUG_MSG("Ignoring downlink message we originally sent.\n");
|
|
|
|
else {
|
|
|
|
if (e.packet) {
|
|
|
|
DEBUG_MSG("Received MQTT topic %s, len=%u\n", topic, length);
|
|
|
|
MeshPacket *p = packetPool.allocCopy(*e.packet);
|
2021-04-03 06:54:10 +00:00
|
|
|
|
2021-04-11 01:23:34 +00:00
|
|
|
// ignore messages sent by us or if we don't have the channel key
|
|
|
|
if (router && p->from != nodeDB.getNodeNum() && perhapsDecode(p))
|
|
|
|
router->enqueueReceivedMessage(p);
|
|
|
|
else
|
|
|
|
packetPool.release(p);
|
|
|
|
}
|
2021-04-05 01:24:00 +00:00
|
|
|
}
|
2021-04-05 00:42:52 +00:00
|
|
|
|
|
|
|
// make sure to free both strings and the MeshPacket (passing in NULL is acceptable)
|
|
|
|
free(e.channel_id);
|
|
|
|
free(e.gateway_id);
|
|
|
|
free(e.packet);
|
|
|
|
}
|
2021-04-03 06:54:10 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
void mqttInit()
|
|
|
|
{
|
2021-04-04 23:48:46 +00:00
|
|
|
new MQTT();
|
2021-04-03 06:54:10 +00:00
|
|
|
}
|
|
|
|
|
2021-04-03 14:27:06 +00:00
|
|
|
/// Construct the MQTT service as an OSThread named "mqtt" with pubSub bound to mqttClient.
/// Only one instance may exist: the constructor asserts that the global `mqtt`
/// pointer is still unset and then claims it.
MQTT::MQTT() : concurrency::OSThread("mqtt"), pubSub(mqttClient)
{
    // Enforce single-instance use, then publish ourselves through the global pointer
    assert(!mqtt);
    mqtt = this;

    // Incoming broker messages reach onPublish() through mqttCallback
    pubSub.setCallback(mqttCallback);

    // NOTE(review): presumably lets this object take part in the pre-sleep check — confirm in sleep.h
    preflightSleepObserver.observe(&preflightSleep);
}
|
|
|
|
|
|
|
|
void MQTT::reconnect()
|
|
|
|
{
|
2021-08-02 17:41:31 +00:00
|
|
|
const char *serverAddr = "mqtt.meshtastic.org"; // default hostname
|
|
|
|
int serverPort = 1883; // default server port
|
2021-04-03 06:54:10 +00:00
|
|
|
|
|
|
|
if (*radioConfig.preferences.mqtt_server)
|
|
|
|
serverAddr = radioConfig.preferences.mqtt_server; // Override the default
|
|
|
|
|
2021-07-01 21:57:50 +00:00
|
|
|
String server = String(serverAddr);
|
|
|
|
int delimIndex = server.indexOf(':');
|
2021-07-02 12:47:12 +00:00
|
|
|
if (delimIndex > 0) {
|
|
|
|
String port = server.substring(delimIndex+1, server.length());
|
2021-08-02 17:41:31 +00:00
|
|
|
server[delimIndex] = 0;
|
|
|
|
serverPort = port.toInt();
|
2021-08-02 17:50:55 +00:00
|
|
|
serverAddr = server.c_str();
|
2021-07-02 12:47:12 +00:00
|
|
|
}
|
2021-08-02 17:50:55 +00:00
|
|
|
pubSub.setServer(serverAddr, serverPort);
|
2021-04-03 06:54:10 +00:00
|
|
|
|
2021-08-03 00:42:44 +00:00
|
|
|
DEBUG_MSG("Connecting to MQTT server %s, port: %d\n", serverAddr, serverPort);
|
2021-04-03 07:04:03 +00:00
|
|
|
auto myStatus = (statusTopic + owner.id);
|
2021-04-10 02:55:56 +00:00
|
|
|
bool connected = pubSub.connect(owner.id, "meshdev", "large4cats", myStatus.c_str(), 1, true, "offline");
|
2021-04-03 06:54:10 +00:00
|
|
|
if (connected) {
|
|
|
|
DEBUG_MSG("MQTT connected\n");
|
2021-04-03 14:27:06 +00:00
|
|
|
enabled = true; // Start running background process again
|
|
|
|
runASAP = true;
|
2021-04-03 06:54:10 +00:00
|
|
|
|
|
|
|
/// FIXME, include more information in the status text
|
|
|
|
bool ok = pubSub.publish(myStatus.c_str(), "online", true);
|
|
|
|
DEBUG_MSG("published %d\n", ok);
|
2021-04-05 00:42:52 +00:00
|
|
|
|
|
|
|
sendSubscriptions();
|
2021-04-04 23:48:46 +00:00
|
|
|
} else
|
|
|
|
DEBUG_MSG("Failed to contact MQTT server...\n");
|
|
|
|
}
|
|
|
|
|
2021-04-05 00:42:52 +00:00
|
|
|
void MQTT::sendSubscriptions()
|
|
|
|
{
|
|
|
|
size_t numChan = channels.getNumChannels();
|
|
|
|
for (size_t i = 0; i < numChan; i++) {
|
|
|
|
auto &ch = channels.getByIndex(i);
|
2021-04-10 03:57:20 +00:00
|
|
|
if (ch.settings.downlink_enabled) {
|
2021-04-05 00:42:52 +00:00
|
|
|
String topic = cryptTopic + channels.getGlobalId(i) + "/#";
|
|
|
|
DEBUG_MSG("Subscribing to %s\n", topic.c_str());
|
|
|
|
pubSub.subscribe(topic.c_str(), 1); // FIXME, is QOS 1 right?
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-04-04 23:48:46 +00:00
|
|
|
bool MQTT::wantsLink() const
|
|
|
|
{
|
|
|
|
bool hasChannel = false;
|
|
|
|
|
|
|
|
if (radioConfig.preferences.mqtt_disabled) {
|
|
|
|
// DEBUG_MSG("MQTT disabled...\n");
|
|
|
|
} else {
|
|
|
|
// No need for link if no channel needed it
|
|
|
|
size_t numChan = channels.getNumChannels();
|
|
|
|
for (size_t i = 0; i < numChan; i++) {
|
|
|
|
auto &ch = channels.getByIndex(i);
|
|
|
|
if (ch.settings.uplink_enabled || ch.settings.downlink_enabled) {
|
|
|
|
hasChannel = true;
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
2021-04-03 06:54:10 +00:00
|
|
|
}
|
2021-04-04 23:48:46 +00:00
|
|
|
|
|
|
|
return hasChannel && WiFi.isConnected();
|
2021-04-03 06:54:10 +00:00
|
|
|
}
|
|
|
|
|
2021-04-03 14:27:06 +00:00
|
|
|
int32_t MQTT::runOnce()
|
|
|
|
{
|
2021-04-04 23:48:46 +00:00
|
|
|
bool wantConnection = wantsLink();
|
2021-04-03 14:27:06 +00:00
|
|
|
|
2021-04-04 23:48:46 +00:00
|
|
|
// If connected poll rapidly, otherwise only occasionally check for a wifi connection change and ability to contact server
|
|
|
|
if (!pubSub.loop()) {
|
|
|
|
if (wantConnection) {
|
|
|
|
reconnect();
|
|
|
|
|
|
|
|
// If we succeeded, start reading rapidly, else try again in 30 seconds (TCP connections are EXPENSIVE so try rarely)
|
|
|
|
return pubSub.connected() ? 20 : 30000;
|
|
|
|
} else
|
|
|
|
return 5000; // If we don't want connection now, check again in 5 secs
|
|
|
|
} else {
|
|
|
|
// we are connected to server, check often for new requests on the TCP port
|
|
|
|
if (!wantConnection) {
|
|
|
|
DEBUG_MSG("MQTT link not needed, dropping\n");
|
|
|
|
pubSub.disconnect();
|
|
|
|
}
|
|
|
|
|
|
|
|
return 20;
|
|
|
|
}
|
2021-04-03 14:27:06 +00:00
|
|
|
}
|
|
|
|
|
2021-04-04 01:20:37 +00:00
|
|
|
void MQTT::onSend(const MeshPacket &mp, ChannelIndex chIndex)
|
2021-04-03 06:54:10 +00:00
|
|
|
{
|
2021-04-04 23:57:06 +00:00
|
|
|
auto &ch = channels.getByIndex(chIndex);
|
2021-04-03 08:06:40 +00:00
|
|
|
|
2021-04-04 23:57:06 +00:00
|
|
|
// don't bother sending if not connected...
|
|
|
|
if (pubSub.connected() && ch.settings.uplink_enabled) {
|
|
|
|
const char *channelId = channels.getGlobalId(chIndex); // FIXME, for now we just use the human name for the channel
|
2021-04-03 08:06:40 +00:00
|
|
|
|
|
|
|
ServiceEnvelope env = ServiceEnvelope_init_default;
|
|
|
|
env.channel_id = (char *)channelId;
|
|
|
|
env.gateway_id = owner.id;
|
|
|
|
env.packet = (MeshPacket *)∓
|
2021-04-03 06:54:10 +00:00
|
|
|
|
2021-04-03 08:06:40 +00:00
|
|
|
// FIXME - this size calculation is super sloppy, but it will go away once we dynamically alloc meshpackets
|
|
|
|
static uint8_t bytes[MeshPacket_size + 64];
|
|
|
|
size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), ServiceEnvelope_fields, &env);
|
|
|
|
|
2021-04-05 00:42:52 +00:00
|
|
|
String topic = cryptTopic + channelId + "/" + owner.id;
|
|
|
|
DEBUG_MSG("publish %s, %u bytes\n", topic.c_str(), numBytes);
|
2021-04-03 08:06:40 +00:00
|
|
|
|
2021-04-05 00:42:52 +00:00
|
|
|
pubSub.publish(topic.c_str(), bytes, numBytes, false);
|
2021-04-03 08:06:40 +00:00
|
|
|
}
|
2021-04-03 06:54:10 +00:00
|
|
|
}
|