PacketQueue thread communication

This commit is contained in:
mverch67 2024-02-21 19:40:14 +00:00 committed by Manuel
parent 72050530f1
commit 942b7d21a2
16 changed files with 566 additions and 5 deletions

View File

@ -0,0 +1,73 @@
#pragma once
#include <memory>
#include <mutex>
#include <queue>
#ifdef BLOCKING_PACKET_QUEUE
#include <condition_variable>
#endif
/**
* Generic platform independent and re-entrant queue wrapper that can be used to
* safely pass (generic) movable objects between threads.
*/
template <typename T> class PacketQueue
{
public:
PacketQueue() {}
PacketQueue(PacketQueue const &other) = delete;
/**
* Push movable object into queue
*/
void push(T &&packet)
{
std::lock_guard<std::mutex> lock(mutex);
queue.push(packet.move());
#ifdef BLOCKING_PACKET_QUEUE
cond.notify_one();
#endif
}
#ifdef BLOCKING_PACKET_QUEUE
/**
* Pop movable object from queue (blocking)
*/
std::unique_ptr<T> pop(void)
{
std::unique_lock<std::mutex> lock(mutex);
cond.wait(lock, [this] { return !queue.empty(); });
T packet = queue.front()->move();
queue.pop();
return packet;
}
#endif
/**
* Pop movable object from queue (non-blocking)
*/
std::unique_ptr<T> try_pop()
{
std::lock_guard<std::mutex> lock(mutex);
if (queue.empty())
return {nullptr}; // std::move(T());
auto packet = queue.front()->move();
queue.pop();
return packet;
}
uint32_t size() const
{
std::lock_guard<std::mutex> lock(mutex);
return queue.size();
}
private:
mutable std::mutex mutex;
std::queue<std::unique_ptr<T>> queue;
#ifdef BLOCKING_PACKET_QUEUE
std::condition_variable cond;
#endif
};

View File

@ -89,6 +89,11 @@ NRF52Bluetooth *nrf52Bluetooth;
AudioThread *audioThread; AudioThread *audioThread;
#endif #endif
#ifdef USE_PACKET_API
#include "sharedMem/MeshPacketServer.h"
#include "sharedMem/PacketClient.h"
#endif
using namespace concurrency; using namespace concurrency;
// We always create a screen object, but we only init it if we find the hardware // We always create a screen object, but we only init it if we find the hardware
@ -860,6 +865,11 @@ void setup()
initApiServer(TCPPort); initApiServer(TCPPort);
#endif #endif
#ifdef USE_PACKET_API
MeshPacketServer::init();
PacketClient::init();
#endif
// Start airtime logger thread. // Start airtime logger thread.
airTime = new AirTime(); airTime = new AirTime();

View File

