Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev: move fn mg_timer_add #170

Merged
merged 19 commits into from
Jul 22, 2023
Merged
5 changes: 3 additions & 2 deletions wiliwili/include/api/live/danmaku_live.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <thread>
#include <mutex>
#include <functional>
#include <utility>

#include "mongoose.h" // Include Mongoose header file

Expand All @@ -27,8 +28,8 @@ class LiveDanmaku : public brls::Singleton<LiveDanmaku> {
void send_heartbeat();
void send_text_message(const std::string &message);

void setonMessage(std::function<void(std::string)> func);
std::function<void(std::string)> onMessage;
void setonMessage(std::function<void(std::string&&)> func);
std::function<void(std::string&&)> onMessage;

void set_wait_time(int time);
int wait_time = 600;
Expand Down
2 changes: 2 additions & 0 deletions wiliwili/include/view/danmaku_core.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
class DanmakuItem {
public:
DanmakuItem(std::string content, const char *attributes);
DanmakuItem(std::string &&content, const std::string &attributes);

std::string msg; // 弹幕内容
float time; // 弹幕出现的时间
Expand Down Expand Up @@ -89,6 +90,7 @@ class DanmakuCore : public brls::Singleton<DanmakuCore> {
* @param item 单条弹幕
*/
void addSingleDanmaku(const DanmakuItem &item);
void addSingleDanmaku(DanmakuItem &&item);

/**
* 获取弹幕数据
Expand Down
23 changes: 8 additions & 15 deletions wiliwili/source/activity/live_player_activity.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@

using namespace brls::literals;

void onDanmakuReceived(std::string message) {
std::vector<uint8_t> payload(message.begin(), message.end());
static std::string tem = ",1,25,16777215,0,0,0,0,9";//临时方案
void onDanmakuReceived(std::string&& message) {
const std::string& msg = message;
std::vector<uint8_t> payload(msg.begin(), msg.end());
std::vector<std::string> messages = parse_packet(payload);

if(messages.size() == 0){
Expand All @@ -29,19 +31,10 @@ void onDanmakuReceived(std::string message) {
return;
}

std::vector<std::string> danmaku_list = extract_danmu_messages(messages);

double time;
std::string time_str;
std::string combined_attr;

std::string tem = ",1,25,16777215,0,0,0,0,9";//临时方案
for(auto &dan : danmaku_list){
time = MPVCore::instance().getPlaybackTime() + 0.3;
time_str = std::to_string(time);
combined_attr = time_str + tem;
DanmakuItem item(dan, combined_attr.c_str());
DanmakuCore::instance().addSingleDanmaku(item);
for(auto &&dan : extract_danmu_messages(messages)){
double time = MPVCore::instance().getPlaybackTime() + 0.1;
std::string combined_attr = std::to_string(time) + tem;
DanmakuCore::instance().addSingleDanmaku(DanmakuItem(std::move(dan), combined_attr));
}
}

Expand Down
10 changes: 4 additions & 6 deletions wiliwili/source/api/danmaku_live.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,6 @@ void LiveDanmaku::connect(int room_id, int uid) {
break;
}
this->mongoose_mutex.unlock();

mg_timer_add(this->mgr, 20000, MG_TIMER_REPEAT, heartbeat_timer,
this);
mg_mgr_poll(this->mgr, wait_time);
}
mg_mgr_free(this->mgr);
Expand Down Expand Up @@ -168,16 +165,17 @@ static void mongoose_event_handler(struct mg_connection *nc, int ev, void *ev_da
liveDanmaku->ms_ev_ok.store(false, std::memory_order_release);
} else if (ev == MG_EV_WS_OPEN) {
liveDanmaku->send_join_request(liveDanmaku->room_id, liveDanmaku->uid);
mg_timer_add(liveDanmaku->mgr, 30000, MG_TIMER_REPEAT,
heartbeat_timer, user_data);
} else if (ev == MG_EV_WS_MSG) {
struct mg_ws_message *wm = (struct mg_ws_message *) ev_data;
std::string message(wm->data.ptr, wm->data.len);
liveDanmaku->onMessage(message);
liveDanmaku->onMessage(std::string(wm->data.ptr, wm->data.len));
} else if(ev == MG_EV_CLOSE) {
//liveDanmaku->disconnect();
liveDanmaku->ms_ev_ok.store(false, std::memory_order_release);
}
}

void LiveDanmaku::setonMessage(std::function<void(std::string)> func) {
void LiveDanmaku::setonMessage(std::function<void(std::string&&)> func) {
onMessage = func;
}
37 changes: 26 additions & 11 deletions wiliwili/source/api/util/extract_messages.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,37 @@

#include <nlohmann/json.hpp>

#include <utility>

std::vector<std::string> extract_danmu_messages(const std::vector<std::string>& messages) {

std::vector<std::string> danmu_messages;
danmu_messages.reserve(messages.size()); // 预留空间

for (auto& message : messages) {

for (const auto& message : messages) {
try {
auto json_message = nlohmann::json::parse(message);
// // 简单验证
// if (message.size() < 10 ||
// message.substr(0, 5) != "{\"cmd\"") {
// continue;
// }

if (json_message.contains("cmd") && json_message["cmd"] == "DANMU_MSG") {
if (json_message.contains("info") && json_message["info"].is_array() && json_message["info"].size() > 1) {
danmu_messages.push_back(json_message["info"][1].get<std::string>());
//try {
nlohmann::json json_message = nlohmann::json::parse(message);

auto it = json_message.find("cmd");
if (it != json_message.end() && it->get<std::string>() == "DANMU_MSG") {

auto& info = json_message["info"];
if (info.is_array() && info.size() > 1) {
// 直接插入结果,避免中间变量
danmu_messages.emplace_back(info[1].get_ref<const std::string&>());
}
}
} catch (const nlohmann::json::parse_error& e) {
// /std::cerr << "Failed to parse JSON message: " << e.what() << std::endl;
}
}

//} catch(const std::exception& e) {
// 忽略JSON解析错误
//}
}
return danmu_messages;
}
}
108 changes: 41 additions & 67 deletions wiliwili/source/api/util/ws_utils.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "live/ws_utils.hpp"

#include <utility>
#include <iostream>
#include <cstring>
#include <memory>
Expand All @@ -11,103 +12,76 @@
#include <zlib.h>
//#include <brotli/decode.h>



//buffer
static uint8_t buffer[1024 * 256];
// 解析数据包
std::vector<std::string> parse_packet(const std::vector<uint8_t>& data) {
std::vector<std::string> messages;
messages.reserve(128);

std::vector<uint8_t> decompressed;
decompressed.reserve(1024 * 256);

z_stream strm;
strm.zalloc = Z_NULL;
strm.zfree = Z_NULL;
strm.opaque = Z_NULL;

size_t data_len = data.size();
size_t offset = 0;

uint32_t packet_length;
uint16_t header_length;
uint16_t protocol_version;
uint32_t operation;
while (offset < data_len) {
std::memcpy(&packet_length, data.data() + offset, sizeof(uint32_t));
std::memcpy(&header_length, data.data() + offset + 4, sizeof(uint16_t));
std::memcpy(&protocol_version, data.data() + offset + 6,
sizeof(uint16_t));
std::memcpy(&operation, data.data() + offset + 8, sizeof(uint32_t));

packet_length = ntohl(packet_length);
header_length = ntohs(header_length);
protocol_version = ntohs(protocol_version);
operation = ntohl(operation);
uint32_t packet_length = ntohl(*reinterpret_cast<const uint32_t*>(data.data() + offset));
uint16_t header_length = ntohs(*reinterpret_cast<const uint16_t*>(data.data() + offset + 4));
uint16_t protocol_version = ntohs(*reinterpret_cast<const uint16_t*>(data.data() + offset + 6));
uint32_t operation = ntohl(*reinterpret_cast<const uint32_t*>(data.data() + offset + 8));
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这一段可以找到更好的方式来实现吗?直接使用 reinterpret_cast 转换两个数字类型会导致 Undefined Behavior,虽然大多数时候都能正常运行,但是还是应该避免这个警告。

https://stackoverflow.com/a/25586060

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我使用的uint32_t应该能在大部分平台保持一致吧,你引用的问题使用了reinterpret_cast<unsigned long>(int),他这个确实可能会有问题
要不就换回我之前使用的memcpy。。

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@maye174 你之前就是写的 reinterpret_cast,这里的 memcpy 是我改的,如果不改的话在调试的时候IDE一直报告问题。
我在这里有提及:#164 (comment)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xfangfang 那你再改回去吧,我真没注意这个,我比较惯用这个。。


offset += header_length;

//| 3 | 服务器 | 数据类型为Int 32 Big Endian | 心跳回应 | Body 内容为房间人气值 |
if (operation == 3){
uint32_t body = ntohl(*reinterpret_cast<const uint32_t*>(data.data() + offset));
std::string heart_reply = "heartbeat reply: " + std::to_string(body);
messages.push_back(heart_reply);
messages.emplace_back("heartbeat reply: " + std::to_string(body));
break;
}
//| 5 | 服务器 | 数据类型为JSON纯文本 | 通知 | 弹幕、广播等全部信息 |
else if (operation == 5) {
std::string body(reinterpret_cast<const char*>(data.data() + offset), packet_length - header_length);

switch (protocol_version) {
case 0: {
messages.push_back(body);
std::string body(
reinterpret_cast<const char*>(data.data() + offset),
packet_length - header_length);

if (protocol_version == 0) {
messages.emplace_back(std::move(body));
break;
}
else if (protocol_version == 2){
strm.avail_in = body.size();
strm.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(body.data()));
if (inflateInit(&strm) != Z_OK) {
//std::cerr << "Failed to initialize zlib" << std::endl;
break;
}
case 2: {
z_stream strm;
strm.zalloc = Z_NULL;
strm.zfree = Z_NULL;
strm.opaque = Z_NULL;
strm.avail_in = body.size();
strm.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(body.data()));

if (inflateInit(&strm) != Z_OK) {
std::cerr << "Failed to initialize zlib" << std::endl;
do {
strm.avail_out = sizeof(buffer);
strm.next_out = buffer;
if (inflate(&strm, Z_NO_FLUSH) == Z_STREAM_ERROR) {
//std::cerr << "Failed to inflate zlib stream" << std::endl;
break;
}

std::vector<uint8_t> decompressed;
do {
uint8_t buffer[16384];
strm.avail_out = sizeof(buffer);
strm.next_out = buffer;
if (inflate(&strm, Z_NO_FLUSH) == Z_STREAM_ERROR) {
std::cerr << "Failed to inflate zlib stream" << std::endl;
break;
}
decompressed.insert(decompressed.end(), buffer, buffer + sizeof(buffer) - strm.avail_out);
} while (strm.avail_out == 0);
inflateEnd(&strm);

auto nested_messages = parse_packet(decompressed);
messages.insert(messages.end(), nested_messages.begin(), nested_messages.end());
break;
}
case 3: {
/*std::vector<uint8_t> decompressed;
size_t available_out = 0;
BrotliDecoderState* brotli_state = BrotliDecoderCreateInstance(nullptr, nullptr, nullptr);
BrotliDecoderDecompress(body.size(), reinterpret_cast<const uint8_t*>(body.data()), &available_out, decompressed.data(), nullptr);
BrotliDecoderDestroyInstance(brotli_state);

auto nested_messages = parse_packet(decompressed);
messages.insert(messages.end(), nested_messages.begin(), nested_messages.end());
*/break;
}
default: {
std::cerr << "Unknown protocol version: " << protocol_version << std::endl;
break;
}
decompressed.insert(decompressed.end(), buffer, buffer + sizeof(buffer) - strm.avail_out);
} while (strm.avail_out == 0);
inflateEnd(&strm);
auto nested_messages = parse_packet(decompressed);
messages.insert(messages.end(), nested_messages.begin(), nested_messages.end());
}
}

