refactoring part 2: move queue stuff into library

This commit is contained in:
mverch67 2024-04-06 13:53:08 +02:00
parent 25d8be327d
commit 9e6766b081
8 changed files with 5 additions and 212 deletions

View File

@ -1,73 +0,0 @@
#pragma once
#include <memory>
#include <mutex>
#include <queue>
#ifdef BLOCKING_PACKET_QUEUE
#include <condition_variable>
#endif
/**
* Generic platform independent and re-entrant queue wrapper that can be used to
* safely pass (generic) movable objects between threads.
*/
template <typename T> class PacketQueue
{
public:
PacketQueue() {}
PacketQueue(PacketQueue const &other) = delete;
/**
* Push movable object into queue
*/
void push(T &&packet)
{
std::lock_guard<std::mutex> lock(mutex);
queue.push(packet.move());
#ifdef BLOCKING_PACKET_QUEUE
cond.notify_one();
#endif
}
#ifdef BLOCKING_PACKET_QUEUE
/**
* Pop movable object from queue (blocking)
*/
std::unique_ptr<T> pop(void)
{
std::unique_lock<std::mutex> lock(mutex);
cond.wait(lock, [this] { return !queue.empty(); });
T packet = queue.front()->move();
queue.pop();
return packet;
}
#endif
/**
* Pop movable object from queue (non-blocking)
*/
std::unique_ptr<T> try_pop()
{
std::lock_guard<std::mutex> lock(mutex);
if (queue.empty())
return {nullptr};
auto packet = queue.front()->move();
queue.pop();
return packet;
}
uint32_t size() const
{
std::lock_guard<std::mutex> lock(mutex);
return queue.size();
}
private:
mutable std::mutex mutex;
std::queue<std::unique_ptr<T>> queue;
#ifdef BLOCKING_PACKET_QUEUE
std::condition_variable cond;
#endif
};

View File

@ -1,62 +0,0 @@
#pragma once
#include <memory>
/**
* Polymorphic packets that can be moved into and out of packet queues.
*/
class Packet
{
public:
using PacketPtr = std::unique_ptr<Packet>;
Packet(int packetId) : id(packetId) {}
// virtual move constructor
virtual PacketPtr move() { return PacketPtr(new Packet(std::move(*this))); }
// Disable copying
Packet(const Packet &) = delete;
Packet &operator=(const Packet &) = delete;
virtual ~Packet() {}
int getPacketId() const { return id; }
protected:
// Enable moving
Packet(Packet &&) = default;
Packet &operator=(Packet &&) = default;
private:
int id;
};
/**
* generic packet type class
*/
template <typename PacketType> class DataPacket : public Packet
{
public:
template <typename... Args> DataPacket(int id, Args &&...args) : Packet(id), data(new PacketType(std::forward<Args>(args)...))
{
}
PacketPtr move() override { return PacketPtr(new DataPacket(std::move(*this))); }
// Disable copying
DataPacket(const DataPacket &) = delete;
DataPacket &operator=(const DataPacket &) = delete;
virtual ~DataPacket() {}
const PacketType &getData() const { return *data; }
protected:
// Enable moving
DataPacket(DataPacket &&) = default;
DataPacket &operator=(DataPacket &&) = default;
private:
std::unique_ptr<PacketType> data;
};

View File

@ -1,6 +1,7 @@
#include "sharedMem/PacketClient.h"
#include "Packet.h"
#include "SharedQueue.h"
#include "configuration.h"
#include "sharedMem/SharedQueue.h"
#include <assert.h>
const uint32_t max_packet_queue_size = 10;

View File

@ -1,7 +1,6 @@
#pragma once
#include "IClientBase.h"
#include "Packet.h"
class SharedQueue;

View File

@ -1,6 +1,6 @@
#include "sharedMem/PacketServer.h"
#include "SharedQueue.h"
#include "api/PacketAPI.h"
#include "sharedMem/SharedQueue.h"
#include <assert.h>
const uint32_t max_packet_queue_size = 50;

View File

@ -1,7 +1,7 @@
#pragma once
#include "concurrency/PacketQueue.h"
#include "sharedMem/Packet.h"
#include "Packet.h"
#include "PacketQueue.h"
class SharedQueue;

View File

@ -1,37 +0,0 @@
#include "sharedMem/SharedQueue.h"
SharedQueue::SharedQueue() {}
SharedQueue::~SharedQueue() {}
bool SharedQueue::serverSend(Packet &&p)
{
serverQueue.push(std::move(p));
return true;
}
Packet::PacketPtr SharedQueue::serverReceive()
{
return clientQueue.try_pop();
}
size_t SharedQueue::serverQueueSize() const
{
return serverQueue.size();
}
bool SharedQueue::clientSend(Packet &&p)
{
clientQueue.push(std::move(p));
return true;
}
Packet::PacketPtr SharedQueue::clientReceive()
{
return serverQueue.try_pop();
}
size_t SharedQueue::clientQueueSize() const
{
return clientQueue.size();
}

View File

@ -1,35 +0,0 @@
#pragma once
#include "concurrency/PacketQueue.h"
#include "sharedMem/Packet.h"
/**
* @brief Queue wrapper that aggregates two thread queues (namely client and server)
* for bidirectional packet transfer between two threads or processes.
*
* This queue may also be created in shared memory (e.g. in Linux for inter-process communication)
*/
class SharedQueue
{
public:
SharedQueue();
virtual ~SharedQueue();
// server methods
virtual bool serverSend(Packet &&p);
virtual Packet::PacketPtr serverReceive();
virtual size_t serverQueueSize() const;
// client methods
virtual bool clientSend(Packet &&p);
virtual Packet::PacketPtr clientReceive();
virtual size_t clientQueueSize() const;
private:
// the server pushes into serverQueue and the client pushes into clientQueue
// receiving is done from the opposite queue, respectively
PacketQueue<Packet> serverQueue;
PacketQueue<Packet> clientQueue;
};
extern SharedQueue *sharedQueue;