@ -124,6 +124,9 @@ class PhoneAPI
*/ */
virtual void handleDisconnect(); virtual void handleDisconnect();
/// begin a new connection
void handleStartConfig();
private: private:
void releasePhonePacket(); void releasePhonePacket();
@ -131,9 +134,6 @@ class PhoneAPI
void releaseMqttClientProxyPhonePacket(); void releaseMqttClientProxyPhonePacket();
/// begin a new connection
void handleStartConfig();
/** /**
* Handle a packet that the phone wants us to send. We can write to it but can not keep a reference to it * Handle a packet that the phone wants us to send. We can write to it but can not keep a reference to it
* @return true true if a packet was queued for sending * @return true true if a packet was queued for sending

View File

@ -0,0 +1,80 @@
#include "api/PacketAPI.h"
#include "MeshService.h"
#include "RadioInterface.h"
#include "sharedMem/MeshPacketServer.h"
PacketAPI *packetAPI = nullptr;
void PacketAPI::init(void) {}
PacketAPI::PacketAPI(PacketServer *_server) : concurrency::OSThread("PacketAPI"), isConnected(false), server(_server) {}
int32_t PacketAPI::runOnce()
{
bool success = sendPacket();
success |= receivePacket();
return success ? 10 : 50;
}
bool PacketAPI::receivePacket(void)
{
if (server->hasData()) {
isConnected = true;
// TODO: think about redesign or drop class MeshPacketServer
meshtastic_ToRadio *mr;
// if (typeid(*server) == typeid(MeshPacketServer)) {
// dynamic_cast<MeshPacketServer*>(server)->receivePacket(*mr);
// }
// else {
auto p = server->receivePacket()->move();
int id = p->getPacketId();
LOG_DEBUG("Received packet id=%u\n", id);
// mr = (meshtastic_ToRadio*)&dynamic_cast<DataPacket<meshtastic_ToRadio>*>(p.get())->getData();
mr = (meshtastic_ToRadio *)&static_cast<DataPacket<meshtastic_ToRadio> *>(p.get())->getData();
//}
switch (mr->which_payload_variant) {
case meshtastic_ToRadio_packet_tag: {
meshtastic_MeshPacket *mp = &mr->packet;
printPacket("PACKET FROM QUEUE", mp);
service.handleToRadio(*mp);
break;
}
case meshtastic_ToRadio_want_config_id_tag: {
uint32_t config_nonce = mr->want_config_id;
LOG_INFO("Screen wants config, nonce=%u\n", config_nonce);
handleStartConfig();
break;
}
default:
LOG_ERROR("Error: unhandled meshtastic_ToRadio variant: %d\n", mr->which_payload_variant);
break;
}
return true;
} else
return false;
}
bool PacketAPI::sendPacket(void)
{
// fill dummy buffer; we don't use it, we directly send the fromRadio structure
uint32_t len = getFromRadio(txBuf);
if (len != 0) {
// TODO: think about redesign or drop class MeshPacketServer
// if (typeid(*server) == typeid(MeshPacketServer))
// return dynamic_cast<MeshPacketServer*>(server)->sendPacket(fromRadioScratch);
// else
return server->sendPacket(DataPacket<meshtastic_FromRadio>(fromRadioScratch.id, fromRadioScratch));
} else
return false;
}
/**
* return true if we got (once!) contact from our client and the server send queue is not full
*/
bool PacketAPI::checkIsConnected()
{
isConnected |= server->hasData();
return isConnected && server->available();
}

36
src/mesh/api/PacketAPI.h Normal file
View File

@ -0,0 +1,36 @@
#pragma once
#include "PhoneAPI.h"
#include "concurrency/OSThread.h"
#include "sharedMem/PacketServer.h"
/**
* A version of the phone API used for inter task communication based on protobuf packets, e.g.
* between two tasks running on CPU0 and CPU1, respectively.
*
*/
class PacketAPI : public PhoneAPI, public concurrency::OSThread
{
public:
PacketAPI(PacketServer *_server);
static void init(void);
virtual ~PacketAPI(){};
virtual int32_t runOnce();
protected:
// Check the current underlying physical queue to see if the client is fetching packets
bool checkIsConnected() override;
void onNowHasData(uint32_t fromRadioNum) override {}
void onConnectionChanged(bool connected) override {}
private:
bool receivePacket(void);
bool sendPacket(void);
bool isConnected;
PacketServer *server;
uint8_t txBuf[MAX_TO_FROM_RADIO_SIZE] = {0}; // dummy buf to obey PhoneAPI
};
extern PacketAPI *packetAPI;

View File

@ -0,0 +1,44 @@
#include "sharedMem/MeshPacketServer.h"
#include "api/PacketAPI.h"
#include "sharedMem/SharedQueue.h"
SharedQueue *sharedQueue = nullptr;
MeshPacketServer *meshPacketServer = nullptr;
void MeshPacketServer::init(void)
{
meshPacketServer = new MeshPacketServer;
packetAPI = new PacketAPI(meshPacketServer);
meshPacketServer->begin();
}
MeshPacketServer::MeshPacketServer() {}
void MeshPacketServer::begin(void)
{
sharedQueue = new SharedQueue;
PacketServer::begin(sharedQueue);
}
bool MeshPacketServer::receivePacket(meshtastic_ToRadio &to)
{
// auto p = PacketServer::receivePacket<Packet::PacketPtr>()->move();
auto p = PacketServer::receivePacket()->move();
if (p) {
// TODO: avoid data copy :(
// to = dynamic_cast<DataPacket<meshtastic_ToRadio>*>(p.get())->getData();
to = static_cast<DataPacket<meshtastic_ToRadio> *>(p.get())->getData();
}
return p != nullptr;
}
bool MeshPacketServer::sendPacket(meshtastic_FromRadio &from)
{
return PacketServer::sendPacket(DataPacket<meshtastic_FromRadio>(from.id, from));
}
bool MeshPacketServer::sendPacket(meshtastic_FromRadio &&from)
{
return PacketServer::sendPacket(DataPacket<meshtastic_FromRadio>(from.id, from));
}

