mirror of
https://github.com/meshtastic/firmware.git
synced 2025-04-23 17:13:38 +00:00
Another try to get the code format correct.
This commit is contained in:
parent
ca8d2204ba
commit
c4c85777d0
@ -45,15 +45,13 @@ void MQTT::onReceive(char *topic, byte *payload, size_t length)
|
||||
{
|
||||
meshtastic_ServiceEnvelope e = meshtastic_ServiceEnvelope_init_default;
|
||||
|
||||
if (moduleConfig.mqtt.json_enabled && (strncmp(topic, jsonTopic.c_str(), jsonTopic.length()) == 0))
|
||||
{
|
||||
if (moduleConfig.mqtt.json_enabled && (strncmp(topic, jsonTopic.c_str(), jsonTopic.length()) == 0)) {
|
||||
// check if this is a json payload message by comparing the topic start
|
||||
char payloadStr[length + 1];
|
||||
memcpy(payloadStr, payload, length);
|
||||
payloadStr[length] = 0; // null terminated string
|
||||
JSONValue *json_value = JSON::Parse(payloadStr);
|
||||
if (json_value != NULL)
|
||||
{
|
||||
if (json_value != NULL) {
|
||||
// check if it is a valid envelope
|
||||
JSONObject json;
|
||||
json = json_value->AsObject();
|
||||
@ -65,13 +63,10 @@ void MQTT::onReceive(char *topic, byte *payload, size_t length)
|
||||
meshtastic_Channel sendChannel = channels.getByName(ptr);
|
||||
// We allow downlink JSON packets only on a channel named "mqtt"
|
||||
if (strncasecmp(channels.getGlobalId(sendChannel.index), Channels::mqttChannel, strlen(Channels::mqttChannel)) == 0 &&
|
||||
sendChannel.settings.downlink_enabled)
|
||||
{
|
||||
if (isValidJsonEnvelope(json))
|
||||
{
|
||||
sendChannel.settings.downlink_enabled) {
|
||||
if (isValidJsonEnvelope(json)) {
|
||||
// this is a valid envelope
|
||||
if (json["type"]->AsString().compare("sendtext") == 0 && json["payload"]->IsString())
|
||||
{
|
||||
if (json["type"]->AsString().compare("sendtext") == 0 && json["payload"]->IsString()) {
|
||||
std::string jsonPayloadStr = json["payload"]->AsString();
|
||||
LOG_INFO("JSON payload %s, length %u\n", jsonPayloadStr.c_str(), jsonPayloadStr.length());
|
||||
|
||||
@ -85,19 +80,14 @@ void MQTT::onReceive(char *topic, byte *payload, size_t length)
|
||||
p->to = json["to"]->AsNumber();
|
||||
if (json.find("hopLimit") != json.end() && json["hopLimit"]->IsNumber())
|
||||
p->hop_limit = json["hopLimit"]->AsNumber();
|
||||
if (jsonPayloadStr.length() <= sizeof(p->decoded.payload.bytes))
|
||||
{
|
||||
if (jsonPayloadStr.length() <= sizeof(p->decoded.payload.bytes)) {
|
||||
memcpy(p->decoded.payload.bytes, jsonPayloadStr.c_str(), jsonPayloadStr.length());
|
||||
p->decoded.payload.size = jsonPayloadStr.length();
|
||||
service->sendToMesh(p, RX_SRC_LOCAL);
|
||||
}
|
||||
else
|
||||
{
|
||||
} else {
|
||||
LOG_WARN("Received MQTT json payload too long, dropping\n");
|
||||
}
|
||||
}
|
||||
else if (json["type"]->AsString().compare("sendposition") == 0 && json["payload"]->IsObject())
|
||||
{
|
||||
} else if (json["type"]->AsString().compare("sendposition") == 0 && json["payload"]->IsObject()) {
|
||||
// invent the "sendposition" type for a valid envelope
|
||||
JSONObject posit;
|
||||
posit = json["payload"]->AsObject(); // get nested JSON Position
|
||||
@ -125,51 +115,34 @@ void MQTT::onReceive(char *topic, byte *payload, size_t length)
|
||||
pb_encode_to_bytes(p->decoded.payload.bytes, sizeof(p->decoded.payload.bytes),
|
||||
&meshtastic_Position_msg, &pos); // make the Data protobuf from position
|
||||
service->sendToMesh(p, RX_SRC_LOCAL);
|
||||
}
|
||||
else
|
||||
{
|
||||
} else {
|
||||
LOG_DEBUG("JSON Ignoring downlink message with unsupported type.\n");
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
} else {
|
||||
LOG_ERROR("JSON Received payload on MQTT but not a valid envelope.\n");
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
} else {
|
||||
LOG_WARN("JSON downlink received on channel not called 'mqtt' or without downlink enabled.\n");
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
} else {
|
||||
// no json, this is an invalid payload
|
||||
LOG_ERROR("JSON Received payload on MQTT but not a valid JSON\n");
|
||||
}
|
||||
delete json_value;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (length == 0)
|
||||
{
|
||||
} else {
|
||||
if (length == 0) {
|
||||
LOG_WARN("Empty MQTT payload received, topic %s!\n", topic);
|
||||
return;
|
||||
}
|
||||
else if (!pb_decode_from_bytes(payload, length, &meshtastic_ServiceEnvelope_msg, &e))
|
||||
{
|
||||
} else if (!pb_decode_from_bytes(payload, length, &meshtastic_ServiceEnvelope_msg, &e)) {
|
||||
LOG_ERROR("Invalid MQTT service envelope, topic %s, len %u!\n", topic, length);
|
||||
return;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (e.channel_id == NULL || e.gateway_id == NULL)
|
||||
{
|
||||
} else {
|
||||
if (e.channel_id == NULL || e.gateway_id == NULL) {
|
||||
LOG_ERROR("Invalid MQTT service envelope, topic %s, len %u!\n", topic, length);
|
||||
return;
|
||||
}
|
||||
meshtastic_Channel ch = channels.getByName(e.channel_id);
|
||||
if (strcmp(e.gateway_id, owner.id) == 0)
|
||||
{
|
||||
if (strcmp(e.gateway_id, owner.id) == 0) {
|
||||
// Generate an implicit ACK towards ourselves (handled and processed only locally!) for this message.
|
||||
// We do this because packets are not rebroadcasted back into MQTT anymore and we assume that at least one node
|
||||
// receives it when we get our own packet back. Then we'll stop our retransmissions.
|
||||
@ -177,34 +150,28 @@ void MQTT::onReceive(char *topic, byte *payload, size_t length)
|
||||
routingModule->sendAckNak(meshtastic_Routing_Error_NONE, getFrom(e.packet), e.packet->id, ch.index);
|
||||
else
|
||||
LOG_INFO("Ignoring downlink message we originally sent.\n");
|
||||
}
|
||||
else
|
||||
{
|
||||
} else {
|
||||
// Find channel by channel_id and check downlink_enabled
|
||||
if ((strcmp(e.channel_id, "PKI") == 0 && e.packet) ||
|
||||
(strcmp(e.channel_id, channels.getGlobalId(ch.index)) == 0 && e.packet && ch.settings.downlink_enabled))
|
||||
{
|
||||
(strcmp(e.channel_id, channels.getGlobalId(ch.index)) == 0 && e.packet && ch.settings.downlink_enabled)) {
|
||||
LOG_INFO("Received MQTT topic %s, len=%u\n", topic, length);
|
||||
meshtastic_MeshPacket *p = packetPool.allocCopy(*e.packet);
|
||||
p->via_mqtt = true; // Mark that the packet was received via MQTT
|
||||
|
||||
if (p->which_payload_variant == meshtastic_MeshPacket_decoded_tag)
|
||||
{
|
||||
if (p->which_payload_variant == meshtastic_MeshPacket_decoded_tag) {
|
||||
p->channel = ch.index;
|
||||
}
|
||||
|
||||
// PKI messages get accepted even if we can't decrypt
|
||||
if (router && p->which_payload_variant == meshtastic_MeshPacket_encrypted_tag &&
|
||||
strcmp(e.channel_id, "PKI") == 0)
|
||||
{
|
||||
strcmp(e.channel_id, "PKI") == 0) {
|
||||
meshtastic_NodeInfoLite *tx = nodeDB->getMeshNode(getFrom(p));
|
||||
meshtastic_NodeInfoLite *rx = nodeDB->getMeshNode(p->to);
|
||||
// Only accept PKI messages to us, or if we have both the sender and receiver in our nodeDB, as then it's
|
||||
// likely they discovered each other via a channel we have downlink enabled for
|
||||
if (p->to == nodeDB->getNodeNum() || (tx && tx->has_user && rx && rx->has_user))
|
||||
router->enqueueReceivedMessage(p);
|
||||
}
|
||||
else if (router && perhapsDecode(p)) // ignore messages if we don't have the channel key
|
||||
} else if (router && perhapsDecode(p)) // ignore messages if we don't have the channel key
|
||||
router->enqueueReceivedMessage(p);
|
||||
else
|
||||
packetPool.release(p);
|
||||
@ -229,28 +196,23 @@ MQTT::MQTT() : concurrency::OSThread("mqtt"), pubSub(mqttClient), mqttQueue(MAX_
|
||||
MQTT::MQTT() : concurrency::OSThread("mqtt"), mqttQueue(MAX_MQTT_QUEUE)
|
||||
#endif
|
||||
{
|
||||
if (moduleConfig.mqtt.enabled)
|
||||
{
|
||||
if (moduleConfig.mqtt.enabled) {
|
||||
LOG_DEBUG("Initializing MQTT\n");
|
||||
|
||||
assert(!mqtt);
|
||||
mqtt = this;
|
||||
|
||||
if (*moduleConfig.mqtt.root)
|
||||
{
|
||||
if (*moduleConfig.mqtt.root) {
|
||||
cryptTopic = moduleConfig.mqtt.root + cryptTopic;
|
||||
jsonTopic = moduleConfig.mqtt.root + jsonTopic;
|
||||
mapTopic = moduleConfig.mqtt.root + mapTopic;
|
||||
}
|
||||
else
|
||||
{
|
||||
} else {
|
||||
cryptTopic = "msh" + cryptTopic;
|
||||
jsonTopic = "msh" + jsonTopic;
|
||||
mapTopic = "msh" + mapTopic;
|
||||
}
|
||||
|
||||
if (moduleConfig.mqtt.map_reporting_enabled && moduleConfig.mqtt.has_map_report_settings)
|
||||
{
|
||||
if (moduleConfig.mqtt.map_reporting_enabled && moduleConfig.mqtt.has_map_report_settings) {
|
||||
map_position_precision = Default::getConfiguredOrDefault(moduleConfig.mqtt.map_report_settings.position_precision,
|
||||
default_map_position_precision);
|
||||
map_publish_interval_msecs = Default::getConfiguredOrDefaultMs(
|
||||
@ -262,8 +224,7 @@ MQTT::MQTT() : concurrency::OSThread("mqtt"), mqttQueue(MAX_MQTT_QUEUE)
|
||||
pubSub.setCallback(mqttCallback);
|
||||
#endif
|
||||
|
||||
if (moduleConfig.mqtt.proxy_to_client_enabled)
|
||||
{
|
||||
if (moduleConfig.mqtt.proxy_to_client_enabled) {
|
||||
LOG_INFO("MQTT configured to use client proxy...\n");
|
||||
enabled = true;
|
||||
runASAP = true;
|
||||
@ -271,9 +232,7 @@ MQTT::MQTT() : concurrency::OSThread("mqtt"), mqttQueue(MAX_MQTT_QUEUE)
|
||||
publishNodeInfo();
|
||||
}
|
||||
// preflightSleepObserver.observe(&preflightSleep);
|
||||
}
|
||||
else
|
||||
{
|
||||
} else {
|
||||
disable();
|
||||
}
|
||||
}
|
||||
@ -289,8 +248,7 @@ bool MQTT::isConnectedDirectly()
|
||||
|
||||
bool MQTT::publish(const char *topic, const char *payload, bool retained)
|
||||
{
|
||||
if (moduleConfig.mqtt.proxy_to_client_enabled)
|
||||
{
|
||||
if (moduleConfig.mqtt.proxy_to_client_enabled) {
|
||||
meshtastic_MqttClientProxyMessage *msg = mqttClientProxyMessagePool.allocZeroed();
|
||||
msg->which_payload_variant = meshtastic_MqttClientProxyMessage_text_tag;
|
||||
strcpy(msg->topic, topic);
|
||||
@ -300,8 +258,7 @@ bool MQTT::publish(const char *topic, const char *payload, bool retained)
|
||||
return true;
|
||||
}
|
||||
#if HAS_NETWORKING
|
||||
else if (isConnectedDirectly())
|
||||
{
|
||||
else if (isConnectedDirectly()) {
|
||||
return pubSub.publish(topic, payload, retained);
|
||||
}
|
||||
#endif
|
||||
@ -310,8 +267,7 @@ bool MQTT::publish(const char *topic, const char *payload, bool retained)
|
||||
|
||||
bool MQTT::publish(const char *topic, const uint8_t *payload, size_t length, bool retained)
|
||||
{
|
||||
if (moduleConfig.mqtt.proxy_to_client_enabled)
|
||||
{
|
||||
if (moduleConfig.mqtt.proxy_to_client_enabled) {
|
||||
meshtastic_MqttClientProxyMessage *msg = mqttClientProxyMessagePool.allocZeroed();
|
||||
msg->which_payload_variant = meshtastic_MqttClientProxyMessage_data_tag;
|
||||
strcpy(msg->topic, topic);
|
||||
@ -322,8 +278,7 @@ bool MQTT::publish(const char *topic, const uint8_t *payload, size_t length, boo
|
||||
return true;
|
||||
}
|
||||
#if HAS_NETWORKING
|
||||
else if (isConnectedDirectly())
|
||||
{
|
||||
else if (isConnectedDirectly()) {
|
||||
return pubSub.publish(topic, payload, length, retained);
|
||||
}
|
||||
#endif
|
||||
@ -332,10 +287,8 @@ bool MQTT::publish(const char *topic, const uint8_t *payload, size_t length, boo
|
||||
|
||||
void MQTT::reconnect()
|
||||
{
|
||||
if (wantsLink())
|
||||
{
|
||||
if (moduleConfig.mqtt.proxy_to_client_enabled)
|
||||
{
|
||||
if (wantsLink()) {
|
||||
if (moduleConfig.mqtt.proxy_to_client_enabled) {
|
||||
LOG_INFO("MQTT connecting via client proxy instead...\n");
|
||||
enabled = true;
|
||||
runASAP = true;
|
||||
@ -351,31 +304,24 @@ void MQTT::reconnect()
|
||||
const char *mqttUsername = default_mqtt_username;
|
||||
const char *mqttPassword = default_mqtt_password;
|
||||
|
||||
if (*moduleConfig.mqtt.address)
|
||||
{
|
||||
if (*moduleConfig.mqtt.address) {
|
||||
serverAddr = moduleConfig.mqtt.address;
|
||||
mqttUsername = moduleConfig.mqtt.username;
|
||||
mqttPassword = moduleConfig.mqtt.password;
|
||||
}
|
||||
#if HAS_WIFI && !defined(ARCH_PORTDUINO)
|
||||
if (moduleConfig.mqtt.tls_enabled)
|
||||
{
|
||||
if (moduleConfig.mqtt.tls_enabled) {
|
||||
// change default for encrypted to 8883
|
||||
try
|
||||
{
|
||||
try {
|
||||
serverPort = 8883;
|
||||
wifiSecureClient.setInsecure();
|
||||
|
||||
pubSub.setClient(wifiSecureClient);
|
||||
LOG_INFO("Using TLS-encrypted session\n");
|
||||
}
|
||||
catch (const std::exception &e)
|
||||
{
|
||||
} catch (const std::exception &e) {
|
||||
LOG_ERROR("MQTT ERROR: %s\n", e.what());
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
} else {
|
||||
LOG_INFO("Using non-TLS-encrypted session\n");
|
||||
pubSub.setClient(mqttClient);
|
||||
}
|
||||
@ -385,8 +331,7 @@ void MQTT::reconnect()
|
||||
|
||||
String server = String(serverAddr);
|
||||
int delimIndex = server.indexOf(':');
|
||||
if (delimIndex > 0)
|
||||
{
|
||||
if (delimIndex > 0) {
|
||||
String port = server.substring(delimIndex + 1, server.length());
|
||||
server[delimIndex] = 0;
|
||||
serverPort = port.toInt();
|
||||
@ -399,8 +344,7 @@ void MQTT::reconnect()
|
||||
serverPort, mqttUsername, mqttPassword);
|
||||
|
||||
bool connected = pubSub.connect(owner.id, mqttUsername, mqttPassword);
|
||||
if (connected)
|
||||
{
|
||||
if (connected) {
|
||||
LOG_INFO("MQTT connected\n");
|
||||
enabled = true; // Start running background process again
|
||||
runASAP = true;
|
||||
@ -408,14 +352,11 @@ void MQTT::reconnect()
|
||||
|
||||
publishNodeInfo();
|
||||
sendSubscriptions();
|
||||
}
|
||||
else
|
||||
{
|
||||
} else {
|
||||
#if HAS_WIFI && !defined(ARCH_PORTDUINO)
|
||||
reconnectCount++;
|
||||
LOG_ERROR("Failed to contact MQTT server directly (%d/%d)...\n", reconnectCount, reconnectMax);
|
||||
if (reconnectCount >= reconnectMax)
|
||||
{
|
||||
if (reconnectCount >= reconnectMax) {
|
||||
needReconnect = true;
|
||||
wifiReconnect->setIntervalFromNow(0);
|
||||
reconnectCount = 0;
|
||||
@ -431,18 +372,15 @@ void MQTT::sendSubscriptions()
|
||||
#if HAS_NETWORKING
|
||||
bool hasDownlink = false;
|
||||
size_t numChan = channels.getNumChannels();
|
||||
for (size_t i = 0; i < numChan; i++)
|
||||
{
|
||||
for (size_t i = 0; i < numChan; i++) {
|
||||
const auto &ch = channels.getByIndex(i);
|
||||
if (ch.settings.downlink_enabled)
|
||||
{
|
||||
if (ch.settings.downlink_enabled) {
|
||||
hasDownlink = true;
|
||||
std::string topic = cryptTopic + channels.getGlobalId(i) + "/+";
|
||||
LOG_INFO("Subscribing to %s\n", topic.c_str());
|
||||
pubSub.subscribe(topic.c_str(), 1); // FIXME, is QOS 1 right?
|
||||
#if !defined(ARCH_NRF52) || defined(NRF52_USE_JSON) // JSON is not supported on nRF52, see issue #2804 ### Fixed by using ArduinoJSON ###
|
||||
if (moduleConfig.mqtt.json_enabled == true)
|
||||
{
|
||||
if (moduleConfig.mqtt.json_enabled == true) {
|
||||
std::string topicDecoded = jsonTopic + channels.getGlobalId(i) + "/+";
|
||||
LOG_INFO("Subscribing to %s\n", topicDecoded.c_str());
|
||||
pubSub.subscribe(topicDecoded.c_str(), 1); // FIXME, is QOS 1 right?
|
||||
@ -451,8 +389,7 @@ void MQTT::sendSubscriptions()
|
||||
}
|
||||
}
|
||||
#if !MESHTASTIC_EXCLUDE_PKI
|
||||
if (hasDownlink)
|
||||
{
|
||||
if (hasDownlink) {
|
||||
std::string topic = cryptTopic + "PKI/+";
|
||||
LOG_INFO("Subscribing to %s\n", topic.c_str());
|
||||
pubSub.subscribe(topic.c_str(), 1);
|
||||
@ -489,34 +426,27 @@ int32_t MQTT::runOnce()
|
||||
perhapsReportToMap();
|
||||
|
||||
// If connected poll rapidly, otherwise only occasionally check for a wifi connection change and ability to contact server
|
||||
if (moduleConfig.mqtt.proxy_to_client_enabled)
|
||||
{
|
||||
if (moduleConfig.mqtt.proxy_to_client_enabled) {
|
||||
publishQueuedMessages();
|
||||
return 200;
|
||||
}
|
||||
else if (!pubSub.loop())
|
||||
{
|
||||
|
||||
else if (!pubSub.loop()) {
|
||||
if (!wantConnection)
|
||||
return 5000; // If we don't want connection now, check again in 5 secs
|
||||
else
|
||||
{
|
||||
else {
|
||||
reconnect();
|
||||
// If we succeeded, empty the queue one by one and start reading rapidly, else try again in 30 seconds (TCP
|
||||
// connections are EXPENSIVE so try rarely)
|
||||
if (isConnectedDirectly())
|
||||
{
|
||||
if (isConnectedDirectly()) {
|
||||
publishQueuedMessages();
|
||||
return 200;
|
||||
}
|
||||
else
|
||||
} else
|
||||
return 30000;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
} else {
|
||||
// we are connected to server, check often for new requests on the TCP port
|
||||
if (!wantConnection)
|
||||
{
|
||||
if (!wantConnection) {
|
||||
LOG_INFO("MQTT link not needed, dropping\n");
|
||||
pubSub.disconnect();
|
||||
}
|
||||
@ -534,20 +464,16 @@ void MQTT::publishNodeInfo()
|
||||
}
|
||||
void MQTT::publishQueuedMessages()
|
||||
{
|
||||
if (!mqttQueue.isEmpty())
|
||||
{
|
||||
if (!mqttQueue.isEmpty()) {
|
||||
LOG_DEBUG("Publishing enqueued MQTT message\n");
|
||||
// FIXME - this size calculation is super sloppy, but it will go away once we dynamically alloc meshpackets
|
||||
meshtastic_ServiceEnvelope *env = mqttQueue.dequeuePtr(0);
|
||||
static uint8_t bytes[meshtastic_MeshPacket_size + 64];
|
||||
size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), &meshtastic_ServiceEnvelope_msg, env);
|
||||
std::string topic;
|
||||
if (env->packet->pki_encrypted)
|
||||
{
|
||||
if (env->packet->pki_encrypted) {
|
||||
topic = cryptTopic + "PKI/" + owner.id;
|
||||
}
|
||||
else
|
||||
{
|
||||
} else {
|
||||
topic = cryptTopic + env->channel_id + "/" + owner.id;
|
||||
}
|
||||
LOG_INFO("publish %s, %u bytes from queue\n", topic.c_str(), numBytes);
|
||||
@ -555,19 +481,14 @@ void MQTT::publishQueuedMessages()
|
||||
publish(topic.c_str(), bytes, numBytes, false);
|
||||
|
||||
#if !defined(ARCH_NRF52) || defined(NRF52_USE_JSON) // JSON is not supported on nRF52, see issue #2804 ### Fixed by using ArduinoJson ###
|
||||
if (moduleConfig.mqtt.json_enabled)
|
||||
{
|
||||
if (moduleConfig.mqtt.json_enabled) {
|
||||
// handle json topic
|
||||
auto jsonString = MeshPacketSerializer::JsonSerialize(env->packet);
|
||||
if (jsonString.length() != 0)
|
||||
{
|
||||
if (jsonString.length() != 0) {
|
||||
std::string topicJson;
|
||||
if (env->packet->pki_encrypted)
|
||||
{
|
||||
if (env->packet->pki_encrypted) {
|
||||
topicJson = jsonTopic + "PKI/" + owner.id;
|
||||
}
|
||||
else
|
||||
{
|
||||
} else {
|
||||
topicJson = jsonTopic + env->channel_id + "/" + owner.id;
|
||||
}
|
||||
LOG_INFO("JSON publish message to %s, %u bytes: %s\n", topicJson.c_str(), jsonString.length(),
|
||||
@ -585,8 +506,7 @@ void MQTT::onSend(const meshtastic_MeshPacket &mp, const meshtastic_MeshPacket &
|
||||
if (mp.via_mqtt)
|
||||
return; // Don't send messages that came from MQTT back into MQTT
|
||||
bool uplinkEnabled = false;
|
||||
for (int i = 0; i <= 7; i++)
|
||||
{
|
||||
for (int i = 0; i <= 7; i++) {
|
||||
if (channels.getByIndex(i).settings.uplink_enabled)
|
||||
uplinkEnabled = true;
|
||||
}
|
||||
@ -594,10 +514,8 @@ void MQTT::onSend(const meshtastic_MeshPacket &mp, const meshtastic_MeshPacket &
|
||||
return; // no channels have an uplink enabled
|
||||
auto &ch = channels.getByIndex(chIndex);
|
||||
|
||||
if (!mp.pki_encrypted)
|
||||
{
|
||||
if (mp_decoded.which_payload_variant != meshtastic_MeshPacket_decoded_tag)
|
||||
{
|
||||
if (!mp.pki_encrypted) {
|
||||
if (mp_decoded.which_payload_variant != meshtastic_MeshPacket_decoded_tag) {
|
||||
LOG_CRIT("MQTT::onSend(): mp_decoded isn't actually decoded\n");
|
||||
return;
|
||||
}
|
||||
@ -606,22 +524,19 @@ void MQTT::onSend(const meshtastic_MeshPacket &mp, const meshtastic_MeshPacket &
|
||||
if (mp_decoded.from != nodeDB->getNodeNum() && mp_decoded.decoded.has_bitfield &&
|
||||
!(mp_decoded.decoded.bitfield & BITFIELD_OK_TO_MQTT_MASK) &&
|
||||
(ch.settings.psk.size < 2 || (ch.settings.psk.size == 16 && memcmp(ch.settings.psk.bytes, defaultpsk, 16)) ||
|
||||
(ch.settings.psk.size == 32 && memcmp(ch.settings.psk.bytes, eventpsk, 32))))
|
||||
{
|
||||
(ch.settings.psk.size == 32 && memcmp(ch.settings.psk.bytes, eventpsk, 32)))) {
|
||||
LOG_INFO("MQTT onSend - Not forwarding packet due to DontMqttMeBro flag\n");
|
||||
return;
|
||||
}
|
||||
|
||||
if (strcmp(moduleConfig.mqtt.address, default_mqtt_address) == 0 &&
|
||||
(mp_decoded.decoded.portnum == meshtastic_PortNum_RANGE_TEST_APP ||
|
||||
mp_decoded.decoded.portnum == meshtastic_PortNum_DETECTION_SENSOR_APP))
|
||||
{
|
||||
mp_decoded.decoded.portnum == meshtastic_PortNum_DETECTION_SENSOR_APP)) {
|
||||
LOG_DEBUG("MQTT onSend - Ignoring range test or detection sensor message on public mqtt\n");
|
||||
return;
|
||||
}
|
||||
}
|
||||
if (mp.pki_encrypted || ch.settings.uplink_enabled)
|
||||
{
|
||||
if (mp.pki_encrypted || ch.settings.uplink_enabled) {
|
||||
const char *channelId = mp.pki_encrypted ? "PKI" : channels.getGlobalId(chIndex);
|
||||
|
||||
meshtastic_ServiceEnvelope *env = mqttPool.allocZeroed();
|
||||
@ -629,20 +544,16 @@ void MQTT::onSend(const meshtastic_MeshPacket &mp, const meshtastic_MeshPacket &
|
||||
env->gateway_id = owner.id;
|
||||
|
||||
LOG_DEBUG("MQTT onSend - Publishing ");
|
||||
if (moduleConfig.mqtt.encryption_enabled)
|
||||
{
|
||||
if (moduleConfig.mqtt.encryption_enabled) {
|
||||
env->packet = (meshtastic_MeshPacket *)∓
|
||||
LOG_DEBUG("encrypted message\n");
|
||||
}
|
||||
else if (mp_decoded.which_payload_variant ==
|
||||
meshtastic_MeshPacket_decoded_tag)
|
||||
{ // Don't upload a still-encrypted PKI packet
|
||||
} else if (mp_decoded.which_payload_variant ==
|
||||
meshtastic_MeshPacket_decoded_tag) { // Don't upload a still-encrypted PKI packet
|
||||
env->packet = (meshtastic_MeshPacket *)&mp_decoded;
|
||||
LOG_DEBUG("portnum %i message\n", env->packet->decoded.portnum);
|
||||
}
|
||||
|
||||
if (moduleConfig.mqtt.proxy_to_client_enabled || this->isConnectedDirectly())
|
||||
{
|
||||
if (moduleConfig.mqtt.proxy_to_client_enabled || this->isConnectedDirectly()) {
|
||||
// FIXME - this size calculation is super sloppy, but it will go away once we dynamically alloc meshpackets
|
||||
static uint8_t bytes[meshtastic_MeshPacket_size + 64];
|
||||
size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), &meshtastic_ServiceEnvelope_msg, env);
|
||||
@ -652,12 +563,10 @@ void MQTT::onSend(const meshtastic_MeshPacket &mp, const meshtastic_MeshPacket &
|
||||
publish(topic.c_str(), bytes, numBytes, false);
|
||||
|
||||
#if !defined(ARCH_NRF52) || defined(NRF52_USE_JSON) // JSON is not supported on nRF52, see issue #2804 ### Fixed by using ArduinoJson ###
|
||||
if (moduleConfig.mqtt.json_enabled)
|
||||
{
|
||||
if (moduleConfig.mqtt.json_enabled) {
|
||||
// handle json topic
|
||||
auto jsonString = MeshPacketSerializer::JsonSerialize((meshtastic_MeshPacket *)&mp_decoded);
|
||||
if (jsonString.length() != 0)
|
||||
{
|
||||
if (jsonString.length() != 0) {
|
||||
std::string topicJson = jsonTopic + channelId + "/" + owner.id;
|
||||
LOG_INFO("JSON publish message to %s, %u bytes: %s\n", topicJson.c_str(), jsonString.length(),
|
||||
jsonString.c_str());
|
||||
@ -665,12 +574,9 @@ void MQTT::onSend(const meshtastic_MeshPacket &mp, const meshtastic_MeshPacket &
|
||||
}
|
||||
}
|
||||
#endif // ARCH_NRF52 NRF52_USE_JSON
|
||||
}
|
||||
else
|
||||
{
|
||||
} else {
|
||||
LOG_INFO("MQTT not connected, queueing packet\n");
|
||||
if (mqttQueue.numFree() == 0)
|
||||
{
|
||||
if (mqttQueue.numFree() == 0) {
|
||||
LOG_WARN("NOTE: MQTT queue is full, discarding oldest\n");
|
||||
meshtastic_ServiceEnvelope *d = mqttQueue.dequeuePtr(0);
|
||||
if (d)
|
||||
@ -689,14 +595,10 @@ void MQTT::perhapsReportToMap()
|
||||
if (!moduleConfig.mqtt.map_reporting_enabled || !(moduleConfig.mqtt.proxy_to_client_enabled || isConnectedDirectly()))
|
||||
return;
|
||||
|
||||
if (millis() - last_report_to_map < map_publish_interval_msecs)
|
||||
{
|
||||
if (millis() - last_report_to_map < map_publish_interval_msecs) {
|
||||
return;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (map_position_precision == 0 || (localPosition.latitude_i == 0 && localPosition.longitude_i == 0))
|
||||
{
|
||||
} else {
|
||||
if (map_position_precision == 0 || (localPosition.latitude_i == 0 && localPosition.longitude_i == 0)) {
|
||||
last_report_to_map = millis();
|
||||
if (map_position_precision == 0)
|
||||
LOG_WARN("MQTT Map reporting is enabled, but precision is 0\n");
|
||||
@ -729,15 +631,12 @@ void MQTT::perhapsReportToMap()
|
||||
mapReport.has_default_channel = channels.hasDefaultChannel();
|
||||
|
||||
// Set position with precision (same as in PositionModule)
|
||||
if (map_position_precision < 32 && map_position_precision > 0)
|
||||
{
|
||||
if (map_position_precision < 32 && map_position_precision > 0) {
|
||||
mapReport.latitude_i = localPosition.latitude_i & (UINT32_MAX << (32 - map_position_precision));
|
||||
mapReport.longitude_i = localPosition.longitude_i & (UINT32_MAX << (32 - map_position_precision));
|
||||
mapReport.latitude_i += (1 << (31 - map_position_precision));
|
||||
mapReport.longitude_i += (1 << (31 - map_position_precision));
|
||||
}
|
||||
else
|
||||
{
|
||||
} else {
|
||||
mapReport.latitude_i = localPosition.latitude_i;
|
||||
mapReport.longitude_i = localPosition.longitude_i;
|
||||
}
|
||||
|
@ -83,15 +83,11 @@ void onCccd(uint16_t conn_hdl, BLECharacteristic *chr, uint16_t cccd_value)
|
||||
// According to the GATT spec: cccd value = 0x0001 means notifications are enabled
|
||||
// and cccd value = 0x0002 means indications are enabled
|
||||
|
||||
if (chr->uuid == fromNum.uuid || chr->uuid == logRadio.uuid)
|
||||
{
|
||||
if (chr->uuid == fromNum.uuid || chr->uuid == logRadio.uuid) {
|
||||
auto result = cccd_value == 2 ? chr->indicateEnabled(conn_hdl) : chr->notifyEnabled(conn_hdl);
|
||||
if (result)
|
||||
{
|
||||
if (result) {
|
||||
LOG_INFO("Notify/Indicate enabled\n");
|
||||
}
|
||||
else
|
||||
{
|
||||
} else {
|
||||
LOG_INFO("Notify/Indicate disabled\n");
|
||||
}
|
||||
}
|
||||
@ -133,16 +129,13 @@ static void authorizeRead(uint16_t conn_hdl)
|
||||
*/
|
||||
void onFromRadioAuthorize(uint16_t conn_hdl, BLECharacteristic *chr, ble_gatts_evt_read_t *request)
|
||||
{
|
||||
if (request->offset == 0)
|
||||
{
|
||||
if (request->offset == 0) {
|
||||
// If the read is long, we will get multiple authorize invocations - we only populate data on the first
|
||||
size_t numBytes = bluetoothPhoneAPI->getFromRadio(fromRadioBytes);
|
||||
// Someone is going to read our value as soon as this callback returns. So fill it with the next message in the queue
|
||||
// or make empty if the queue is empty
|
||||
fromRadio.write(fromRadioBytes, numBytes);
|
||||
}
|
||||
else
|
||||
{
|
||||
} else {
|
||||
// LOG_INFO("Ignoring successor read\n");
|
||||
}
|
||||
authorizeRead(conn_hdl);
|
||||
@ -206,10 +199,8 @@ void NRF52Bluetooth::shutdown()
|
||||
// Shutdown bluetooth for minimum power draw
|
||||
LOG_INFO("Disable NRF52 bluetooth\n");
|
||||
uint8_t connection_num = Bluefruit.connected();
|
||||
if (connection_num)
|
||||
{
|
||||
for (uint8_t i = 0; i < connection_num; i++)
|
||||
{
|
||||
if (connection_num) {
|
||||
for (uint8_t i = 0; i < connection_num; i++) {
|
||||
LOG_INFO("NRF52 bluetooth disconnecting handle %d\n", i);
|
||||
Bluefruit.disconnect(i);
|
||||
}
|
||||
@ -245,8 +236,7 @@ void NRF52Bluetooth::setup()
|
||||
Bluefruit.Advertising.stop();
|
||||
Bluefruit.Advertising.clearData();
|
||||
Bluefruit.ScanResponse.clearData();
|
||||
if (config.bluetooth.mode != meshtastic_Config_BluetoothConfig_PairingMode_NO_PIN)
|
||||
{
|
||||
if (config.bluetooth.mode != meshtastic_Config_BluetoothConfig_PairingMode_NO_PIN) {
|
||||
configuredPasskey = config.bluetooth.mode == meshtastic_Config_BluetoothConfig_PairingMode_FIXED_PIN
|
||||
? config.bluetooth.fixed_pin
|
||||
: random(100000, 999999);
|
||||
@ -258,9 +248,7 @@ void NRF52Bluetooth::setup()
|
||||
Bluefruit.Security.setPairCompleteCallback(NRF52Bluetooth::onPairingCompleted);
|
||||
Bluefruit.Security.setSecuredCallback(NRF52Bluetooth::onConnectionSecured);
|
||||
meshBleService.setPermission(SECMODE_ENC_WITH_MITM, SECMODE_ENC_WITH_MITM);
|
||||
}
|
||||
else
|
||||
{
|
||||
} else {
|
||||
Bluefruit.Security.setIOCaps(false, false, false);
|
||||
meshBleService.setPermission(SECMODE_OPEN, SECMODE_OPEN);
|
||||
}
|
||||
@ -323,8 +311,7 @@ bool NRF52Bluetooth::onPairingPasskey(uint16_t conn_handle, uint8_t const passke
|
||||
LOG_INFO("BLE pairing process started with passkey %.3s %.3s\n", passkey, passkey + 3);
|
||||
powerFSM.trigger(EVENT_BLUETOOTH_PAIR);
|
||||
#if !defined(MESHTASTIC_EXCLUDE_SCREEN)
|
||||
screen->startAlert([](OLEDDisplay *display, OLEDDisplayUiState *state, int16_t x, int16_t y) -> void
|
||||
{
|
||||
screen->startAlert([](OLEDDisplay *display, OLEDDisplayUiState *state, int16_t x, int16_t y) -> void {
|
||||
char btPIN[16] = "888888";
|
||||
snprintf(btPIN, sizeof(btPIN), "%06u", configuredPasskey);
|
||||
int x_offset = display->width() / 2;
|
||||
@ -347,13 +334,12 @@ bool NRF52Bluetooth::onPairingPasskey(uint16_t conn_handle, uint8_t const passke
|
||||
String deviceName = "Name: ";
|
||||
deviceName.concat(getDeviceName());
|
||||
y_offset = display->height() == 64 ? y_offset + FONT_HEIGHT_LARGE - 6 : y_offset + FONT_HEIGHT_LARGE + 5;
|
||||
display->drawString(x_offset + x, y_offset + y, deviceName); });
|
||||
display->drawString(x_offset + x, y_offset + y, deviceName);
|
||||
});
|
||||
#endif
|
||||
if (match_request)
|
||||
{
|
||||
if (match_request) {
|
||||
uint32_t start_time = millis();
|
||||
while (millis() < start_time + 30000)
|
||||
{
|
||||
while (millis() < start_time + 30000) {
|
||||
if (!Bluefruit.connected(conn_handle))
|
||||
break;
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user