mirror of
https://github.com/meshtastic/firmware.git
synced 2025-06-16 18:12:07 +00:00
mqtt: begin subscription support
This commit is contained in:
parent
55b8314a2a
commit
50a69d77e6
@ -9,13 +9,29 @@
|
|||||||
MQTT *mqtt;
|
MQTT *mqtt;
|
||||||
|
|
||||||
String statusTopic = "mesh/stat/";
|
String statusTopic = "mesh/stat/";
|
||||||
|
String cryptTopic = "mesh/crypt/"; // mesh/crypt/CHANNELID/NODEID
|
||||||
|
|
||||||
void mqttCallback(char *topic, byte *payload, unsigned int length)
|
void MQTT::mqttCallback(char *topic, byte *payload, unsigned int length)
|
||||||
{
|
{
|
||||||
DEBUG_MSG("MQTT topic %s\n", topic);
|
mqtt->onPublish(topic, payload, length);
|
||||||
|
}
|
||||||
|
|
||||||
// After parsing ServiceEnvelope
|
void MQTT::onPublish(char *topic, byte *payload, unsigned int length)
|
||||||
// FIXME - make sure to free both strings and the MeshPacket
|
{
|
||||||
|
// parsing ServiceEnvelope
|
||||||
|
ServiceEnvelope e = ServiceEnvelope_init_default;
|
||||||
|
if (!pb_decode_from_bytes(payload, length, ServiceEnvelope_fields, &e)) {
|
||||||
|
DEBUG_MSG("Invalid MQTT service envelope, topic %s, len %u!\n", topic, length);
|
||||||
|
} else {
|
||||||
|
DEBUG_MSG("Received MQTT topic %s, len=%u\n", topic, length);
|
||||||
|
|
||||||
|
// FIXME, ignore messages sent by us (requires decryption) or if we don't have the channel key
|
||||||
|
|
||||||
|
// make sure to free both strings and the MeshPacket (passing in NULL is acceptable)
|
||||||
|
free(e.channel_id);
|
||||||
|
free(e.gateway_id);
|
||||||
|
free(e.packet);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void mqttInit()
|
void mqttInit()
|
||||||
@ -53,15 +69,30 @@ void MQTT::reconnect()
|
|||||||
static char subsStr[64]; /* We keep this static because the mqtt lib
|
static char subsStr[64]; /* We keep this static because the mqtt lib
|
||||||
might not be copying it */
|
might not be copying it */
|
||||||
// snprintf(subsStr, sizeof(subsStr), "/ezd/todev/%s/#", clientId);
|
// snprintf(subsStr, sizeof(subsStr), "/ezd/todev/%s/#", clientId);
|
||||||
// mqtt.subscribe(subsStr, 1); // we use qos 1 because we don't want to miss messages
|
// pubSub.subscribe(subsStr, 1); // we use qos 1 because we don't want to miss messages
|
||||||
|
|
||||||
/// FIXME, include more information in the status text
|
/// FIXME, include more information in the status text
|
||||||
bool ok = pubSub.publish(myStatus.c_str(), "online", true);
|
bool ok = pubSub.publish(myStatus.c_str(), "online", true);
|
||||||
DEBUG_MSG("published %d\n", ok);
|
DEBUG_MSG("published %d\n", ok);
|
||||||
|
|
||||||
|
sendSubscriptions();
|
||||||
} else
|
} else
|
||||||
DEBUG_MSG("Failed to contact MQTT server...\n");
|
DEBUG_MSG("Failed to contact MQTT server...\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void MQTT::sendSubscriptions()
|
||||||
|
{
|
||||||
|
size_t numChan = channels.getNumChannels();
|
||||||
|
for (size_t i = 0; i < numChan; i++) {
|
||||||
|
auto &ch = channels.getByIndex(i);
|
||||||
|
if (ch.settings.uplink_enabled) {
|
||||||
|
String topic = cryptTopic + channels.getGlobalId(i) + "/#";
|
||||||
|
DEBUG_MSG("Subscribing to %s\n", topic.c_str());
|
||||||
|
pubSub.subscribe(topic.c_str(), 1); // FIXME, is QOS 1 right?
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
bool MQTT::wantsLink() const
|
bool MQTT::wantsLink() const
|
||||||
{
|
{
|
||||||
bool hasChannel = false;
|
bool hasChannel = false;
|
||||||
@ -124,18 +155,9 @@ void MQTT::onSend(const MeshPacket &mp, ChannelIndex chIndex)
|
|||||||
static uint8_t bytes[MeshPacket_size + 64];
|
static uint8_t bytes[MeshPacket_size + 64];
|
||||||
size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), ServiceEnvelope_fields, &env);
|
size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), ServiceEnvelope_fields, &env);
|
||||||
|
|
||||||
const char *topic = getCryptTopic(channelId);
|
String topic = cryptTopic + channelId + "/" + owner.id;
|
||||||
DEBUG_MSG("publish %s, %u bytes\n", topic, numBytes);
|
DEBUG_MSG("publish %s, %u bytes\n", topic.c_str(), numBytes);
|
||||||
|
|
||||||
pubSub.publish(topic, bytes, numBytes, false);
|
pubSub.publish(topic.c_str(), bytes, numBytes, false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const char *MQTT::getCryptTopic(const char *channelId)
|
|
||||||
{
|
|
||||||
static char buf[128];
|
|
||||||
|
|
||||||
// "mesh/crypt/CHANNELID/NODEID/PORTID"
|
|
||||||
snprintf(buf, sizeof(buf), "mesh/crypt/%s/%s", channelId, owner.id);
|
|
||||||
return buf;
|
|
||||||
}
|
|
@ -36,8 +36,6 @@ class MQTT : private concurrency::OSThread
|
|||||||
virtual int32_t runOnce();
|
virtual int32_t runOnce();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
const char *getCryptTopic(const char *channelId);
|
|
||||||
|
|
||||||
/** return true if we have a channel that wants uplink/downlink
|
/** return true if we have a channel that wants uplink/downlink
|
||||||
*/
|
*/
|
||||||
bool wantsLink() const;
|
bool wantsLink() const;
|
||||||
@ -45,6 +43,16 @@ class MQTT : private concurrency::OSThread
|
|||||||
/** Attempt to connect to server if necessary
|
/** Attempt to connect to server if necessary
|
||||||
*/
|
*/
|
||||||
void reconnect();
|
void reconnect();
|
||||||
|
|
||||||
|
/** Tell the server what subscriptions we want (based on channels.downlink_enabled)
|
||||||
|
*/
|
||||||
|
void sendSubscriptions();
|
||||||
|
|
||||||
|
/// Just C glue to call onPublish
|
||||||
|
static void mqttCallback(char *topic, byte *payload, unsigned int length);
|
||||||
|
|
||||||
|
/// Called when a new publish arrives from the MQTT server
|
||||||
|
void onPublish(char *topic, byte *payload, unsigned int length);
|
||||||
};
|
};
|
||||||
|
|
||||||
void mqttInit();
|
void mqttInit();
|
||||||
|
Loading…
Reference in New Issue
Block a user