View File

@ -0,0 +1,20 @@
#include "mesh-pb-constants.h"
#include "sharedMem/PacketServer.h"
/**
* @brief This is a wrapper class for the PaketServer to avoid dealing with DataPackets
* in the application code.
*
*/
class MeshPacketServer : public PacketServer
{
public:
MeshPacketServer();
static void init(void);
virtual void begin(void);
virtual bool receivePacket(meshtastic_ToRadio &to);
virtual bool sendPacket(meshtastic_FromRadio &from);
virtual bool sendPacket(meshtastic_FromRadio &&from);
};
extern MeshPacketServer *meshPacketServer;

View File

@ -0,0 +1,62 @@
#pragma once
#include <memory>
/**
* Polymorphic packets that can be moved into and out of packet queues.
*/
class Packet
{
public:
using PacketPtr = std::unique_ptr<Packet>;
Packet(int packetId) : id(packetId) {}
// virtual move constructor
virtual PacketPtr move() { return PacketPtr(new Packet(std::move(*this))); }
// Disable copying
Packet(const Packet &) = delete;
Packet &operator=(const Packet &) = delete;
virtual ~Packet() {}
int getPacketId() const { return id; }
protected:
// Enable moving
Packet(Packet &&) = default;
Packet &operator=(Packet &&) = default;
private:
int id;
};
/**
* generic packet type class
*/
template <typename PacketType> class DataPacket : public Packet
{
public:
template <typename... Args> DataPacket(int id, Args &&...args) : Packet(id), data(new PacketType(std::forward<Args>(args)...))
{
}
PacketPtr move() override { return PacketPtr(new DataPacket(std::move(*this))); }
// Disable copying
DataPacket(const DataPacket &) = delete;
DataPacket &operator=(const DataPacket &) = delete;
virtual ~DataPacket() {}
const PacketType &getData() const { return *data; }
protected:
// Enable moving
DataPacket(DataPacket &&) = default;
DataPacket &operator=(DataPacket &&) = default;
private:
std::unique_ptr<PacketType> data;
};

View File

@ -0,0 +1,57 @@
#include "sharedMem/PacketClient.h"
#include "DebugConfiguration.h"
#include "sharedMem/SharedQueue.h"
#include <assert.h>
const uint32_t max_packet_queue_size = 10;
PacketClient *packetClient = nullptr;
void PacketClient::init(void)
{
// for now we hard-code (only) one client task, but in principle one could
// create as many PacketServer/PacketClient pairs as desired.
packetClient = new PacketClient();
packetClient->connect(sharedQueue);
}
PacketClient::PacketClient() : queue(nullptr) {}
int PacketClient::connect(SharedQueue *_queue)
{
if (!queue) {
queue = _queue;
} else if (_queue != queue) {
LOG_WARN("Client already connected.");
}
return queue->serverQueueSize();
}
Packet::PacketPtr PacketClient::receivePacket()
{
assert(queue);
if (queue->serverQueueSize() == 0)
return {nullptr};
return queue->clientReceive();
}
bool PacketClient::sendPacket(Packet &&p)
{
assert(queue);
if (queue->clientQueueSize() >= max_packet_queue_size)
return false;
queue->clientSend(std::move(p));
return true;
}
bool PacketClient::hasData() const
{
assert(queue);
return queue->serverQueueSize() > 0;
}
bool PacketClient::available() const
{
assert(queue);
return queue->clientQueueSize() < max_packet_queue_size;
}

View File

@ -0,0 +1,27 @@
#pragma once
#include "Packet.h"
class SharedQueue;
/**
* @brief Generic client implementation to receive from and
* send packets to the shared queue
*
*/
class PacketClient
{
public:
PacketClient();
static void init(void);
virtual int connect(SharedQueue *_queue);
virtual bool sendPacket(Packet &&p);
virtual Packet::PacketPtr receivePacket();
virtual bool hasData() const;
virtual bool available() const;
private:
SharedQueue *queue;
};
extern PacketClient *packetClient;

View File

