Skip to content

Commit

Permalink
Merge pull request #2495 from get10101/feat/add-order-to-offer-messages
Browse files Browse the repository at this point in the history
feat: Move match, async match and rollover websocket events to corresponding offer message
  • Loading branch information
holzeis authored May 2, 2024
2 parents 2397dd6 + 9abef5c commit bdca42b
Show file tree
Hide file tree
Showing 32 changed files with 613 additions and 340 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
select 1;
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ALTER TYPE "Message_Type_Type"
ADD
VALUE IF NOT EXISTS 'RolloverOffer';
10 changes: 5 additions & 5 deletions coordinator/src/bin/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,10 +248,10 @@ async fn main() -> Result<()> {

let (tx_orderbook_feed, _rx) = broadcast::channel(100);

let notification_service = NotificationService::new(opts.fcm_api_key.clone());
let notification_service =
NotificationService::new(opts.fcm_api_key.clone(), node.pool.clone());

let (_handle, auth_users_notifier) = spawn_delivering_messages_to_authenticated_users(
pool.clone(),
notification_service.get_sender(),
tx_user_feed.clone(),
);
Expand All @@ -260,6 +260,7 @@ async fn main() -> Result<()> {
node.clone(),
tx_orderbook_feed.clone(),
auth_users_notifier.clone(),
notification_service.get_sender(),
network,
node.inner.oracle_pubkey,
);
Expand All @@ -273,7 +274,7 @@ async fn main() -> Result<()> {
let _handle = rollover::monitor(
pool.clone(),
node_event_handler.subscribe(),
auth_users_notifier.clone(),
notification_service.get_sender(),
network,
node.clone(),
);
Expand Down Expand Up @@ -331,8 +332,7 @@ async fn main() -> Result<()> {
);

let sender = notification_service.get_sender();
let notification_scheduler =
NotificationScheduler::new(sender, settings, network, node, auth_users_notifier);
let notification_scheduler = NotificationScheduler::new(sender, settings, network, node);
tokio::spawn({
let pool = pool.clone();
let scheduler = notification_scheduler;
Expand Down
24 changes: 3 additions & 21 deletions coordinator/src/campaign.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use crate::db;
use crate::notifications::FcmToken;
use crate::notifications::Notification;
use crate::notifications::NotificationKind;
use crate::routes::AppState;
Expand Down Expand Up @@ -28,22 +26,6 @@ pub async fn post_push_campaign(
let params = params.0;
tracing::info!(?params, "Sending campaign with push notifications");

let mut conn = state
.pool
.get()
.map_err(|e| AppError::InternalServerError(format!("Could not get connection: {e:#}")))?;

let users = db::user::get_users(&mut conn, params.node_ids)
.map_err(|e| AppError::InternalServerError(format!("Failed to get users: {e:#}")))?;

let fcm_tokens = users
.iter()
.map(|user| user.fcm_token.clone())
.filter(|token| !token.is_empty() && token != "unavailable")
.map(FcmToken::new)
.filter_map(Result::ok)
.collect::<Vec<_>>();

let notification_kind = NotificationKind::Custom {
title: params.title.clone(),
message: params.message.clone(),
Expand All @@ -52,7 +34,7 @@ pub async fn post_push_campaign(
tracing::info!(
params.title,
params.message,
receivers = fcm_tokens.len(),
receivers = params.node_ids.len(),
"Sending push notification campaign",
);

Expand All @@ -62,7 +44,7 @@ pub async fn post_push_campaign(
state
.notification_sender
.send(Notification::new_batch(
fcm_tokens.clone(),
params.clone().node_ids,
notification_kind,
))
.await
Expand All @@ -75,6 +57,6 @@ pub async fn post_push_campaign(
"Sending push notification campaign (title: {}, message: {} to {} users",
params.title,
params.message,
fcm_tokens.len(),
params.node_ids.len(),
))
}
2 changes: 2 additions & 0 deletions coordinator/src/db/custom_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ impl ToSql<MessageTypeType, Pg> for MessageType {
MessageType::SettleConfirm => out.write_all(b"SettleConfirm")?,
MessageType::SettleFinalize => out.write_all(b"SettleFinalize")?,
MessageType::RenewOffer => out.write_all(b"RenewOffer")?,
MessageType::RolloverOffer => out.write_all(b"RolloverOffer")?,
MessageType::RenewAccept => out.write_all(b"RenewAccept")?,
MessageType::RenewConfirm => out.write_all(b"RenewConfirm")?,
MessageType::RenewFinalize => out.write_all(b"RenewFinalize")?,
Expand All @@ -127,6 +128,7 @@ impl FromSql<MessageTypeType, Pg> for MessageType {
b"SettleConfirm" => Ok(MessageType::SettleConfirm),
b"SettleFinalize" => Ok(MessageType::SettleFinalize),
b"RenewOffer" => Ok(MessageType::RenewOffer),
b"RolloverOffer" => Ok(MessageType::RolloverOffer),
b"RenewAccept" => Ok(MessageType::RenewAccept),
b"RenewConfirm" => Ok(MessageType::RenewConfirm),
b"RenewFinalize" => Ok(MessageType::RenewFinalize),
Expand Down
3 changes: 3 additions & 0 deletions coordinator/src/db/dlc_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub(crate) enum MessageType {
SettleConfirm,
SettleFinalize,
RenewOffer,
RolloverOffer,
RenewAccept,
RenewConfirm,
RenewFinalize,
Expand Down Expand Up @@ -102,6 +103,7 @@ impl From<xxi_node::dlc_message::DlcMessageType> for MessageType {
xxi_node::dlc_message::DlcMessageType::SettleConfirm => Self::SettleConfirm,
xxi_node::dlc_message::DlcMessageType::SettleFinalize => Self::SettleFinalize,
xxi_node::dlc_message::DlcMessageType::RenewOffer => Self::RenewOffer,
xxi_node::dlc_message::DlcMessageType::RolloverOffer => Self::RolloverOffer,
xxi_node::dlc_message::DlcMessageType::RenewAccept => Self::RenewAccept,
xxi_node::dlc_message::DlcMessageType::RenewConfirm => Self::RenewConfirm,
xxi_node::dlc_message::DlcMessageType::RenewFinalize => Self::RenewFinalize,
Expand Down Expand Up @@ -137,6 +139,7 @@ impl From<MessageType> for xxi_node::dlc_message::DlcMessageType {
MessageType::SettleConfirm => xxi_node::dlc_message::DlcMessageType::SettleConfirm,
MessageType::SettleFinalize => xxi_node::dlc_message::DlcMessageType::SettleFinalize,
MessageType::RenewOffer => xxi_node::dlc_message::DlcMessageType::RenewOffer,
MessageType::RolloverOffer => xxi_node::dlc_message::DlcMessageType::RolloverOffer,
MessageType::RenewAccept => xxi_node::dlc_message::DlcMessageType::RenewAccept,
MessageType::RenewConfirm => xxi_node::dlc_message::DlcMessageType::RenewConfirm,
MessageType::RenewFinalize => xxi_node::dlc_message::DlcMessageType::RenewFinalize,
Expand Down
1 change: 0 additions & 1 deletion coordinator/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ pub mod dlc_protocols;
pub mod last_outbound_dlc_message;
pub mod liquidity;
pub mod liquidity_options;
pub mod orders_helper;
pub mod polls;
pub mod positions;
pub mod reported_errors;
Expand Down
50 changes: 0 additions & 50 deletions coordinator/src/db/orders_helper.rs

This file was deleted.

24 changes: 3 additions & 21 deletions coordinator/src/message.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,8 @@
use crate::db::user;
use crate::notifications::FcmToken;
use crate::notifications::Notification;
use crate::notifications::NotificationKind;
use anyhow::Context;
use anyhow::Result;
use bitcoin::secp256k1::PublicKey;
use diesel::r2d2::ConnectionManager;
use diesel::r2d2::Pool;
use diesel::PgConnection;
use futures::future::RemoteHandle;
use futures::FutureExt;
use parking_lot::RwLock;
Expand All @@ -17,10 +12,9 @@ use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::mpsc;
use tokio::sync::mpsc::Sender;
use tokio::task::spawn_blocking;
use xxi_node::commons::Message;

/// This value is arbitrarily set to 100 and defines theff message accepted in the message
/// This value is arbitrarily set to 100 and defines the message accepted in the message
/// channel buffer.
const NOTIFICATION_BUFFER_SIZE: usize = 100;

Expand All @@ -41,7 +35,6 @@ pub struct NewUserMessage {
}

pub fn spawn_delivering_messages_to_authenticated_users(
pool: Pool<ConnectionManager<PgConnection>>,
notification_sender: Sender<Notification>,
tx_user_feed: broadcast::Sender<NewUserMessage>,
) -> (RemoteHandle<()>, Sender<OrderbookMessage>) {
Expand Down Expand Up @@ -76,7 +69,6 @@ pub fn spawn_delivering_messages_to_authenticated_users(
async move {
while let Some(notification) = receiver.recv().await {
if let Err(e) = process_orderbook_message(
pool.clone(),
&authenticated_users,
&notification_sender,
notification,
Expand All @@ -98,15 +90,10 @@ pub fn spawn_delivering_messages_to_authenticated_users(
}

async fn process_orderbook_message(
pool: Pool<ConnectionManager<PgConnection>>,
authenticated_users: &RwLock<HashMap<PublicKey, Sender<Message>>>,
notification_sender: &Sender<Notification>,
notification: OrderbookMessage,
) -> Result<()> {
let mut conn = spawn_blocking(move || pool.get())
.await
.expect("task to complete")?;

match notification {
OrderbookMessage::TraderMessage {
trader_id,
Expand All @@ -133,16 +120,11 @@ async fn process_orderbook_message(
None => tracing::warn!(%trader_id, "Trader is not connected"),
};

let user = user::by_id(&mut conn, trader_id.to_string())
.context("Failed to get user by ID")?;

if let (Some(notification_kind), Some(user)) = (notification, user) {
if let Some(notification_kind) = notification {
tracing::debug!(%trader_id, "Sending push notification to user");

let fcm_token = FcmToken::new(user.fcm_token)?;

notification_sender
.send(Notification::new(fcm_token, notification_kind))
.send(Notification::new(trader_id, notification_kind))
.await
.with_context(|| {
format!("Failed to send push notification to trader {trader_id}")
Expand Down
29 changes: 13 additions & 16 deletions coordinator/src/node/rollover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use crate::db::positions;
use crate::dlc_protocol;
use crate::dlc_protocol::DlcProtocolType;
use crate::dlc_protocol::ProtocolId;
use crate::message::OrderbookMessage;
use crate::node::Node;
use crate::notifications::Notification;
use crate::notifications::NotificationKind;
use crate::position::models::PositionState;
use anyhow::bail;
Expand Down Expand Up @@ -53,7 +53,7 @@ struct Rollover {
pub fn monitor(
pool: Pool<ConnectionManager<PgConnection>>,
mut receiver: broadcast::Receiver<NodeEvent>,
notifier: mpsc::Sender<OrderbookMessage>,
notifier: mpsc::Sender<Notification>,
network: Network,
node: Node,
) -> RemoteHandle<()> {
Expand Down Expand Up @@ -156,7 +156,7 @@ impl Node {
async fn check_if_eligible_for_rollover(
&self,
pool: Pool<ConnectionManager<PgConnection>>,
notifier: mpsc::Sender<OrderbookMessage>,
notifier: mpsc::Sender<Notification>,
trader_id: PublicKey,
network: Network,
) -> Result<()> {
Expand Down Expand Up @@ -195,13 +195,11 @@ impl Node {
trader_id: PublicKey,
expiry_timestamp: OffsetDateTime,
network: Network,
notifier: &mpsc::Sender<OrderbookMessage>,
notifier: &mpsc::Sender<Notification>,
notification: Option<NotificationKind>,
) -> Result<()> {
let signed_channel = self.inner.get_signed_channel_by_trader_id(trader_id)?;

let contract_id = signed_channel.get_contract_id();

if commons::is_eligible_for_rollover(OffsetDateTime::now_utc(), network)
// not expired
&& OffsetDateTime::now_utc() < expiry_timestamp
Expand All @@ -212,16 +210,15 @@ impl Node {
return Ok(());
}

tracing::debug!(%trader_id, "Notifying user about rollover");

let message = OrderbookMessage::TraderMessage {
trader_id,
message: commons::Message::Rollover(contract_id.map(hex::encode)),
notification,
};
tracing::debug!(%trader_id, "Push notifying user about rollover");

if let Err(e) = notifier.send(message).await {
tracing::debug!("Failed to notify trader. Error: {e:#}");
if let Some(notification) = notification {
if let Err(e) = notifier
.send(Notification::new(trader_id, notification))
.await
{
tracing::warn!("Failed to push notify trader. Error: {e:#}");
}
}

if self.is_connected(trader_id) {
Expand Down Expand Up @@ -258,7 +255,7 @@ impl Node {

let contract_id = self
.inner
.propose_dlc_channel_update(dlc_channel_id, contract_input, protocol_id.into())
.propose_rollover(dlc_channel_id, contract_input, protocol_id.into())
.await?;

let protocol_executor = dlc_protocol::DlcProtocolExecutor::new(self.pool.clone());
Expand Down
Loading

0 comments on commit bdca42b

Please sign in to comment.