Skip to content

Commit

Permalink
improve video fec
Browse files Browse the repository at this point in the history
  • Loading branch information
Consti10 committed Jan 7, 2024
1 parent 71ffd76 commit 35a199a
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 25 deletions.
2 changes: 2 additions & 0 deletions WBLib.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ target_sources(wifibroadcast PRIVATE
${CMAKE_CURRENT_LIST_DIR}/src/encryption/Encryption.cpp
${CMAKE_CURRENT_LIST_DIR}/src/encryption/EncryptionFsUtils.cpp
${CMAKE_CURRENT_LIST_DIR}/src/encryption/Decryptor.cpp
##
${CMAKE_CURRENT_LIST_DIR}/src/dummy_link/DummyLink.cpp

${CMAKE_CURRENT_LIST_DIR}/src/fec/FEC.cpp
${CMAKE_CURRENT_LIST_DIR}/src/fec/FECConstants.hpp
Expand Down
58 changes: 43 additions & 15 deletions src/WBVideoStreamTx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@

#include "WBVideoStreamTx.h"

#include "BlockSizeHelper.hpp"
#include "SchedulingHelper.hpp"

struct CodecConfig{
struct CodecConfigPacket{
uint8_t codec_type;
uint16_t config_data_len;
// config_data_len bytes follow
Expand All @@ -24,7 +25,7 @@ WBVideoStreamTx::WBVideoStreamTx(
m_console=wifibroadcast::log::create_or_get("wb_tx"+std::to_string(options.radio_port));
}
assert(m_console);
m_block_queue=std::make_unique<moodycamel::BlockingReaderWriterCircularBuffer<std::shared_ptr<EnqueuedBlock>>>(options.frame_queue_size);
m_block_queue=std::make_unique<moodycamel::BlockingReaderWriterCircularBuffer<std::shared_ptr<EnqueuedFrame>>>(options.frame_queue_size);
m_fec_encoder = std::make_unique<FECEncoder>();
auto cb=[this](const uint8_t* packet,int packet_len){
send_packet(packet,packet_len);
Expand All @@ -46,37 +47,64 @@ void WBVideoStreamTx::loop_process_data() {
SchedulingHelper::setThreadParamsMaxRealtime();
}
static constexpr std::int64_t timeout_usecs=100*1000;
std::shared_ptr<EnqueuedBlock> frame= nullptr;
std::shared_ptr<EnqueuedFrame> frame= nullptr;
std::chrono::steady_clock::time_point last_config=std::chrono::steady_clock::now();
while (m_process_data_thread_run){
if(m_block_queue->wait_dequeue_timed(frame,timeout_usecs)){
process_enqueued_frame(*frame);
}
const auto now=std::chrono::steady_clock::now();
if(now-last_config>=options.codec_config_interval){
// send config data
last_config=now;
if(send_video_config()){
last_config=now;
}
}
}
}

void WBVideoStreamTx::process_enqueued_frame(const EnqueuedBlock &block) {
void WBVideoStreamTx::process_enqueued_frame(const EnqueuedFrame & enq_frame) {
//TODO: Figure out the ideal fragment size for this frame
const int n_primary_fragments=blocksize::div_ceil(enq_frame.frame->size(),FEC_PACKET_MAX_PAYLOAD_SIZE);
const int n_secondary_fragments= calculate_n_secondary_fragments(n_primary_fragments, enq_frame.fec_overhead_perc);
m_fec_encoder->fragment_and_encode(
enq_frame.frame->data(), enq_frame.frame->size(),n_primary_fragments,
n_secondary_fragments);
}

void WBVideoStreamTx::set_config_data(
uint8_t codec_type, std::shared_ptr<std::vector<uint8_t>> codec_config) {

}

void WBVideoStreamTx::enqueue_frame(
std::shared_ptr<std::vector<uint8_t>> codec_config) {

uint8_t codec_type, std::shared_ptr<std::vector<uint8_t>> config_buff) {
auto config=std::make_shared<CodecConfigData>();
config->codec_type=codec_type;
config->config_buff=config_buff;
std::lock_guard<std::mutex> guard(m_codec_config_mutex);
m_codec_config=config;
}

void WBVideoStreamTx::reset() {

bool WBVideoStreamTx::enqueue_frame(
std::shared_ptr<std::vector<uint8_t>> frame, int max_block_size,
int fec_overhead_perc,
std::chrono::steady_clock::time_point creation_time) {
auto item=std::make_shared<EnqueuedFrame>();
item->frame=frame;
item->max_block_size=max_block_size;
item->fec_overhead_perc=fec_overhead_perc;
item->creation_time=creation_time;
const bool res= m_block_queue->try_enqueue(item);
return res;
}

void WBVideoStreamTx::send_packet(const uint8_t *packet, int packet_len) {
const auto radiotap_header=m_radiotap_header_holder->thread_safe_get();
const bool encrypt=m_enable_encryption.load();
m_txrx->tx_inject_packet(options.radio_port,packet,packet_len,radiotap_header,encrypt);
}

bool WBVideoStreamTx::send_video_config() {
std::lock_guard<std::mutex> guard(m_codec_config_mutex);
if(m_codec_config== nullptr)return false;
auto& codec_config=*m_codec_config;
auto& config_buff=*m_codec_config->config_buff;
assert(!config_buff.empty() && config_buff.size()<MAX_PAYLOAD_BEFORE_FEC);
m_fec_encoder->fragment_and_encode(config_buff.data(),config_buff.size(),1,0);
return true;
}
24 changes: 15 additions & 9 deletions src/WBVideoStreamTx.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
#include <thread>
#include <variant>

#include "moodycamel/concurrentqueue/blockingconcurrentqueue.h"
#include "moodycamel/readerwriterqueue/readerwritercircularbuffer.h"
#include "fec/FEC.h"
#include "SimpleStream.hpp"
Expand All @@ -33,16 +32,20 @@ class WBVideoStreamTx {
WBVideoStreamTx(const WBVideoStreamTx&) = delete;
WBVideoStreamTx&operator=(const WBVideoStreamTx&) = delete;
~WBVideoStreamTx();
struct EnqueuedBlock {
struct EnqueuedFrame {
std::chrono::steady_clock::time_point enqueue_time_point=std::chrono::steady_clock::now();
std::chrono::steady_clock::time_point creation_time=std::chrono::steady_clock::now();
int max_block_size;
int fec_overhead_perc;
std::shared_ptr<std::vector<uint8_t>> frame= nullptr; // replaces fragments
std::shared_ptr<std::vector<uint8_t>> frame= nullptr;
};
void set_config_data(uint8_t codec_type,std::shared_ptr<std::vector<uint8_t>> codec_config);
void enqueue_frame(std::shared_ptr<std::vector<uint8_t>> frame);
void reset();
struct CodecConfigData{
uint8_t codec_type;
std::shared_ptr<std::vector<uint8_t>> config_buff=nullptr;
};
void set_config_data(uint8_t codec_type,std::shared_ptr<std::vector<uint8_t>> config_buff);
bool enqueue_frame(std::shared_ptr<std::vector<uint8_t>> frame,int max_block_size,int fec_overhead_perc,
std::chrono::steady_clock::time_point creation_time=std::chrono::steady_clock::now());
std::atomic_int32_t in_fps=0;
std::atomic_int32_t in_bps=0;
std::atomic_int32_t out_pps=0;
Expand All @@ -54,12 +57,15 @@ class WBVideoStreamTx {
// On the tx, either one of those two is active at the same time
std::unique_ptr<FECEncoder> m_fec_encoder = nullptr;
std::unique_ptr<std::thread> m_process_data_thread;
std::unique_ptr<moodycamel::BlockingReaderWriterCircularBuffer<std::shared_ptr<EnqueuedBlock>>> m_block_queue;
std::unique_ptr<moodycamel::BlockingReaderWriterCircularBuffer<std::shared_ptr<EnqueuedFrame>>> m_block_queue;
bool m_process_data_thread_run=true;
void loop_process_data();
void process_enqueued_frame(const EnqueuedBlock& block);
void process_enqueued_frame(const EnqueuedFrame& enq_frame);
void send_packet(const uint8_t* packet,int packet_len);
std::shared_ptr<std::vector<uint8_t>> m_codec_config=nullptr;
bool send_video_config();
std::shared_ptr<CodecConfigData> m_codec_config= nullptr;
std::mutex m_codec_config_mutex;
std::atomic<bool> m_enable_encryption=true;
};

#endif // WIFIBROADCAST_WBVIDEOSTREAMTX_H
1 change: 1 addition & 0 deletions src/fec/FECEncoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ void FECEncoder::encode_block(
}

void FECEncoder::fragment_and_encode(const uint8_t* data,int data_len,int n_primary_fragments,int n_secondary_fragments) {
// size of each fragment must not exceed max fec payload size
assert(data_len<=FEC_PACKET_MAX_PAYLOAD_SIZE*n_primary_fragments);
init_block(n_primary_fragments);
int consumed=0;
Expand Down
6 changes: 5 additions & 1 deletion src/fec/FECEncoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@ class FECEncoder {
void encode_block(
std::vector<std::shared_ptr<std::vector<uint8_t>>> data_packets,
int n_secondary_fragments);
//
/**
* Distributes data evenly into @param n_primary_fragments and calculates
* @param n_secondary_fragments afterwards. Reduces latency to a minimum by forwarding
* packets via the cb as soon as possible (primary fragments are forwarded before the fec step is performed).
*/
void fragment_and_encode(const uint8_t* data,int data_len,int n_primary_fragments,int n_secondary_fragments);
OUTPUT_DATA_CALLBACK m_out_cb= nullptr;
AvgCalculator m_fec_block_encode_time;
Expand Down

0 comments on commit 35a199a

Please sign in to comment.