@ -0,0 +1,51 @@
#include "sharedMem/PacketServer.h"
#include "sharedMem/SharedQueue.h"
#include <assert.h>
const uint32_t max_packet_queue_size = 50;
PacketServer::PacketServer() : queue(nullptr) {}
void PacketServer::begin(SharedQueue *_queue)
{
queue = _queue;
}
#if 1
Packet::PacketPtr PacketServer::receivePacket(void)
{
assert(queue);
if (queue->clientQueueSize() == 0)
return {nullptr};
return queue->serverReceive();
}
#else // template variant with typed return values
template <> Packet::PacketPtr PacketServer::receivePacket<Packet::PacketPtr>()
{
assert(queue);
if (queue->clientQueueSize() == 0)
return {nullptr};
return queue->serverReceive();
}
#endif
bool PacketServer::sendPacket(Packet &&p)
{
assert(queue);
if (queue->serverQueueSize() >= max_packet_queue_size)
return false;
queue->serverSend(std::move(p));
return true;
}
bool PacketServer::hasData() const
{
assert(queue);
return queue->clientQueueSize() > 0;
}
bool PacketServer::available() const
{
assert(queue);
return queue->serverQueueSize() < max_packet_queue_size;
}

View File

@ -0,0 +1,25 @@
#pragma once
#include "concurrency/PacketQueue.h"
#include "sharedMem/Packet.h"
class SharedQueue;
/**
* Generic server implementation (base class) for bidirectional task communication
* Uses a queue that is shared with the
*/
class PacketServer
{
public:
PacketServer();
virtual void begin(SharedQueue *_queue);
virtual bool sendPacket(Packet &&p);
virtual Packet::PacketPtr receivePacket();
// template<typename T> T receivePacket();
virtual bool hasData() const;
virtual bool available() const;
private:
SharedQueue *queue;
};

View File

@ -0,0 +1,37 @@
#include "sharedMem/SharedQueue.h"
SharedQueue::SharedQueue() {}
SharedQueue::~SharedQueue() {}
bool SharedQueue::serverSend(Packet &&p)
{
serverQueue.push(std::move(p));
return true;
}
Packet::PacketPtr SharedQueue::serverReceive()
{
return clientQueue.try_pop();
}
size_t SharedQueue::serverQueueSize() const
{
return serverQueue.size();
}
bool SharedQueue::clientSend(Packet &&p)
{
clientQueue.push(std::move(p));
return true;
}
Packet::PacketPtr SharedQueue::clientReceive()
{
return serverQueue.try_pop();
}
size_t SharedQueue::clientQueueSize() const
{
return clientQueue.size();
}

View File

@ -0,0 +1,35 @@
#pragma once
#include "concurrency/PacketQueue.h"
#include "sharedMem/Packet.h"
/**
* @brief Queue wrapper that aggregates two thread queues (namely client and server)
* for bidirectional packet transfer between two threads or processes.
*
* This queue may also be created in shared memory (e.g. in Linux for inter-process communication)
*/
class SharedQueue
{
public:
SharedQueue();
virtual ~SharedQueue();
// server methods
virtual bool serverSend(Packet &&p);
virtual Packet::PacketPtr serverReceive();
virtual size_t serverQueueSize() const;
// client methods
virtual bool clientSend(Packet &&p);
virtual Packet::PacketPtr clientReceive();
virtual size_t clientQueueSize() const;
private:
// the server pushes into serverQueue and the client pushes into clientQueue
// receiving is done from the opposite queue, respectively
PacketQueue<Packet> serverQueue;
PacketQueue<Packet> clientQueue;
};
extern SharedQueue *sharedQueue;

View File

@ -1,6 +1,9 @@
[env:native] [env:native]
extends = portduino_base extends = portduino_base
build_flags = ${portduino_base.build_flags} -O0 -I variants/portduino build_flags = ${portduino_base.build_flags} -O0
-I variants/portduino
-D USE_PACKET_API
-D DEBUG_HEAP
board = cross_platform board = cross_platform
lib_deps = ${portduino_base.lib_deps} lib_deps = ${portduino_base.lib_deps}
build_src_filter = ${portduino_base.build_src_filter} build_src_filter = ${portduino_base.build_src_filter} +<mesh/sharedMem/>

View File

@ -5,6 +5,7 @@ board = t-deck
upload_protocol = esptool upload_protocol = esptool
#upload_port = COM29 #upload_port = COM29
build_src_filter = ${esp32_base.build_src_filter} +<mesh/sharedMem/>
build_flags = ${esp32_base.build_flags} build_flags = ${esp32_base.build_flags}
-DT_DECK -DT_DECK
-DBOARD_HAS_PSRAM -DBOARD_HAS_PSRAM