offset += (packet_length - header_length);
}

return messages;
}

// 编码数据包
// 编码数据包
std::vector<uint8_t> encode_packet(uint16_t protocol_version, uint32_t operation, const std::string& body) {
std::vector<uint8_t> packet;
Expand Down
47 changes: 47 additions & 0 deletions wiliwili/source/view/danmaku_core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//

#include <pystring.h>
#include <utility>

#include "view/danmaku_core.hpp"
#include "view/mpv_core.hpp"
Expand Down Expand Up @@ -43,6 +44,42 @@ DanmakuItem::DanmakuItem(std::string content, const char *attributes)
}
}

DanmakuItem::DanmakuItem(std::string &&content, const std::string &attributes){
msg = std::move(content);

#ifdef OPENCC
static bool ZH_T = brls::Application::getLocale() == brls::LOCALE_ZH_HANT ||
brls::Application::getLocale() == brls::LOCALE_ZH_TW;
if (ZH_T && brls::Label::OPENCC_ON) msg = brls::Label::STConverter(msg);
#endif
std::vector<std::string> attrs;
pystring::split(attributes.c_str(), attrs, ",");
if (attrs.size() < 9) {
brls::Logger::error("error decode danmaku: {} {}", msg, attributes);
type = -1;
return;
}
time = atof(attrs[0].c_str());
type = atoi(attrs[1].c_str());
fontSize = atoi(attrs[2].c_str());
fontColor = atoi(attrs[3].c_str());
level = atoi(attrs[8].c_str());

int r = (fontColor >> 16) & 0xff;
int g = (fontColor >> 8) & 0xff;
int b = fontColor & 0xff;
isDefaultColor = (r & g & b) == 0xff;
color = nvgRGB(r, g, b);
color.a = DanmakuCore::DANMAKU_STYLE_ALPHA * 0.01;
borderColor.a = DanmakuCore::DANMAKU_STYLE_ALPHA * 0.005;

// 判断是否添加浅色边框
if ((r * 299 + g * 587 + b * 114) < 60000) {
borderColor = nvgRGB(255, 255, 255);
borderColor.a = DanmakuCore::DANMAKU_STYLE_ALPHA * 0.5;
}
}

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里面可以和原本的构造函数复用的部分是不是应该写到一起去,避免相同的代码写两遍

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

那传递的时候依旧需要复制一遍,就没有意义了

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

如果你指的是单独拿出重复的部分,那是没有问题的

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

如果你指的是单独拿出重复的部分,那是没有问题的

是的我就是这个意思,这样之后改的话也不需要同时改两个地方

void DanmakuCore::reset() {
danmakuMutex.lock();
lineNum = 20;
Expand Down Expand Up @@ -78,6 +115,16 @@ void DanmakuCore::addSingleDanmaku(const DanmakuItem &item) {
MPVCore::instance().getCustomEvent()->fire("DANMAKU_LOADED", nullptr);
}

void DanmakuCore::addSingleDanmaku(DanmakuItem &&item) {
danmakuMutex.lock();
this->danmakuData.emplace_back(std::move(item));
this->danmakuLoaded = true;
danmakuMutex.unlock();

// 通过mpv来通知弹幕加载完成
MPVCore::instance().getCustomEvent()->fire("DANMAKU_LOADED", nullptr);
}

void DanmakuCore::refresh() {
danmakuMutex.lock();

Expand Down
Loading