Skip to content

Commit

Permalink
FEC encoder -> reserve memory
Browse files Browse the repository at this point in the history
WBStreamTx -> add scheduling priority, and improve method naming
  • Loading branch information
Consti10 committed Feb 22, 2024
1 parent f7a72a6 commit 5e0aa45
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 32 deletions.
47 changes: 20 additions & 27 deletions wifibroadcast/src/WBStreamRx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

#include <utility>

#include "SchedulingHelper.hpp"

WBStreamRx::WBStreamRx(std::shared_ptr<WBTxRx> txrx, Options options1)
: m_txrx(txrx), m_options(options1) {
assert(m_txrx);
Expand Down Expand Up @@ -75,15 +77,7 @@ void WBStreamRx::on_new_packet(uint64_t nonce, int wlan_index,
m_console->warn("Cannot enqueue packet");
}
} else {
if (m_options.enable_fec) {
if (!FECDecoder::validate_packet_size(data_len)) {
m_console->debug("invalid fec packet size {}", data_len);
return;
}
m_fec_decoder->process_valid_packet(data, data_len);
} else {
m_fec_disabled_decoder->process_packet(data, data_len);
}
internal_process_packet(data, data_len);
}
}

Expand All @@ -98,34 +92,21 @@ void WBStreamRx::on_new_session() {
}

void WBStreamRx::loop_process_data() {
std::shared_ptr<EnqueuedPacket> packet;
if (m_options.threading_enabled_set_max_realtime) {
SchedulingHelper::set_thread_params_max_realtime(
"WBStreamRx::loop_process_data", 80);
}
static constexpr std::int64_t timeout_usecs = 1000 * 1000;
while (m_process_data_thread_run) {
auto opt_packet =
m_packet_queue->wait_dequeue_timed(std::chrono::milliseconds(100));
if (opt_packet.has_value()) {
auto packet = opt_packet.value();
process_queued_packet(*packet);
internal_process_packet(packet->data->data(), (int)packet->data->size());
}
}
}

void WBStreamRx::process_queued_packet(
const WBStreamRx::EnqueuedPacket &packet) {
assert(m_options.enable_threading == true);
if (m_options.enable_fec) {
if (!FECDecoder::validate_packet_size(packet.data->size())) {
m_console->debug("invalid fec packet size {}", packet.data->size());
return;
}
m_fec_decoder->process_valid_packet(packet.data->data(),
packet.data->size());
} else {
m_fec_disabled_decoder->process_packet(packet.data->data(),
packet.data->size());
}
}

void WBStreamRx::on_decoded_packet(const uint8_t *data, int data_len) {
m_n_output_bytes += data_len;
if (m_out_cb) {
Expand Down Expand Up @@ -170,3 +151,15 @@ void WBStreamRx::reset_stream_stats() {
void WBStreamRx::set_on_fec_block_done_cb(WBStreamRx::ON_BLOCK_DONE_CB cb) {
m_fec_decoder->m_block_done_cb = cb;
}

void WBStreamRx::internal_process_packet(const uint8_t *data, int data_len) {
if (m_options.enable_fec) {
if (!FECDecoder::validate_packet_size(data_len)) {
m_console->debug("invalid fec packet size {}", data_len);
return;
}
m_fec_decoder->process_valid_packet(data, data_len);
} else {
m_fec_disabled_decoder->process_packet(data, data_len);
}
}
5 changes: 3 additions & 2 deletions wifibroadcast/src/WBStreamRx.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class WBStreamRx {
// enable / disable multi threading (decouples the processing of data from
// the thread that provided the data, e.g. the thread inside WBTxRx
bool enable_threading = false;
bool threading_enabled_set_max_realtime = false;
// only if threading is enabled
int packet_queue_size = 20;
// enable fec debug log, obviously only if fec is enbaled
Expand Down Expand Up @@ -101,9 +102,10 @@ class WBStreamRx {
std::unique_ptr<FECDecoder> m_fec_decoder = nullptr;
std::unique_ptr<FECDisabledDecoder> m_fec_disabled_decoder = nullptr;
void on_new_packet(uint64_t nonce, int wlan_index, const uint8_t *data,
const int data_len);
int data_len);
void on_new_session();
void on_decoded_packet(const uint8_t *data, int data_len);
void internal_process_packet(const uint8_t *data, int data_len);
// used only if threading is enabled
struct EnqueuedPacket {
std::shared_ptr<std::vector<uint8_t>> data;
Expand All @@ -113,7 +115,6 @@ class WBStreamRx {
bool m_process_data_thread_run = true;
std::unique_ptr<std::thread> m_process_data_thread;
void loop_process_data();
void process_queued_packet(const EnqueuedPacket &packet);
};

#endif // WIFIBROADCAST_WBSTREAMRX_H
6 changes: 5 additions & 1 deletion wifibroadcast/src/fec/FECEncoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,12 @@
#include "BlockSizeHelper.hpp"
#include "FECConstants.hpp"

FECEncoder::FECEncoder() {
m_primary_fragments_data_p.reserve(MAX_N_P_FRAGMENTS_PER_BLOCK);
}

void FECEncoder::encode_block(
std::vector<std::shared_ptr<std::vector<uint8_t>>> data_packets,
const std::vector<std::shared_ptr<std::vector<uint8_t>>>& data_packets,
int n_secondary_fragments) {
assert(data_packets.size() <= MAX_N_P_FRAGMENTS_PER_BLOCK);
assert(n_secondary_fragments <= MAX_N_S_FRAGMENTS_PER_BLOCK);
Expand Down
4 changes: 2 additions & 2 deletions wifibroadcast/src/fec/FECEncoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class FECEncoder {
public:
typedef std::function<void(const uint8_t* packet, int packet_len)>
OUTPUT_DATA_CALLBACK;
explicit FECEncoder() = default;
explicit FECEncoder();
FECEncoder(const FECEncoder& other) = delete;

public:
Expand All @@ -24,7 +24,7 @@ class FECEncoder {
* should be created
*/
void encode_block(
std::vector<std::shared_ptr<std::vector<uint8_t>>> data_packets,
const std::vector<std::shared_ptr<std::vector<uint8_t>>>& data_packets,
int n_secondary_fragments);
/**
* Distributes data evenly into @param n_primary_fragments and calculates
Expand Down

0 comments on commit 5e0aa45

Please sign in to comment.