diff --git a/src/concurrency/PacketQueue.h b/src/concurrency/PacketQueue.h new file mode 100644 index 000000000..312dac5e5 --- /dev/null +++ b/src/concurrency/PacketQueue.h @@ -0,0 +1,73 @@ +#pragma once + +#include +#include +#include + +#ifdef BLOCKING_PACKET_QUEUE +#include +#endif + +/** + * Generic platform independent and re-entrant queue wrapper that can be used to + * safely pass (generic) movable objects between threads. + */ +template class PacketQueue +{ + public: + PacketQueue() {} + + PacketQueue(PacketQueue const &other) = delete; + + /** + * Push movable object into queue + */ + void push(T &&packet) + { + std::lock_guard lock(mutex); + queue.push(packet.move()); +#ifdef BLOCKING_PACKET_QUEUE + cond.notify_one(); +#endif + } + +#ifdef BLOCKING_PACKET_QUEUE + /** + * Pop movable object from queue (blocking) + */ + std::unique_ptr pop(void) + { + std::unique_lock lock(mutex); + cond.wait(lock, [this] { return !queue.empty(); }); + T packet = queue.front()->move(); + queue.pop(); + return packet; + } +#endif + + /** + * Pop movable object from queue (non-blocking) + */ + std::unique_ptr try_pop() + { + std::lock_guard lock(mutex); + if (queue.empty()) + return {nullptr}; // std::move(T()); + auto packet = queue.front()->move(); + queue.pop(); + return packet; + } + + uint32_t size() const + { + std::lock_guard lock(mutex); + return queue.size(); + } + + private: + mutable std::mutex mutex; + std::queue> queue; +#ifdef BLOCKING_PACKET_QUEUE + std::condition_variable cond; +#endif +}; \ No newline at end of file diff --git a/src/main.cpp b/src/main.cpp index fbfb983d2..92c651489 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -89,6 +89,11 @@ NRF52Bluetooth *nrf52Bluetooth; AudioThread *audioThread; #endif +#ifdef USE_PACKET_API +#include "sharedMem/MeshPacketServer.h" +#include "sharedMem/PacketClient.h" +#endif + using namespace concurrency; // We always create a screen object, but we only init it if we find the hardware @@ -860,6 +865,11 @@ void setup() initApiServer(TCPPort); #endif +#ifdef USE_PACKET_API + MeshPacketServer::init(); + PacketClient::init(); +#endif + // Start airtime logger thread. airTime = new AirTime(); diff --git a/src/mesh/PhoneAPI.h b/src/mesh/PhoneAPI.h index 450649d7b..9a6ec4b85 100644 --- a/src/mesh/PhoneAPI.h +++ b/src/mesh/PhoneAPI.h @@ -124,6 +124,9 @@ class PhoneAPI */ virtual void handleDisconnect(); + /// begin a new connection + void handleStartConfig(); + private: void releasePhonePacket(); @@ -131,9 +134,6 @@ class PhoneAPI void releaseMqttClientProxyPhonePacket(); - /// begin a new connection - void handleStartConfig(); - /** * Handle a packet that the phone wants us to send. We can write to it but can not keep a reference to it * @return true true if a packet was queued for sending diff --git a/src/mesh/api/PacketAPI.cpp b/src/mesh/api/PacketAPI.cpp new file mode 100644 index 000000000..d5507c98a --- /dev/null +++ b/src/mesh/api/PacketAPI.cpp @@ -0,0 +1,80 @@ +#include "api/PacketAPI.h" +#include "MeshService.h" +#include "RadioInterface.h" +#include "sharedMem/MeshPacketServer.h" + +PacketAPI *packetAPI = nullptr; + +void PacketAPI::init(void) {} + +PacketAPI::PacketAPI(PacketServer *_server) : concurrency::OSThread("PacketAPI"), isConnected(false), server(_server) {} + +int32_t PacketAPI::runOnce() +{ + bool success = sendPacket(); + success |= receivePacket(); + return success ? 10 : 50; +} + +bool PacketAPI::receivePacket(void) +{ + if (server->hasData()) { + isConnected = true; + + // TODO: think about redesign or drop class MeshPacketServer + meshtastic_ToRadio *mr; + // if (typeid(*server) == typeid(MeshPacketServer)) { + // dynamic_cast(server)->receivePacket(*mr); + // } + // else { + auto p = server->receivePacket()->move(); + int id = p->getPacketId(); + LOG_DEBUG("Received packet id=%u\n", id); + // mr = (meshtastic_ToRadio*)&dynamic_cast*>(p.get())->getData(); + mr = (meshtastic_ToRadio *)&static_cast *>(p.get())->getData(); + //} + + switch (mr->which_payload_variant) { + case meshtastic_ToRadio_packet_tag: { + meshtastic_MeshPacket *mp = &mr->packet; + printPacket("PACKET FROM QUEUE", mp); + service.handleToRadio(*mp); + break; + } + case meshtastic_ToRadio_want_config_id_tag: { + uint32_t config_nonce = mr->want_config_id; + LOG_INFO("Screen wants config, nonce=%u\n", config_nonce); + handleStartConfig(); + break; + } + default: + LOG_ERROR("Error: unhandled meshtastic_ToRadio variant: %d\n", mr->which_payload_variant); + break; + } + return true; + } else + return false; +} + +bool PacketAPI::sendPacket(void) +{ + // fill dummy buffer; we don't use it, we directly send the fromRadio structure + uint32_t len = getFromRadio(txBuf); + if (len != 0) { + // TODO: think about redesign or drop class MeshPacketServer + // if (typeid(*server) == typeid(MeshPacketServer)) + // return dynamic_cast(server)->sendPacket(fromRadioScratch); + // else + return server->sendPacket(DataPacket(fromRadioScratch.id, fromRadioScratch)); + } else + return false; +} + +/** + * return true if we got (once!) contact from our client and the server send queue is not full + */ +bool PacketAPI::checkIsConnected() +{ + isConnected |= server->hasData(); + return isConnected && server->available(); +} diff --git a/src/mesh/api/PacketAPI.h b/src/mesh/api/PacketAPI.h new file mode 100644 index 000000000..015bec224 --- /dev/null +++ b/src/mesh/api/PacketAPI.h @@ -0,0 +1,36 @@ +#pragma once + +#include "PhoneAPI.h" +#include "concurrency/OSThread.h" +#include "sharedMem/PacketServer.h" + +/** + * A version of the phone API used for inter task communication based on protobuf packets, e.g. + * between two tasks running on CPU0 and CPU1, respectively. + * + */ +class PacketAPI : public PhoneAPI, public concurrency::OSThread +{ + public: + PacketAPI(PacketServer *_server); + static void init(void); + virtual ~PacketAPI(){}; + virtual int32_t runOnce(); + + protected: + // Check the current underlying physical queue to see if the client is fetching packets + bool checkIsConnected() override; + + void onNowHasData(uint32_t fromRadioNum) override {} + void onConnectionChanged(bool connected) override {} + + private: + bool receivePacket(void); + bool sendPacket(void); + + bool isConnected; + PacketServer *server; + uint8_t txBuf[MAX_TO_FROM_RADIO_SIZE] = {0}; // dummy buf to obey PhoneAPI +}; + +extern PacketAPI *packetAPI; \ No newline at end of file diff --git a/src/mesh/sharedMem/MeshPacketServer.cpp b/src/mesh/sharedMem/MeshPacketServer.cpp new file mode 100644 index 000000000..8df56ab4a --- /dev/null +++ b/src/mesh/sharedMem/MeshPacketServer.cpp @@ -0,0 +1,44 @@ +#include "sharedMem/MeshPacketServer.h" +#include "api/PacketAPI.h" +#include "sharedMem/SharedQueue.h" + +SharedQueue *sharedQueue = nullptr; + +MeshPacketServer *meshPacketServer = nullptr; + +void MeshPacketServer::init(void) +{ + meshPacketServer = new MeshPacketServer; + packetAPI = new PacketAPI(meshPacketServer); + meshPacketServer->begin(); +} + +MeshPacketServer::MeshPacketServer() {} + +void MeshPacketServer::begin(void) +{ + sharedQueue = new SharedQueue; + PacketServer::begin(sharedQueue); +} + +bool MeshPacketServer::receivePacket(meshtastic_ToRadio &to) +{ + // auto p = PacketServer::receivePacket()->move(); + auto p = PacketServer::receivePacket()->move(); + if (p) { + // TODO: avoid data copy :( + // to = dynamic_cast*>(p.get())->getData(); + to = static_cast *>(p.get())->getData(); + } + return p != nullptr; +} + +bool MeshPacketServer::sendPacket(meshtastic_FromRadio &from) +{ + return PacketServer::sendPacket(DataPacket(from.id, from)); +} + +bool MeshPacketServer::sendPacket(meshtastic_FromRadio &&from) +{ + return PacketServer::sendPacket(DataPacket(from.id, from)); +} diff --git a/src/mesh/sharedMem/MeshPacketServer.h b/src/mesh/sharedMem/MeshPacketServer.h new file mode 100644 index 000000000..858f968df --- /dev/null +++ b/src/mesh/sharedMem/MeshPacketServer.h @@ -0,0 +1,20 @@ +#include "mesh-pb-constants.h" +#include "sharedMem/PacketServer.h" + +/** + * @brief This is a wrapper class for the PaketServer to avoid dealing with DataPackets + * in the application code. + * + */ +class MeshPacketServer : public PacketServer +{ + public: + MeshPacketServer(); + static void init(void); + virtual void begin(void); + virtual bool receivePacket(meshtastic_ToRadio &to); + virtual bool sendPacket(meshtastic_FromRadio &from); + virtual bool sendPacket(meshtastic_FromRadio &&from); +}; + +extern MeshPacketServer *meshPacketServer; \ No newline at end of file diff --git a/src/mesh/sharedMem/Packet.h b/src/mesh/sharedMem/Packet.h new file mode 100644 index 000000000..5ddff3f8e --- /dev/null +++ b/src/mesh/sharedMem/Packet.h @@ -0,0 +1,62 @@ +#pragma once + +#include + +/** + * Polymorphic packets that can be moved into and out of packet queues. + */ +class Packet +{ + public: + using PacketPtr = std::unique_ptr; + + Packet(int packetId) : id(packetId) {} + + // virtual move constructor + virtual PacketPtr move() { return PacketPtr(new Packet(std::move(*this))); } + + // Disable copying + Packet(const Packet &) = delete; + Packet &operator=(const Packet &) = delete; + + virtual ~Packet() {} + + int getPacketId() const { return id; } + + protected: + // Enable moving + Packet(Packet &&) = default; + Packet &operator=(Packet &&) = default; + + private: + int id; +}; + +/** + * generic packet type class + */ +template class DataPacket : public Packet +{ + public: + template DataPacket(int id, Args &&...args) : Packet(id), data(new PacketType(std::forward(args)...)) + { + } + + PacketPtr move() override { return PacketPtr(new DataPacket(std::move(*this))); } + + // Disable copying + DataPacket(const DataPacket &) = delete; + DataPacket &operator=(const DataPacket &) = delete; + + virtual ~DataPacket() {} + + const PacketType &getData() const { return *data; } + + protected: + // Enable moving + DataPacket(DataPacket &&) = default; + DataPacket &operator=(DataPacket &&) = default; + + private: + std::unique_ptr data; +}; diff --git a/src/mesh/sharedMem/PacketClient.cpp b/src/mesh/sharedMem/PacketClient.cpp new file mode 100644 index 000000000..82eb4f67e --- /dev/null +++ b/src/mesh/sharedMem/PacketClient.cpp @@ -0,0 +1,57 @@ +#include "sharedMem/PacketClient.h" +#include "DebugConfiguration.h" +#include "sharedMem/SharedQueue.h" +#include + +const uint32_t max_packet_queue_size = 10; + +PacketClient *packetClient = nullptr; + +void PacketClient::init(void) +{ + // for now we hard-code (only) one client task, but in principle one could + // create as many PacketServer/PacketClient pairs as desired. + packetClient = new PacketClient(); + packetClient->connect(sharedQueue); +} + +PacketClient::PacketClient() : queue(nullptr) {} + +int PacketClient::connect(SharedQueue *_queue) +{ + if (!queue) { + queue = _queue; + } else if (_queue != queue) { + LOG_WARN("Client already connected."); + } + return queue->serverQueueSize(); +} + +Packet::PacketPtr PacketClient::receivePacket() +{ + assert(queue); + if (queue->serverQueueSize() == 0) + return {nullptr}; + return queue->clientReceive(); +} + +bool PacketClient::sendPacket(Packet &&p) +{ + assert(queue); + if (queue->clientQueueSize() >= max_packet_queue_size) + return false; + queue->clientSend(std::move(p)); + return true; +} + +bool PacketClient::hasData() const +{ + assert(queue); + return queue->serverQueueSize() > 0; +} + +bool PacketClient::available() const +{ + assert(queue); + return queue->clientQueueSize() < max_packet_queue_size; +} diff --git a/src/mesh/sharedMem/PacketClient.h b/src/mesh/sharedMem/PacketClient.h new file mode 100644 index 000000000..4da3c34ec --- /dev/null +++ b/src/mesh/sharedMem/PacketClient.h @@ -0,0 +1,27 @@ +#pragma once + +#include "Packet.h" + +class SharedQueue; + +/** + * @brief Generic client implementation to receive from and + * send packets to the shared queue + * + */ +class PacketClient +{ + public: + PacketClient(); + static void init(void); + virtual int connect(SharedQueue *_queue); + virtual bool sendPacket(Packet &&p); + virtual Packet::PacketPtr receivePacket(); + virtual bool hasData() const; + virtual bool available() const; + + private: + SharedQueue *queue; +}; + +extern PacketClient *packetClient; diff --git a/src/mesh/sharedMem/PacketServer.cpp b/src/mesh/sharedMem/PacketServer.cpp new file mode 100644 index 000000000..5c9d692ad --- /dev/null +++ b/src/mesh/sharedMem/PacketServer.cpp @@ -0,0 +1,51 @@ +#include "sharedMem/PacketServer.h" +#include "sharedMem/SharedQueue.h" +#include + +const uint32_t max_packet_queue_size = 50; + +PacketServer::PacketServer() : queue(nullptr) {} + +void PacketServer::begin(SharedQueue *_queue) +{ + queue = _queue; +} + +#if 1 +Packet::PacketPtr PacketServer::receivePacket(void) +{ + assert(queue); + if (queue->clientQueueSize() == 0) + return {nullptr}; + return queue->serverReceive(); +} +#else // template variant with typed return values +template <> Packet::PacketPtr PacketServer::receivePacket() +{ + assert(queue); + if (queue->clientQueueSize() == 0) + return {nullptr}; + return queue->serverReceive(); +} +#endif + +bool PacketServer::sendPacket(Packet &&p) +{ + assert(queue); + if (queue->serverQueueSize() >= max_packet_queue_size) + return false; + queue->serverSend(std::move(p)); + return true; +} + +bool PacketServer::hasData() const +{ + assert(queue); + return queue->clientQueueSize() > 0; +} + +bool PacketServer::available() const +{ + assert(queue); + return queue->serverQueueSize() < max_packet_queue_size; +} diff --git a/src/mesh/sharedMem/PacketServer.h b/src/mesh/sharedMem/PacketServer.h new file mode 100644 index 000000000..34152ff14 --- /dev/null +++ b/src/mesh/sharedMem/PacketServer.h @@ -0,0 +1,25 @@ +#pragma once + +#include "concurrency/PacketQueue.h" +#include "sharedMem/Packet.h" + +class SharedQueue; + +/** + * Generic server implementation (base class) for bidirectional task communication + * Uses a queue that is shared with the + */ +class PacketServer +{ + public: + PacketServer(); + virtual void begin(SharedQueue *_queue); + virtual bool sendPacket(Packet &&p); + virtual Packet::PacketPtr receivePacket(); + // template T receivePacket(); + virtual bool hasData() const; + virtual bool available() const; + + private: + SharedQueue *queue; +}; diff --git a/src/mesh/sharedMem/SharedQueue.cpp b/src/mesh/sharedMem/SharedQueue.cpp new file mode 100644 index 000000000..f1971c4b0 --- /dev/null +++ b/src/mesh/sharedMem/SharedQueue.cpp @@ -0,0 +1,37 @@ +#include "sharedMem/SharedQueue.h" + +SharedQueue::SharedQueue() {} + +SharedQueue::~SharedQueue() {} + +bool SharedQueue::serverSend(Packet &&p) +{ + serverQueue.push(std::move(p)); + return true; +} + +Packet::PacketPtr SharedQueue::serverReceive() +{ + return clientQueue.try_pop(); +} + +size_t SharedQueue::serverQueueSize() const +{ + return serverQueue.size(); +} + +bool SharedQueue::clientSend(Packet &&p) +{ + clientQueue.push(std::move(p)); + return true; +} + +Packet::PacketPtr SharedQueue::clientReceive() +{ + return serverQueue.try_pop(); +} + +size_t SharedQueue::clientQueueSize() const +{ + return clientQueue.size(); +} diff --git a/src/mesh/sharedMem/SharedQueue.h b/src/mesh/sharedMem/SharedQueue.h new file mode 100644 index 000000000..9b5ea4ca0 --- /dev/null +++ b/src/mesh/sharedMem/SharedQueue.h @@ -0,0 +1,35 @@ +#pragma once + +#include "concurrency/PacketQueue.h" +#include "sharedMem/Packet.h" + +/** + * @brief Queue wrapper that aggregates two thread queues (namely client and server) + * for bidirectional packet transfer between two threads or processes. + * + * This queue may also be created in shared memory (e.g. in Linux for inter-process communication) + */ +class SharedQueue +{ + public: + SharedQueue(); + virtual ~SharedQueue(); + + // server methods + virtual bool serverSend(Packet &&p); + virtual Packet::PacketPtr serverReceive(); + virtual size_t serverQueueSize() const; + + // client methods + virtual bool clientSend(Packet &&p); + virtual Packet::PacketPtr clientReceive(); + virtual size_t clientQueueSize() const; + + private: + // the server pushes into serverQueue and the client pushes into clientQueue + // receiving is done from the opposite queue, respectively + PacketQueue serverQueue; + PacketQueue clientQueue; +}; + +extern SharedQueue *sharedQueue; \ No newline at end of file diff --git a/variants/portduino/platformio.ini b/variants/portduino/platformio.ini index d37c6be21..957c20006 100644 --- a/variants/portduino/platformio.ini +++ b/variants/portduino/platformio.ini @@ -1,6 +1,9 @@ [env:native] extends = portduino_base -build_flags = ${portduino_base.build_flags} -O0 -I variants/portduino +build_flags = ${portduino_base.build_flags} -O0 + -I variants/portduino + -D USE_PACKET_API + -D DEBUG_HEAP board = cross_platform lib_deps = ${portduino_base.lib_deps} -build_src_filter = ${portduino_base.build_src_filter} \ No newline at end of file +build_src_filter = ${portduino_base.build_src_filter} + \ No newline at end of file diff --git a/variants/t-deck/platformio.ini b/variants/t-deck/platformio.ini index cb6033300..14ab09804 100644 --- a/variants/t-deck/platformio.ini +++ b/variants/t-deck/platformio.ini @@ -5,6 +5,7 @@ board = t-deck upload_protocol = esptool #upload_port = COM29 +build_src_filter = ${esp32_base.build_src_filter} + build_flags = ${esp32_base.build_flags} -DT_DECK -DBOARD_HAS_PSRAM