Skip to content

Commit

Permalink
improve dummy link threading
Browse files Browse the repository at this point in the history
  • Loading branch information
Consti10 committed Jan 8, 2024
1 parent 637024d commit 3dc70d7
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 10 deletions.
28 changes: 20 additions & 8 deletions src/dummy_link/DummyLink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#include <string>
#include <vector>

#include "../HelperSources/SocketHelper.hpp"

// From http://www.atakansarioglu.com/linux-ipc-inter-process-messaging-linux-domain-socket-fifo-pipe-shared-memory-shm-example/

static sockaddr_un create_adr(const std::string& name){
Expand Down Expand Up @@ -90,7 +92,10 @@ DummyLink::DummyLink(bool is_air):m_is_air(is_air) {
m_fn_rx="air";
}
m_fd_rx=create_socket_read(m_fn_rx);
//SocketHelper::set_socket_send_rcv_timeout(m_fd_rx,std::chrono::milliseconds(10), true);
m_fd_tx=create_socket_send();
m_rx_queue=std::make_unique<moodycamel::BlockingReaderWriterCircularBuffer<std::shared_ptr<DummyLink::RxPacket>>>(
1000);
m_keep_receiving= true;
m_receive_thread=std::make_unique<std::thread>(&DummyLink::loop_rx, this);
}
Expand All @@ -102,18 +107,19 @@ DummyLink::~DummyLink() {
}

void DummyLink::tx_radiotap(const uint8_t *packet_buff, int packet_size) {
const bool drop=should_drop();
//const bool drop=should_drop();
const bool drop= false;
if(!should_drop()){
send_data(m_fd_tx,m_fn_tx,packet_buff,packet_size);
}
}

std::shared_ptr<std::vector<uint8_t>> DummyLink::rx_radiotap() {
std::lock_guard<std::mutex> guard(m_rx_mutex);
if(!m_rx_queue.empty()){
auto packet=m_rx_queue.front();
m_rx_queue.pop();
return packet;
std::shared_ptr<DummyLink::RxPacket> packet= nullptr;
static constexpr std::int64_t timeout_usecs=100*1000;
if(m_rx_queue->wait_dequeue_timed(packet,timeout_usecs)) {
// dequeued frame
return packet->buff;
}
return nullptr;
}
Expand All @@ -123,8 +129,14 @@ void DummyLink::loop_rx() {
auto packet= read_data(m_fd_rx);
if(packet!= nullptr){
//std::cout<<"Got packet"<<packet->size()<<std::endl;
std::lock_guard<std::mutex> guard(m_rx_mutex);
m_rx_queue.push(packet);
auto item=std::make_shared<DummyLink::RxPacket>();
item->buff=packet;
//auto success=m_rx_queue->try_enqueue(item);
static constexpr std::int64_t timeout_usecs=100*1000;
auto success=m_rx_queue->wait_enqueue_timed(item,timeout_usecs);
if(!success){
//std::cout<<"Cannot enqueue packet"<<std::endl;
}
}
//std::cout<<"ARGH"<<std::endl;
}
Expand Down
8 changes: 6 additions & 2 deletions src/dummy_link/DummyLink.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
#include <thread>
#include <vector>

#include "../moodycamel/readerwriterqueue/readerwritercircularbuffer.h"

// TODO: Write something that emulates a wb link (tx, rx)
// using linux shm or similar
class DummyLink {
Expand All @@ -28,8 +30,6 @@ class DummyLink {
int m_fd_rx;
std::string m_fn_tx;
std::string m_fn_rx;
std::queue<std::shared_ptr<std::vector<uint8_t>>> m_rx_queue;
std::mutex m_rx_mutex;
std::unique_ptr<std::thread> m_receive_thread;
void loop_rx();
bool m_keep_receiving= true;
Expand All @@ -40,6 +40,10 @@ class DummyLink {
}
std::mt19937 m_mt;
std::uniform_int_distribution<> m_dist100{0,100};
struct RxPacket{
std::shared_ptr<std::vector<uint8_t>> buff;
};
std::unique_ptr<moodycamel::BlockingReaderWriterCircularBuffer<std::shared_ptr<RxPacket>>> m_rx_queue;
};


Expand Down

0 comments on commit 3dc70d7

Please sign in to comment.