From 3d60857992512f6451ab6bd244146174529f910d Mon Sep 17 00:00:00 2001 From: Richard Holzeis Date: Tue, 30 Apr 2024 18:00:54 +0200 Subject: [PATCH 1/3] refactor: Send push notifications to trader id This simplifies the notification service as the caller does not have to fetch the fcm token to use that service. Instead the caller only needs to know the pubkey of the user to be notified, which is much easier to get in various contexts. --- coordinator/src/bin/coordinator.rs | 5 +-- coordinator/src/campaign.rs | 24 ++------------ coordinator/src/db/mod.rs | 1 - coordinator/src/db/orders_helper.rs | 50 ----------------------------- coordinator/src/message.rs | 24 ++------------ coordinator/src/notifications.rs | 46 ++++++++++++++++++++++---- coordinator/src/scheduler.rs | 18 +++++------ 7 files changed, 57 insertions(+), 111 deletions(-) delete mode 100644 coordinator/src/db/orders_helper.rs diff --git a/coordinator/src/bin/coordinator.rs b/coordinator/src/bin/coordinator.rs index 28f92e60f..939a36ec6 100644 --- a/coordinator/src/bin/coordinator.rs +++ b/coordinator/src/bin/coordinator.rs @@ -248,10 +248,10 @@ async fn main() -> Result<()> { let (tx_orderbook_feed, _rx) = broadcast::channel(100); - let notification_service = NotificationService::new(opts.fcm_api_key.clone()); + let notification_service = + NotificationService::new(opts.fcm_api_key.clone(), node.pool.clone()); let (_handle, auth_users_notifier) = spawn_delivering_messages_to_authenticated_users( - pool.clone(), notification_service.get_sender(), tx_user_feed.clone(), ); @@ -260,6 +260,7 @@ async fn main() -> Result<()> { node.clone(), tx_orderbook_feed.clone(), auth_users_notifier.clone(), + notification_service.get_sender(), network, node.inner.oracle_pubkey, ); diff --git a/coordinator/src/campaign.rs b/coordinator/src/campaign.rs index a15a1e3f4..5e53f01cf 100644 --- a/coordinator/src/campaign.rs +++ b/coordinator/src/campaign.rs @@ -1,5 +1,3 @@ -use crate::db; -use crate::notifications::FcmToken; use crate::notifications::Notification; use crate::notifications::NotificationKind; use crate::routes::AppState; @@ -28,22 +26,6 @@ pub async fn post_push_campaign( let params = params.0; tracing::info!(?params, "Sending campaign with push notifications"); - let mut conn = state - .pool - .get() - .map_err(|e| AppError::InternalServerError(format!("Could not get connection: {e:#}")))?; - - let users = db::user::get_users(&mut conn, params.node_ids) - .map_err(|e| AppError::InternalServerError(format!("Failed to get users: {e:#}")))?; - - let fcm_tokens = users - .iter() - .map(|user| user.fcm_token.clone()) - .filter(|token| !token.is_empty() && token != "unavailable") - .map(FcmToken::new) - .filter_map(Result::ok) - .collect::>(); - let notification_kind = NotificationKind::Custom { title: params.title.clone(), message: params.message.clone(), @@ -52,7 +34,7 @@ pub async fn post_push_campaign( tracing::info!( params.title, params.message, - receivers = fcm_tokens.len(), + receivers = params.node_ids.len(), "Sending push notification campaign", ); @@ -62,7 +44,7 @@ pub async fn post_push_campaign( state .notification_sender .send(Notification::new_batch( - fcm_tokens.clone(), + params.clone().node_ids, notification_kind, )) .await @@ -75,6 +57,6 @@ pub async fn post_push_campaign( "Sending push notification campaign (title: {}, message: {} to {} users", params.title, params.message, - fcm_tokens.len(), + params.node_ids.len(), )) } diff --git a/coordinator/src/db/mod.rs b/coordinator/src/db/mod.rs index 9a507c35e..34406e7e7 100644 --- a/coordinator/src/db/mod.rs +++ b/coordinator/src/db/mod.rs @@ -9,7 +9,6 @@ pub mod dlc_protocols; pub mod last_outbound_dlc_message; pub mod liquidity; pub mod liquidity_options; -pub mod orders_helper; pub mod polls; pub mod positions; pub mod reported_errors; diff --git a/coordinator/src/db/orders_helper.rs b/coordinator/src/db/orders_helper.rs deleted file mode 100644 index e885f54c4..000000000 --- a/coordinator/src/db/orders_helper.rs +++ /dev/null @@ -1,50 +0,0 @@ -use crate::db::user; -use crate::notifications::FcmToken; -use crate::orderbook; -use diesel::Connection; -use diesel::PgConnection; -use diesel::QueryResult; -use xxi_node::commons; - -pub fn get_all_matched_market_orders_by_order_reason( - conn: &mut PgConnection, - order_reasons: Vec, -) -> QueryResult> { - let result = conn.transaction(|conn| { - let orders = orderbook::db::orders::get_all_matched_market_orders_by_order_reason( - conn, - order_reasons, - )?; - join_with_fcm_token(conn, orders) - })?; - - Ok(result) -} - -pub fn join_with_fcm_token( - conn: &mut PgConnection, - orders: Vec, -) -> QueryResult> { - let users = user::all(conn)?; - let result = orders - .into_iter() - // Join orders with users to add the FCM tokens. - // Filter out orders that don't have a FCM token stored in the users - // table which is with them. - // This can be done at the DB level if it ever becomes a performance issue. - .filter_map(|o| { - let maybe_fcm_token = users - .iter() - .find(|u| u.pubkey == o.trader_id.to_string() && !u.fcm_token.is_empty()) - .map(|u| FcmToken::new(u.fcm_token.clone()).expect("To have a non-empty token.")); - - if let Some(fcm_token) = maybe_fcm_token { - Some((o, fcm_token)) - } else { - tracing::warn!(?o, "No FCM token for order"); - None - } - }) - .collect::>(); - diesel::result::QueryResult::Ok(result) -} diff --git a/coordinator/src/message.rs b/coordinator/src/message.rs index ed90af7d8..1601e6e9f 100644 --- a/coordinator/src/message.rs +++ b/coordinator/src/message.rs @@ -1,13 +1,8 @@ -use crate::db::user; -use crate::notifications::FcmToken; use crate::notifications::Notification; use crate::notifications::NotificationKind; use anyhow::Context; use anyhow::Result; use bitcoin::secp256k1::PublicKey; -use diesel::r2d2::ConnectionManager; -use diesel::r2d2::Pool; -use diesel::PgConnection; use futures::future::RemoteHandle; use futures::FutureExt; use parking_lot::RwLock; @@ -17,10 +12,9 @@ use tokio::sync::broadcast; use tokio::sync::broadcast::error::RecvError; use tokio::sync::mpsc; use tokio::sync::mpsc::Sender; -use tokio::task::spawn_blocking; use xxi_node::commons::Message; -/// This value is arbitrarily set to 100 and defines theff message accepted in the message +/// This value is arbitrarily set to 100 and defines the message accepted in the message /// channel buffer. const NOTIFICATION_BUFFER_SIZE: usize = 100; @@ -41,7 +35,6 @@ pub struct NewUserMessage { } pub fn spawn_delivering_messages_to_authenticated_users( - pool: Pool>, notification_sender: Sender, tx_user_feed: broadcast::Sender, ) -> (RemoteHandle<()>, Sender) { @@ -76,7 +69,6 @@ pub fn spawn_delivering_messages_to_authenticated_users( async move { while let Some(notification) = receiver.recv().await { if let Err(e) = process_orderbook_message( - pool.clone(), &authenticated_users, ¬ification_sender, notification, @@ -98,15 +90,10 @@ pub fn spawn_delivering_messages_to_authenticated_users( } async fn process_orderbook_message( - pool: Pool>, authenticated_users: &RwLock>>, notification_sender: &Sender, notification: OrderbookMessage, ) -> Result<()> { - let mut conn = spawn_blocking(move || pool.get()) - .await - .expect("task to complete")?; - match notification { OrderbookMessage::TraderMessage { trader_id, @@ -133,16 +120,11 @@ async fn process_orderbook_message( None => tracing::warn!(%trader_id, "Trader is not connected"), }; - let user = user::by_id(&mut conn, trader_id.to_string()) - .context("Failed to get user by ID")?; - - if let (Some(notification_kind), Some(user)) = (notification, user) { + if let Some(notification_kind) = notification { tracing::debug!(%trader_id, "Sending push notification to user"); - let fcm_token = FcmToken::new(user.fcm_token)?; - notification_sender - .send(Notification::new(fcm_token, notification_kind)) + .send(Notification::new(trader_id, notification_kind)) .await .with_context(|| { format!("Failed to send push notification to trader {trader_id}") diff --git a/coordinator/src/notifications.rs b/coordinator/src/notifications.rs index e23d0047d..d623f4aa9 100644 --- a/coordinator/src/notifications.rs +++ b/coordinator/src/notifications.rs @@ -1,6 +1,11 @@ +use crate::db; use anyhow::ensure; use anyhow::Context; use anyhow::Result; +use bitcoin::secp256k1::PublicKey; +use diesel::r2d2::ConnectionManager; +use diesel::r2d2::Pool; +use diesel::PgConnection; use std::fmt::Display; use tokio::sync::mpsc; @@ -29,22 +34,22 @@ impl Display for NotificationKind { #[derive(Debug, Clone)] pub struct Notification { - fcm_tokens: Vec, + trader_ids: Vec, notification_kind: NotificationKind, } impl Notification { - pub fn new(user_fcm_token: FcmToken, notification_kind: NotificationKind) -> Self { + pub fn new(trader_id: PublicKey, notification_kind: NotificationKind) -> Self { Self { notification_kind, - fcm_tokens: vec![user_fcm_token], + trader_ids: vec![trader_id], } } - pub fn new_batch(fcm_tokens: Vec, notification_kind: NotificationKind) -> Self { + pub fn new_batch(trader_ids: Vec, notification_kind: NotificationKind) -> Self { Self { notification_kind, - fcm_tokens, + trader_ids, } } } @@ -59,7 +64,7 @@ impl NotificationService { /// /// If an empty string is passed in the constructor, the service will not send any notification. /// It will only log the notification that it would have sent. - pub fn new(fcm_api_key: String) -> Self { + pub fn new(fcm_api_key: String, pool: Pool>) -> Self { if fcm_api_key.is_empty() { // Log it as error, as in production it should always be set tracing::error!("FCM API key is empty. No notifications will not be sent."); @@ -72,10 +77,37 @@ impl NotificationService { let client = fcm::Client::new(); async move { while let Some(Notification { - fcm_tokens, + trader_ids, notification_kind, }) = notification_receiver.recv().await { + let result = tokio::task::spawn_blocking({ + let pool = pool.clone(); + move || { + let mut conn = pool.get()?; + let users = db::user::get_users(&mut conn, trader_ids)?; + anyhow::Ok(users) + } + }) + .await + .expect("task to complete"); + + let users = match result { + Ok(users) => users, + Err(e) => { + tracing::error!("Failed to fetch users. Error: {e:#}"); + continue; + } + }; + + let fcm_tokens = users + .iter() + .map(|user| user.fcm_token.clone()) + .filter(|token| !token.is_empty() && token != "unavailable") + .map(FcmToken::new) + .filter_map(Result::ok) + .collect::>(); + for user_fcm_token in fcm_tokens { tracing::info!(%notification_kind, %user_fcm_token, "Sending notification"); diff --git a/coordinator/src/scheduler.rs b/coordinator/src/scheduler.rs index c09c94a36..12279dc9c 100644 --- a/coordinator/src/scheduler.rs +++ b/coordinator/src/scheduler.rs @@ -1,9 +1,9 @@ use crate::db; -use crate::db::orders_helper::get_all_matched_market_orders_by_order_reason; use crate::message::OrderbookMessage; use crate::node::Node; use crate::notifications::Notification; use crate::notifications::NotificationKind; +use crate::orderbook; use crate::referrals; use crate::settings::Settings; use anyhow::Result; @@ -283,17 +283,17 @@ fn build_remind_to_close_expired_position_notification_job( // Note, positions that are expired longer than // [`crate::node::expired_positions::EXPIRED_POSITION_TIMEOUT`] are set to closing, hence // those positions will not get notified anymore afterwards. - match get_all_matched_market_orders_by_order_reason( + match orderbook::db::orders::get_all_matched_market_orders_by_order_reason( &mut conn, vec![commons::OrderReason::Expired], ) { - Ok(positions_with_token) => Box::pin({ + Ok(orders) => Box::pin({ async move { - for (order, fcm_token) in positions_with_token { + for order in orders { tracing::debug!(trader_id=%order.trader_id, "Sending reminder to close expired position."); if let Err(e) = notification_sender .send(Notification::new( - fcm_token.clone(), + order.trader_id, NotificationKind::PositionExpired, )) .await @@ -332,16 +332,16 @@ fn build_remind_to_close_liquidated_position_notification_job( // Note, positions that are liquidated longer than // [`crate::node::liquidated_positions::LIQUIDATED_POSITION_TIMEOUT`] are set to closing, // hence those positions will not get notified anymore afterwards. - match get_all_matched_market_orders_by_order_reason( + match orderbook::db::orders::get_all_matched_market_orders_by_order_reason( &mut conn, vec![ commons::OrderReason::TraderLiquidated, commons::OrderReason::CoordinatorLiquidated, ], ) { - Ok(orders_with_token) => Box::pin({ + Ok(orders) => Box::pin({ async move { - for (order, fcm_token) in orders_with_token { + for order in orders { tracing::debug!(trader_id=%order.trader_id, "Sending reminder to close liquidated position."); let notification_kind = NotificationKind::Custom { @@ -351,7 +351,7 @@ fn build_remind_to_close_liquidated_position_notification_job( if let Err(e) = notification_sender .send(Notification::new( - fcm_token.clone(), + order.trader_id, notification_kind.clone(), )) .await From ce3f9f7e14868c9523ad9d72f2457083ddb9e7c6 Mon Sep 17 00:00:00 2001 From: Richard Holzeis Date: Tue, 30 Apr 2024 21:39:39 +0200 Subject: [PATCH 2/3] feat: Move async match and rollover to offer messages - Introduces a new protocol message to differentiate between a Rollover and a regular Renewal - Removes the async match websocket event and adds the filled_with and order to the `TenTenOneSettleOffer` message - Removes the match websocket event and adds the filled_with tto the `TenTenOneChannelOffer` and `TenTenOneRenewOffer` messages. --- Cargo.lock | 1 + .../down.sql | 1 + .../up.sql | 3 + coordinator/src/bin/coordinator.rs | 5 +- coordinator/src/db/custom_types.rs | 2 + coordinator/src/db/dlc_messages.rs | 3 + coordinator/src/node/rollover.rs | 29 +-- coordinator/src/orderbook/async_match.rs | 27 +- coordinator/src/orderbook/trading.rs | 69 +++--- coordinator/src/scheduler.rs | 14 +- coordinator/src/trade/mod.rs | 26 +- crates/xxi-node/Cargo.toml | 1 + crates/xxi-node/src/commons/message.rs | 16 -- crates/xxi-node/src/commons/trade.rs | 4 + crates/xxi-node/src/dlc_message.rs | 8 + crates/xxi-node/src/message_handler.rs | 230 +++++++++++++++--- crates/xxi-node/src/node/dlc_channel.rs | 29 ++- crates/xxi-node/src/node/dlc_manager.rs | 7 +- ...sage_handler__tests__reject_writeable.snap | 7 + ...ts__settle_offer_impl_serde_writeable.snap | 7 + crates/xxi-node/src/tests/dlc_channel.rs | 12 +- crates/xxi-node/src/tests/mod.rs | 35 +++ mobile/native/src/db/custom_types.rs | 1 + mobile/native/src/db/dlc_messages.rs | 3 + mobile/native/src/dlc/node.rs | 134 ++++++++-- mobile/native/src/orderbook.rs | 44 ---- mobile/native/src/trade/order/handler.rs | 12 +- 27 files changed, 507 insertions(+), 223 deletions(-) create mode 100644 coordinator/migrations/2024-04-30-211331_add_rollover_offer_dlc_message_type/down.sql create mode 100644 coordinator/migrations/2024-04-30-211331_add_rollover_offer_dlc_message_type/up.sql create mode 100644 crates/xxi-node/src/snapshots/xxi_node__message_handler__tests__reject_writeable.snap create mode 100644 crates/xxi-node/src/snapshots/xxi_node__message_handler__tests__settle_offer_impl_serde_writeable.snap diff --git a/Cargo.lock b/Cargo.lock index bf456bb9a..6f02e9f87 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5336,6 +5336,7 @@ dependencies = [ "getrandom", "hex", "hkdf", + "insta", "lightning", "log", "mempool", diff --git a/coordinator/migrations/2024-04-30-211331_add_rollover_offer_dlc_message_type/down.sql b/coordinator/migrations/2024-04-30-211331_add_rollover_offer_dlc_message_type/down.sql new file mode 100644 index 000000000..ab290eb4c --- /dev/null +++ b/coordinator/migrations/2024-04-30-211331_add_rollover_offer_dlc_message_type/down.sql @@ -0,0 +1 @@ +select 1; diff --git a/coordinator/migrations/2024-04-30-211331_add_rollover_offer_dlc_message_type/up.sql b/coordinator/migrations/2024-04-30-211331_add_rollover_offer_dlc_message_type/up.sql new file mode 100644 index 000000000..fc878a2ae --- /dev/null +++ b/coordinator/migrations/2024-04-30-211331_add_rollover_offer_dlc_message_type/up.sql @@ -0,0 +1,3 @@ +ALTER TYPE "Message_Type_Type" + ADD + VALUE IF NOT EXISTS 'RolloverOffer'; diff --git a/coordinator/src/bin/coordinator.rs b/coordinator/src/bin/coordinator.rs index 939a36ec6..f12af76e0 100644 --- a/coordinator/src/bin/coordinator.rs +++ b/coordinator/src/bin/coordinator.rs @@ -274,7 +274,7 @@ async fn main() -> Result<()> { let _handle = rollover::monitor( pool.clone(), node_event_handler.subscribe(), - auth_users_notifier.clone(), + notification_service.get_sender(), network, node.clone(), ); @@ -332,8 +332,7 @@ async fn main() -> Result<()> { ); let sender = notification_service.get_sender(); - let notification_scheduler = - NotificationScheduler::new(sender, settings, network, node, auth_users_notifier); + let notification_scheduler = NotificationScheduler::new(sender, settings, network, node); tokio::spawn({ let pool = pool.clone(); let scheduler = notification_scheduler; diff --git a/coordinator/src/db/custom_types.rs b/coordinator/src/db/custom_types.rs index 312551d75..acf4e24a8 100644 --- a/coordinator/src/db/custom_types.rs +++ b/coordinator/src/db/custom_types.rs @@ -105,6 +105,7 @@ impl ToSql for MessageType { MessageType::SettleConfirm => out.write_all(b"SettleConfirm")?, MessageType::SettleFinalize => out.write_all(b"SettleFinalize")?, MessageType::RenewOffer => out.write_all(b"RenewOffer")?, + MessageType::RolloverOffer => out.write_all(b"RolloverOffer")?, MessageType::RenewAccept => out.write_all(b"RenewAccept")?, MessageType::RenewConfirm => out.write_all(b"RenewConfirm")?, MessageType::RenewFinalize => out.write_all(b"RenewFinalize")?, @@ -127,6 +128,7 @@ impl FromSql for MessageType { b"SettleConfirm" => Ok(MessageType::SettleConfirm), b"SettleFinalize" => Ok(MessageType::SettleFinalize), b"RenewOffer" => Ok(MessageType::RenewOffer), + b"RolloverOffer" => Ok(MessageType::RolloverOffer), b"RenewAccept" => Ok(MessageType::RenewAccept), b"RenewConfirm" => Ok(MessageType::RenewConfirm), b"RenewFinalize" => Ok(MessageType::RenewFinalize), diff --git a/coordinator/src/db/dlc_messages.rs b/coordinator/src/db/dlc_messages.rs index 504a25b16..99ba11cfc 100644 --- a/coordinator/src/db/dlc_messages.rs +++ b/coordinator/src/db/dlc_messages.rs @@ -32,6 +32,7 @@ pub(crate) enum MessageType { SettleConfirm, SettleFinalize, RenewOffer, + RolloverOffer, RenewAccept, RenewConfirm, RenewFinalize, @@ -102,6 +103,7 @@ impl From for MessageType { xxi_node::dlc_message::DlcMessageType::SettleConfirm => Self::SettleConfirm, xxi_node::dlc_message::DlcMessageType::SettleFinalize => Self::SettleFinalize, xxi_node::dlc_message::DlcMessageType::RenewOffer => Self::RenewOffer, + xxi_node::dlc_message::DlcMessageType::RolloverOffer => Self::RolloverOffer, xxi_node::dlc_message::DlcMessageType::RenewAccept => Self::RenewAccept, xxi_node::dlc_message::DlcMessageType::RenewConfirm => Self::RenewConfirm, xxi_node::dlc_message::DlcMessageType::RenewFinalize => Self::RenewFinalize, @@ -137,6 +139,7 @@ impl From for xxi_node::dlc_message::DlcMessageType { MessageType::SettleConfirm => xxi_node::dlc_message::DlcMessageType::SettleConfirm, MessageType::SettleFinalize => xxi_node::dlc_message::DlcMessageType::SettleFinalize, MessageType::RenewOffer => xxi_node::dlc_message::DlcMessageType::RenewOffer, + MessageType::RolloverOffer => xxi_node::dlc_message::DlcMessageType::RolloverOffer, MessageType::RenewAccept => xxi_node::dlc_message::DlcMessageType::RenewAccept, MessageType::RenewConfirm => xxi_node::dlc_message::DlcMessageType::RenewConfirm, MessageType::RenewFinalize => xxi_node::dlc_message::DlcMessageType::RenewFinalize, diff --git a/coordinator/src/node/rollover.rs b/coordinator/src/node/rollover.rs index 09a586b7f..57806db5e 100644 --- a/coordinator/src/node/rollover.rs +++ b/coordinator/src/node/rollover.rs @@ -4,8 +4,8 @@ use crate::db::positions; use crate::dlc_protocol; use crate::dlc_protocol::DlcProtocolType; use crate::dlc_protocol::ProtocolId; -use crate::message::OrderbookMessage; use crate::node::Node; +use crate::notifications::Notification; use crate::notifications::NotificationKind; use crate::position::models::PositionState; use anyhow::bail; @@ -53,7 +53,7 @@ struct Rollover { pub fn monitor( pool: Pool>, mut receiver: broadcast::Receiver, - notifier: mpsc::Sender, + notifier: mpsc::Sender, network: Network, node: Node, ) -> RemoteHandle<()> { @@ -156,7 +156,7 @@ impl Node { async fn check_if_eligible_for_rollover( &self, pool: Pool>, - notifier: mpsc::Sender, + notifier: mpsc::Sender, trader_id: PublicKey, network: Network, ) -> Result<()> { @@ -195,13 +195,11 @@ impl Node { trader_id: PublicKey, expiry_timestamp: OffsetDateTime, network: Network, - notifier: &mpsc::Sender, + notifier: &mpsc::Sender, notification: Option, ) -> Result<()> { let signed_channel = self.inner.get_signed_channel_by_trader_id(trader_id)?; - let contract_id = signed_channel.get_contract_id(); - if commons::is_eligible_for_rollover(OffsetDateTime::now_utc(), network) // not expired && OffsetDateTime::now_utc() < expiry_timestamp @@ -212,16 +210,15 @@ impl Node { return Ok(()); } - tracing::debug!(%trader_id, "Notifying user about rollover"); - - let message = OrderbookMessage::TraderMessage { - trader_id, - message: commons::Message::Rollover(contract_id.map(hex::encode)), - notification, - }; + tracing::debug!(%trader_id, "Push notifying user about rollover"); - if let Err(e) = notifier.send(message).await { - tracing::debug!("Failed to notify trader. Error: {e:#}"); + if let Some(notification) = notification { + if let Err(e) = notifier + .send(Notification::new(trader_id, notification)) + .await + { + tracing::warn!("Failed to push notify trader. Error: {e:#}"); + } } if self.is_connected(trader_id) { @@ -258,7 +255,7 @@ impl Node { let contract_id = self .inner - .propose_dlc_channel_update(dlc_channel_id, contract_input, protocol_id.into()) + .propose_dlc_channel_update(None, dlc_channel_id, contract_input, protocol_id.into()) .await?; let protocol_executor = dlc_protocol::DlcProtocolExecutor::new(self.pool.clone()); diff --git a/coordinator/src/orderbook/async_match.rs b/coordinator/src/orderbook/async_match.rs index 89a9905a8..8ed58d989 100644 --- a/coordinator/src/orderbook/async_match.rs +++ b/coordinator/src/orderbook/async_match.rs @@ -23,8 +23,6 @@ use xxi_node::commons::ContractSymbol; use xxi_node::commons::FilledWith; use xxi_node::commons::Match; use xxi_node::commons::Matches; -use xxi_node::commons::Message; -use xxi_node::commons::OrderReason; use xxi_node::commons::OrderState; use xxi_node::commons::TradeAndChannelParams; use xxi_node::commons::TradeParams; @@ -99,34 +97,11 @@ async fn process_pending_match( if let Some(order) = orders::get_by_trader_id_and_state(&mut conn, trader_id, OrderState::Matched)? { - tracing::debug!(%trader_id, order_id=%order.id, "Notifying trader about pending match"); + tracing::debug!(%trader_id, order_id=%order.id, "Executing pending match"); let matches = matches::get_matches_by_order_id(&mut conn, order.id)?; - let filled_with = get_filled_with_from_matches(matches, network, oracle_pk)?; - let message = match order.order_reason { - OrderReason::Manual => Message::Match(filled_with.clone()), - OrderReason::Expired - | OrderReason::CoordinatorLiquidated - | OrderReason::TraderLiquidated => Message::AsyncMatch { - order: order.clone(), - filled_with: filled_with.clone(), - }, - }; - - // Sending no optional push notification as this is only executed if the user just - // registered on the websocket. So we can assume that the user is still online. - let notification = None; - let msg = OrderbookMessage::TraderMessage { - trader_id, - message, - notification, - }; - if let Err(e) = notifier.send(msg).await { - tracing::error!("Failed to send notification. Error: {e:#}"); - } - let channel_opening_params = db::channel_opening_params::get_by_order_id(&mut conn, order.id)?; diff --git a/coordinator/src/orderbook/trading.rs b/coordinator/src/orderbook/trading.rs index 6a0aa0de1..e1f0aa28b 100644 --- a/coordinator/src/orderbook/trading.rs +++ b/coordinator/src/orderbook/trading.rs @@ -1,6 +1,7 @@ use crate::db; use crate::message::OrderbookMessage; use crate::node::Node; +use crate::notifications::Notification; use crate::notifications::NotificationKind; use crate::orderbook::db::matches; use crate::orderbook::db::orders; @@ -69,7 +70,8 @@ pub struct TraderMatchParams { pub fn start( node: Node, tx_orderbook_feed: broadcast::Sender, - notifier: mpsc::Sender, + trade_notifier: mpsc::Sender, + notifier: mpsc::Sender, network: Network, oracle_pk: XOnlyPublicKey, ) -> (RemoteHandle<()>, mpsc::Sender) { @@ -80,6 +82,7 @@ pub fn start( tokio::spawn({ let tx_orderbook_feed = tx_orderbook_feed.clone(); let notifier = notifier.clone(); + let trade_notifier = trade_notifier.clone(); let node = node.clone(); async move { let new_order = new_order_msg.order; @@ -99,6 +102,7 @@ pub fn start( process_new_market_order( node, notifier.clone(), + trade_notifier.clone(), new_order, network, oracle_pk, @@ -117,7 +121,7 @@ pub fn start( } { // TODO(holzeis): the maker is currently not subscribed to the websocket // api, hence it wouldn't receive the error message. - if let Err(e) = notifier + if let Err(e) = trade_notifier .send(OrderbookMessage::TraderMessage { trader_id, message: TradeError { order_id, error }, @@ -177,7 +181,8 @@ pub async fn process_new_limit_order( // happen in a single transaction to ensure either all data or nothing is stored to the database. pub async fn process_new_market_order( node: Node, - notifier: mpsc::Sender, + notifier: mpsc::Sender, + trade_notifier: mpsc::Sender, order: Order, network: Network, oracle_pk: XOnlyPublicKey, @@ -266,16 +271,6 @@ pub async fn process_new_market_order( tracing::info!(%trader_id, order_id, "Notifying trader about match"); - let message = match &order.order_reason { - OrderReason::Manual => Message::Match(match_param.filled_with.clone()), - OrderReason::Expired - | OrderReason::TraderLiquidated - | OrderReason::CoordinatorLiquidated => Message::AsyncMatch { - order: order.clone(), - filled_with: match_param.filled_with.clone(), - }, - }; - let notification = match &order.order_reason { OrderReason::Expired => Some(NotificationKind::PositionExpired), OrderReason::TraderLiquidated => Some(NotificationKind::Custom { @@ -289,32 +284,28 @@ pub async fn process_new_market_order( OrderReason::Manual => None, }; - let msg = OrderbookMessage::TraderMessage { - trader_id, - message, - notification, - }; + if let Some(notification) = notification { + // send user a push notification + notifier + .send(Notification::new(order.trader_id, notification)) + .await + .with_context(|| { + format!( + "Failed to send push notification. trader_id = {}", + order.trader_id + ) + })?; + } - let order_state = match notifier.send(msg).await { - Ok(()) => { - tracing::debug!(%trader_id, order_id, "Successfully notified trader"); - OrderState::Matched - } - Err(e) => { - tracing::warn!(%trader_id, order_id, "Failed to send trader message: {e:#}"); - - if order.order_type == OrderType::Limit { - // FIXME: The maker is currently not connected to the WebSocket so we can't - // notify him about a trade. However, trades are always accepted by the - // maker at the moment so in order to not have all limit orders in order - // state `Match` we are setting the order to `Taken` even if we couldn't - // notify the maker. - - OrderState::Taken - } else { - OrderState::Matched - } - } + let order_state = if order.order_type == OrderType::Limit { + // FIXME: The maker is currently not connected to the WebSocket so we can't + // notify him about a trade. However, trades are always accepted by the + // maker at the moment so in order to not have all limit orders in order + // state `Match` we are setting the order to `Taken` even if we couldn't + // notify the maker. + OrderState::Taken + } else { + OrderState::Matched }; tracing::debug!(%trader_id, order_id, "Updating the order state to {order_state:?}"); @@ -330,7 +321,7 @@ pub async fn process_new_market_order( if node.inner.is_connected(order.trader_id) { tracing::info!(trader_id = %order.trader_id, order_id = %order.id, order_reason = ?order.order_reason, "Executing trade for match"); - let trade_executor = TradeExecutor::new(node.clone(), notifier); + let trade_executor = TradeExecutor::new(node.clone(), trade_notifier); trade_executor .execute(&TradeAndChannelParams { diff --git a/coordinator/src/scheduler.rs b/coordinator/src/scheduler.rs index 12279dc9c..4ae855819 100644 --- a/coordinator/src/scheduler.rs +++ b/coordinator/src/scheduler.rs @@ -1,5 +1,4 @@ use crate::db; -use crate::message::OrderbookMessage; use crate::node::Node; use crate::notifications::Notification; use crate::notifications::NotificationKind; @@ -24,7 +23,6 @@ pub struct NotificationScheduler { settings: Settings, network: Network, node: Node, - notifier: mpsc::Sender, } impl NotificationScheduler { @@ -33,7 +31,6 @@ impl NotificationScheduler { settings: Settings, network: Network, node: Node, - notifier: mpsc::Sender, ) -> Self { let scheduler = JobScheduler::new() .await @@ -45,7 +42,6 @@ impl NotificationScheduler { settings, network, node, - notifier, } } @@ -117,7 +113,7 @@ impl NotificationScheduler { let schedule = self.settings.rollover_window_open_scheduler.clone(); let network = self.network; let node = self.node.clone(); - let notifier = self.notifier.clone(); + let sender = self.sender.clone(); let uuid = self .scheduler @@ -127,7 +123,7 @@ impl NotificationScheduler { network, NotificationKind::RolloverWindowOpen, node, - notifier, + sender, )?) .await?; tracing::debug!( @@ -144,7 +140,7 @@ impl NotificationScheduler { let schedule = self.settings.rollover_window_close_scheduler.clone(); let network = self.network; let node = self.node.clone(); - let notifier = self.notifier.clone(); + let sender = self.sender.clone(); let uuid = self .scheduler @@ -154,7 +150,7 @@ impl NotificationScheduler { network, NotificationKind::PositionSoonToExpire, node, - notifier, + sender, )?) .await?; @@ -177,7 +173,7 @@ fn build_rollover_notification_job( network: Network, notification: NotificationKind, node: Node, - notifier: mpsc::Sender, + notifier: mpsc::Sender, ) -> Result { Job::new_async(schedule, move |_, _| { let notifier = notifier.clone(); diff --git a/coordinator/src/trade/mod.rs b/coordinator/src/trade/mod.rs index f970a90e9..89f2d256a 100644 --- a/coordinator/src/trade/mod.rs +++ b/coordinator/src/trade/mod.rs @@ -43,6 +43,7 @@ use xxi_node::cfd::calculate_long_liquidation_price; use xxi_node::cfd::calculate_margin; use xxi_node::cfd::calculate_pnl; use xxi_node::cfd::calculate_short_liquidation_price; +use xxi_node::commons; use xxi_node::commons::Direction; use xxi_node::commons::MatchState; use xxi_node::commons::Message; @@ -236,6 +237,7 @@ impl TradeExecutor { } => self .start_closing_position( &mut connection, + order, &position, ¶ms.trade_params, channel_id, @@ -363,7 +365,12 @@ impl TradeExecutor { let (temporary_contract_id, temporary_channel_id) = self .node .inner - .propose_dlc_channel(contract_input, trade_params.pubkey, protocol_id.into()) + .propose_dlc_channel( + trade_params.filled_with.clone(), + contract_input, + trade_params.pubkey, + protocol_id.into(), + ) .await .context("Could not propose DLC channel")?; @@ -537,7 +544,12 @@ impl TradeExecutor { let temporary_contract_id = self .node .inner - .propose_dlc_channel_update(&dlc_channel_id, contract_input, protocol_id.into()) + .propose_dlc_channel_update( + Some(trade_params.filled_with.clone()), + &dlc_channel_id, + contract_input, + protocol_id.into(), + ) .await .context("Could not propose DLC channel update")?; @@ -705,7 +717,12 @@ impl TradeExecutor { let temporary_contract_id = self .node .inner - .propose_dlc_channel_update(&dlc_channel_id, contract_input, protocol_id.into()) + .propose_dlc_channel_update( + Some(trade_params.filled_with.clone()), + &dlc_channel_id, + contract_input, + protocol_id.into(), + ) .await .context("Could not propose DLC channel update")?; @@ -802,6 +819,7 @@ impl TradeExecutor { pub async fn start_closing_position( &self, conn: &mut PgConnection, + order: commons::Order, position: &Position, trade_params: &TradeParams, channel_id: DlcChannelId, @@ -856,6 +874,8 @@ impl TradeExecutor { self.node .inner .propose_dlc_channel_collaborative_settlement( + order, + trade_params.filled_with.clone(), &channel_id, settlement_amount_trader, protocol_id.into(), diff --git a/crates/xxi-node/Cargo.toml b/crates/xxi-node/Cargo.toml index 297f03f45..d688a852e 100644 --- a/crates/xxi-node/Cargo.toml +++ b/crates/xxi-node/Cargo.toml @@ -56,6 +56,7 @@ version = "*" features = ["js"] # Has no effect on other targets [dev-dependencies] +insta = { version = "1" } secp256k1 = { version = "0.27.0", features = ["serde", "rand", "global-context"] } time = { version = "0.3", features = ["serde"] } tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/crates/xxi-node/src/commons/message.rs b/crates/xxi-node/src/commons/message.rs index 3f4482300..0583f128b 100644 --- a/crates/xxi-node/src/commons/message.rs +++ b/crates/xxi-node/src/commons/message.rs @@ -1,6 +1,5 @@ use crate::commons::order::Order; use crate::commons::signature::Signature; -use crate::commons::trade::FilledWith; use crate::commons::LiquidityOption; use crate::commons::NewLimitOrder; use crate::commons::ReferralStatus; @@ -27,12 +26,6 @@ pub enum Message { Update(Order), InvalidAuthentication(String), Authenticated(TenTenOneConfig), - Match(FilledWith), - AsyncMatch { - order: Order, - filled_with: FilledWith, - }, - Rollover(Option), /// Message used to collaboratively revert DLC channels. DlcChannelCollaborativeRevert { channel_id: DlcChannelId, @@ -118,15 +111,6 @@ impl Display for Message { Message::Authenticated(_) => { write!(f, "Authenticated") } - Message::Match(_) => { - write!(f, "Match") - } - Message::AsyncMatch { .. } => { - write!(f, "AsyncMatch") - } - Message::Rollover(_) => { - write!(f, "Rollover") - } Message::DlcChannelCollaborativeRevert { .. } => { write!(f, "DlcChannelCollaborativeRevert") } diff --git a/crates/xxi-node/src/commons/trade.rs b/crates/xxi-node/src/commons/trade.rs index 2e364f607..fbcaf09b3 100644 --- a/crates/xxi-node/src/commons/trade.rs +++ b/crates/xxi-node/src/commons/trade.rs @@ -123,6 +123,10 @@ pub struct FilledWith { /// The id of the order defined by the orderbook /// /// The identifier of the order as defined by the orderbook. + /// + /// TODO(holzeis): We might want to consider adding the order to the filled with struct. Having + /// this separated doesn't make much sense anymore since, the filled with is not separately + /// processed by the app anymore. pub order_id: Uuid, /// The expiry timestamp of the contract-to-be diff --git a/crates/xxi-node/src/dlc_message.rs b/crates/xxi-node/src/dlc_message.rs index de1c3a316..d41456b39 100644 --- a/crates/xxi-node/src/dlc_message.rs +++ b/crates/xxi-node/src/dlc_message.rs @@ -58,6 +58,7 @@ pub enum DlcMessageType { SettleConfirm, SettleFinalize, RenewOffer, + RolloverOffer, RenewAccept, RenewConfirm, RenewFinalize, @@ -98,6 +99,9 @@ impl TryFrom<&SerializedDlcMessage> for TenTenOneMessage { DlcMessageType::RenewOffer => { TenTenOneMessage::RenewOffer(serde_json::from_str(&serialized_msg.message)?) } + DlcMessageType::RolloverOffer => { + TenTenOneMessage::RolloverOffer(serde_json::from_str(&serialized_msg.message)?) + } DlcMessageType::RenewAccept => { TenTenOneMessage::RenewAccept(serde_json::from_str(&serialized_msg.message)?) } @@ -151,6 +155,10 @@ impl TryFrom<&TenTenOneMessage> for SerializedDlcMessage { serde_json::to_string(&renew_offer)?, DlcMessageType::RenewOffer, ), + TenTenOneMessage::RolloverOffer(rollover_offer) => ( + serde_json::to_string(&rollover_offer)?, + DlcMessageType::RolloverOffer, + ), TenTenOneMessage::RenewAccept(renew_accept) => ( serde_json::to_string(&renew_accept)?, DlcMessageType::RenewAccept, diff --git a/crates/xxi-node/src/message_handler.rs b/crates/xxi-node/src/message_handler.rs index 0535fef8b..d67a2d985 100644 --- a/crates/xxi-node/src/message_handler.rs +++ b/crates/xxi-node/src/message_handler.rs @@ -1,7 +1,9 @@ use crate::bitcoin_conversion::to_secp_pk_30; +use crate::commons::FilledWith; +use crate::commons::Order; use crate::node::event::NodeEvent; use crate::node::event::NodeEventHandler; -use anyhow::bail; +use anyhow::Result; use dlc_manager::ReferenceId; use dlc_messages::channel::AcceptChannel; use dlc_messages::channel::CollaborativeCloseOffer; @@ -25,6 +27,8 @@ use dlc_messages::segmentation::get_segments; use dlc_messages::segmentation::segment_reader::SegmentReader; use dlc_messages::segmentation::SegmentChunk; use dlc_messages::segmentation::SegmentStart; +use dlc_messages::ser_impls::read_string; +use dlc_messages::ser_impls::write_string; use dlc_messages::ChannelMessage; use dlc_messages::Message; use lightning::events::OnionMessageProvider; @@ -91,7 +95,7 @@ impl OnionMessageHandler for TenTenOneMessageHandler { their_node_id: &PublicKey, _init: &msgs::Init, inbound: bool, - ) -> anyhow::Result<(), ()> { + ) -> Result<(), ()> { tracing::info!(%their_node_id, inbound, "Peer connected!"); self.handler.publish(NodeEvent::Connected { @@ -128,6 +132,7 @@ pub enum TenTenOneMessage { SettleConfirm(TenTenOneSettleConfirm), SettleFinalize(TenTenOneSettleFinalize), RenewOffer(TenTenOneRenewOffer), + RolloverOffer(TenTenOneRolloverOffer), RenewAccept(TenTenOneRenewAccept), RenewConfirm(TenTenOneRenewConfirm), RenewFinalize(TenTenOneRenewFinalize), @@ -142,6 +147,7 @@ pub struct TenTenOneReject { #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct TenTenOneOfferChannel { + pub filled_with: FilledWith, pub offer_channel: OfferChannel, } @@ -155,8 +161,10 @@ pub struct TenTenOneSignChannel { pub sign_channel: SignChannel, } -#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct TenTenOneSettleOffer { + pub order: Order, + pub filled_with: FilledWith, pub settle_offer: SettleOffer, } @@ -177,6 +185,12 @@ pub struct TenTenOneSettleFinalize { #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct TenTenOneRenewOffer { + pub filled_with: FilledWith, + pub renew_offer: RenewOffer, +} + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub struct TenTenOneRolloverOffer { pub renew_offer: RenewOffer, } @@ -372,6 +386,7 @@ pub fn tentenone_message_name(msg: &TenTenOneMessage) -> String { TenTenOneMessage::SettleConfirm(_) => "SettleConfirm", TenTenOneMessage::SettleFinalize(_) => "SettleFinalize", TenTenOneMessage::RenewOffer(_) => "RenewOffer", + TenTenOneMessage::RolloverOffer(_) => "RolloverOffer", TenTenOneMessage::RenewAccept(_) => "RenewAccept", TenTenOneMessage::RenewConfirm(_) => "RenewConfirm", TenTenOneMessage::RenewFinalize(_) => "RenewFinalize", @@ -383,23 +398,19 @@ pub fn tentenone_message_name(msg: &TenTenOneMessage) -> String { name.to_string() } -impl TryFrom for TenTenOneMessage { - type Error = anyhow::Error; - - fn try_from(value: Message) -> Result { - let msg = match value { - Message::Channel(ChannelMessage::Offer(offer_channel)) => { - TenTenOneMessage::Offer(TenTenOneOfferChannel { offer_channel }) - } +impl TenTenOneMessage { + /// Builds a 10101 message from the rust-dlc response message. Note, a response can never return + /// an offer so if an offer is passed the function will panic. This is most likely not a future + /// proof solution as we'd might want to enrich the response with 10101 metadata as well. If + /// that happens we will have to rework this part. + pub fn build_from_response(message: Message) -> Self { + match message { Message::Channel(ChannelMessage::Accept(accept_channel)) => { TenTenOneMessage::Accept(TenTenOneAcceptChannel { accept_channel }) } Message::Channel(ChannelMessage::Sign(sign_channel)) => { TenTenOneMessage::Sign(TenTenOneSignChannel { sign_channel }) } - Message::Channel(ChannelMessage::SettleOffer(settle_offer)) => { - TenTenOneMessage::SettleOffer(TenTenOneSettleOffer { settle_offer }) - } Message::Channel(ChannelMessage::SettleAccept(settle_accept)) => { TenTenOneMessage::SettleAccept(TenTenOneSettleAccept { settle_accept }) } @@ -409,9 +420,6 @@ impl TryFrom for TenTenOneMessage { Message::Channel(ChannelMessage::SettleFinalize(settle_finalize)) => { TenTenOneMessage::SettleFinalize(TenTenOneSettleFinalize { settle_finalize }) } - Message::Channel(ChannelMessage::RenewOffer(renew_offer)) => { - TenTenOneMessage::RenewOffer(TenTenOneRenewOffer { renew_offer }) - } Message::Channel(ChannelMessage::RenewAccept(renew_accept)) => { TenTenOneMessage::RenewAccept(TenTenOneRenewAccept { renew_accept }) } @@ -432,18 +440,21 @@ impl TryFrom for TenTenOneMessage { Message::Channel(ChannelMessage::Reject(reject)) => { TenTenOneMessage::Reject(TenTenOneReject { reject }) } - Message::OnChain(_) | Message::SubChannel(_) => bail!("Unexpected dlc message"), - }; - - Ok(msg) + Message::OnChain(_) + | Message::SubChannel(_) + | Message::Channel(ChannelMessage::RenewOffer(_)) + | Message::Channel(ChannelMessage::SettleOffer(_)) + | Message::Channel(ChannelMessage::Offer(_)) => { + unreachable!() + } + } } -} -impl TenTenOneMessage { pub fn get_reference_id(&self) -> Option { match self { TenTenOneMessage::Offer(TenTenOneOfferChannel { offer_channel: OfferChannel { reference_id, .. }, + .. }) | TenTenOneMessage::Accept(TenTenOneAcceptChannel { accept_channel: AcceptChannel { reference_id, .. }, @@ -453,6 +464,7 @@ impl TenTenOneMessage { }) | TenTenOneMessage::SettleOffer(TenTenOneSettleOffer { settle_offer: SettleOffer { reference_id, .. }, + .. }) | TenTenOneMessage::SettleAccept(TenTenOneSettleAccept { settle_accept: SettleAccept { reference_id, .. }, @@ -465,6 +477,10 @@ impl TenTenOneMessage { }) | TenTenOneMessage::RenewOffer(TenTenOneRenewOffer { renew_offer: RenewOffer { reference_id, .. }, + .. + }) + | TenTenOneMessage::RolloverOffer(TenTenOneRolloverOffer { + renew_offer: RenewOffer { reference_id, .. }, }) | TenTenOneMessage::RenewAccept(TenTenOneRenewAccept { renew_accept: RenewAccept { reference_id, .. }, @@ -498,7 +514,7 @@ impl From for Message { impl From for ChannelMessage { fn from(value: TenTenOneMessage) -> Self { match value { - TenTenOneMessage::Offer(TenTenOneOfferChannel { offer_channel }) => { + TenTenOneMessage::Offer(TenTenOneOfferChannel { offer_channel, .. }) => { ChannelMessage::Offer(offer_channel) } TenTenOneMessage::Accept(TenTenOneAcceptChannel { accept_channel }) => { @@ -507,7 +523,7 @@ impl From for ChannelMessage { TenTenOneMessage::Sign(TenTenOneSignChannel { sign_channel }) => { ChannelMessage::Sign(sign_channel) } - TenTenOneMessage::SettleOffer(TenTenOneSettleOffer { settle_offer }) => { + TenTenOneMessage::SettleOffer(TenTenOneSettleOffer { settle_offer, .. }) => { ChannelMessage::SettleOffer(settle_offer) } TenTenOneMessage::SettleAccept(TenTenOneSettleAccept { settle_accept }) => { @@ -519,7 +535,10 @@ impl From for ChannelMessage { TenTenOneMessage::SettleFinalize(TenTenOneSettleFinalize { settle_finalize }) => { ChannelMessage::SettleFinalize(settle_finalize) } - TenTenOneMessage::RenewOffer(TenTenOneRenewOffer { renew_offer }) => { + TenTenOneMessage::RenewOffer(TenTenOneRenewOffer { renew_offer, .. }) => { + ChannelMessage::RenewOffer(renew_offer) + } + TenTenOneMessage::RolloverOffer(TenTenOneRolloverOffer { renew_offer }) => { ChannelMessage::RenewOffer(renew_offer) } TenTenOneMessage::RenewAccept(TenTenOneRenewAccept { renew_accept }) => { @@ -587,6 +606,24 @@ macro_rules! handle_read_tentenone_messages { }}; } +macro_rules! impl_serde_writeable { + ($st:ident) => { + impl Writeable for $st { + fn write(&self, w: &mut W) -> Result<(), ::std::io::Error> { + let serialized = serde_json::to_string(self)?; + write_string(&serialized, w) + } + } + + impl Readable for $st { + fn read(r: &mut R) -> Result { + let serialized = read_string(r)?; + serde_json::from_str(&serialized).map_err(|_| DecodeError::InvalidValue) + } + } + }; +} + impl_type_writeable_for_enum!(WireMessage, { Message, SegmentStart, SegmentChunk }); impl_type_writeable_for_enum!(TenTenOneMessage, { @@ -599,6 +636,7 @@ impl_type_writeable_for_enum!(TenTenOneMessage, SettleConfirm, SettleFinalize, RenewOffer, + RolloverOffer, RenewAccept, RenewConfirm, RenewFinalize, @@ -607,14 +645,15 @@ impl_type_writeable_for_enum!(TenTenOneMessage, }); impl_dlc_writeable!(TenTenOneReject, { (reject, writeable) }); -impl_dlc_writeable!(TenTenOneOfferChannel, { (offer_channel, writeable) }); +impl_dlc_writeable!(TenTenOneOfferChannel, { (filled_with, writeable), (offer_channel, writeable) }); impl_dlc_writeable!(TenTenOneAcceptChannel, { (accept_channel, writeable) }); impl_dlc_writeable!(TenTenOneSignChannel, { (sign_channel, writeable) }); -impl_dlc_writeable!(TenTenOneSettleOffer, { (settle_offer, writeable) }); +impl_dlc_writeable!(TenTenOneSettleOffer, { (order, writeable), (filled_with, writeable), (settle_offer, writeable) }); impl_dlc_writeable!(TenTenOneSettleAccept, { (settle_accept, writeable) }); impl_dlc_writeable!(TenTenOneSettleConfirm, { (settle_confirm, writeable) }); impl_dlc_writeable!(TenTenOneSettleFinalize, { (settle_finalize, writeable) }); -impl_dlc_writeable!(TenTenOneRenewOffer, { (renew_offer, writeable) }); +impl_dlc_writeable!(TenTenOneRenewOffer, { (filled_with, writeable), (renew_offer, writeable) }); +impl_dlc_writeable!(TenTenOneRolloverOffer, { (renew_offer, writeable) }); impl_dlc_writeable!(TenTenOneRenewAccept, { (renew_accept, writeable) }); impl_dlc_writeable!(TenTenOneRenewConfirm, { (renew_confirm, writeable) }); impl_dlc_writeable!(TenTenOneRenewFinalize, { (renew_finalize, writeable) }); @@ -632,6 +671,7 @@ impl_type!(SETTLE_CHANNEL_ACCEPT_TYPE, TenTenOneSettleAccept, 43008); impl_type!(SETTLE_CHANNEL_CONFIRM_TYPE, TenTenOneSettleConfirm, 43010); impl_type!(SETTLE_CHANNEL_FINALIZE_TYPE, TenTenOneSettleFinalize, 43012); impl_type!(RENEW_CHANNEL_OFFER_TYPE, TenTenOneRenewOffer, 43014); +impl_type!(ROLLOVER_CHANNEL_OFFER_TYPE, TenTenOneRolloverOffer, 43028); impl_type!(RENEW_CHANNEL_ACCEPT_TYPE, TenTenOneRenewAccept, 43016); impl_type!(RENEW_CHANNEL_CONFIRM_TYPE, TenTenOneRenewConfirm, 43018); impl_type!(RENEW_CHANNEL_FINALIZE_TYPE, TenTenOneRenewFinalize, 43020); @@ -642,6 +682,9 @@ impl_type!( 43022 ); +impl_serde_writeable!(Order); +impl_serde_writeable!(FilledWith); + fn read_tentenone_message( msg_type: u16, mut buffer: &mut R, @@ -658,6 +701,7 @@ fn read_tentenone_message( (SETTLE_CHANNEL_CONFIRM_TYPE, SettleConfirm), (SETTLE_CHANNEL_FINALIZE_TYPE, SettleFinalize), (RENEW_CHANNEL_OFFER_TYPE, RenewOffer), + (ROLLOVER_CHANNEL_OFFER_TYPE, RolloverOffer), (RENEW_CHANNEL_ACCEPT_TYPE, RenewAccept), (RENEW_CHANNEL_CONFIRM_TYPE, RenewConfirm), (RENEW_CHANNEL_FINALIZE_TYPE, RenewFinalize), @@ -665,3 +709,131 @@ fn read_tentenone_message( (COLLABORATIVE_CLOSE_OFFER_TYPE, CollaborativeCloseOffer) ) } + +#[cfg(test)] +mod tests { + use crate::commons; + use crate::commons::ContractSymbol; + use crate::commons::Direction; + use crate::commons::OrderReason; + use crate::commons::OrderState; + use crate::commons::OrderType; + use crate::message_handler::TenTenOneMessageHandler; + use crate::message_handler::TenTenOneReject; + use crate::message_handler::TenTenOneSettleOffer; + use crate::node::event::NodeEventHandler; + use anyhow::anyhow; + use anyhow::Result; + use dlc_manager::DlcChannelId; + use dlc_messages::channel::Reject; + use dlc_messages::channel::SettleOffer; + use insta::assert_debug_snapshot; + use lightning::ln::wire::CustomMessageReader; + use lightning::ln::wire::Type; + use lightning::util::ser::Readable; + use lightning::util::ser::Writeable; + use secp256k1::PublicKey; + use std::io::Cursor; + use std::str::FromStr; + use std::sync::Arc; + use time::OffsetDateTime; + + #[test] + fn test_reject_writeable() { + let reject = TenTenOneReject { + reject: Reject { + channel_id: DlcChannelId::default(), + timestamp: 0, + reference_id: None, + }, + }; + + let json_msg = handler_read_test(reject); + assert_debug_snapshot!(json_msg); + } + + #[test] + fn test_settle_offer_impl_serde_writeable() { + let settle_offer = TenTenOneSettleOffer { + settle_offer: SettleOffer { + channel_id: DlcChannelId::default(), + counter_payout: 0, + next_per_update_point: dummy_pubkey(), + timestamp: 0, + reference_id: None, + }, + order: dummy_order(), + filled_with: dummy_filled_with(), + }; + + let json_msg = handler_read_test(settle_offer); + assert_debug_snapshot!(json_msg); + } + + fn dummy_filled_with() -> commons::FilledWith { + commons::FilledWith { + order_id: Default::default(), + expiry_timestamp: dummy_timestamp(), + oracle_pk: dummy_x_only_pubkey(), + matches: vec![], + } + } + + fn dummy_order() -> commons::Order { + commons::Order { + id: Default::default(), + price: Default::default(), + leverage: 0.0, + contract_symbol: ContractSymbol::BtcUsd, + trader_id: PublicKey::from_str( + "02d5aa8fce495f6301b466594af056a46104dcdc6d735ec4793aa43108854cbd4a", + ) + .unwrap(), + direction: Direction::Long, + quantity: Default::default(), + order_type: OrderType::Market, + timestamp: dummy_timestamp(), + expiry: dummy_timestamp(), + order_state: OrderState::Open, + order_reason: OrderReason::Manual, + stable: false, + } + } + + fn dummy_timestamp() -> OffsetDateTime { + OffsetDateTime::from_unix_timestamp(0).unwrap() + } + + fn dummy_pubkey() -> secp256k1_zkp::PublicKey { + secp256k1_zkp::PublicKey::from_str( + "02d5aa8fce495f6301b466594af056a46104dcdc6d735ec4793aa43108854cbd4a", + ) + .unwrap() + } + + fn dummy_x_only_pubkey() -> secp256k1::XOnlyPublicKey { + secp256k1::XOnlyPublicKey::from_str( + "cc8a4bc64d897bddc5fbc2f670f7a8ba0b386779106cf1223c6fc5d7cd6fc115", + ) + .unwrap() + } + + fn handler_read_test( + msg: T, + ) -> Result { + let mut buf = Vec::new(); + msg.type_id().write(&mut buf)?; + msg.write(&mut buf)?; + + let handler = TenTenOneMessageHandler::new(Arc::new(NodeEventHandler::new())); + let mut reader = Cursor::new(&mut buf); + let message_type = ::read(&mut reader).map_err(|e| anyhow!("{e:#}"))?; + let message = handler + .read(message_type, &mut reader) + .map_err(|e| anyhow!("{e:#}"))? + .expect("to read message"); + + let msg = serde_json::to_string(&message)?; + Ok(msg) + } +} diff --git a/crates/xxi-node/src/node/dlc_channel.rs b/crates/xxi-node/src/node/dlc_channel.rs index c9c8c8b2a..884999307 100644 --- a/crates/xxi-node/src/node/dlc_channel.rs +++ b/crates/xxi-node/src/node/dlc_channel.rs @@ -1,10 +1,12 @@ use crate::bitcoin_conversion::to_secp_pk_29; use crate::bitcoin_conversion::to_secp_pk_30; +use crate::commons; use crate::message_handler::TenTenOneCollaborativeCloseOffer; use crate::message_handler::TenTenOneMessage; use crate::message_handler::TenTenOneMessageHandler; use crate::message_handler::TenTenOneOfferChannel; use crate::message_handler::TenTenOneRenewOffer; +use crate::message_handler::TenTenOneRolloverOffer; use crate::message_handler::TenTenOneSettleAccept; use crate::message_handler::TenTenOneSettleOffer; use crate::node::event::NodeEvent; @@ -39,6 +41,7 @@ impl, dlc_channel_id: &DlcChannelId, contract_input: ContractInput, protocol_id: ReferenceId, @@ -302,8 +315,18 @@ impl TenTenOneMessage::RenewOffer(TenTenOneRenewOffer { + renew_offer, + filled_with, + }), + // if no filled with is provided we are rolling over. + None => TenTenOneMessage::RolloverOffer(TenTenOneRolloverOffer { renew_offer }), + }; + event_handler.publish(NodeEvent::StoreDlcMessage { - msg: TenTenOneMessage::RenewOffer(TenTenOneRenewOffer { renew_offer }), + msg, peer: to_secp_pk_30(counterparty_pubkey), }); diff --git a/crates/xxi-node/src/node/dlc_manager.rs b/crates/xxi-node/src/node/dlc_manager.rs index 3fcd4ca30..089208a27 100644 --- a/crates/xxi-node/src/node/dlc_manager.rs +++ b/crates/xxi-node/src/node/dlc_manager.rs @@ -76,12 +76,7 @@ impl Some(TenTenOneMessage::try_from(response)?), - None => None, - }; - - Ok(response) + Ok(response.map(TenTenOneMessage::build_from_response)) } } diff --git a/crates/xxi-node/src/snapshots/xxi_node__message_handler__tests__reject_writeable.snap b/crates/xxi-node/src/snapshots/xxi_node__message_handler__tests__reject_writeable.snap new file mode 100644 index 000000000..47211cbb7 --- /dev/null +++ b/crates/xxi-node/src/snapshots/xxi_node__message_handler__tests__reject_writeable.snap @@ -0,0 +1,7 @@ +--- +source: crates/xxi-node/src/message_handler.rs +expression: json_msg +--- +Ok( + "{\"Message\":{\"Reject\":{\"reject\":{\"channelId\":\"0000000000000000000000000000000000000000000000000000000000000000\",\"timestamp\":0,\"referenceId\":null}}}}", +) diff --git a/crates/xxi-node/src/snapshots/xxi_node__message_handler__tests__settle_offer_impl_serde_writeable.snap b/crates/xxi-node/src/snapshots/xxi_node__message_handler__tests__settle_offer_impl_serde_writeable.snap new file mode 100644 index 000000000..ad0f3acc6 --- /dev/null +++ b/crates/xxi-node/src/snapshots/xxi_node__message_handler__tests__settle_offer_impl_serde_writeable.snap @@ -0,0 +1,7 @@ +--- +source: crates/xxi-node/src/message_handler.rs +expression: json_msg +--- +Ok( + "{\"Message\":{\"SettleOffer\":{\"order\":{\"id\":\"00000000-0000-0000-0000-000000000000\",\"price\":0.0,\"leverage\":0.0,\"contract_symbol\":\"BtcUsd\",\"trader_id\":\"02d5aa8fce495f6301b466594af056a46104dcdc6d735ec4793aa43108854cbd4a\",\"direction\":\"Long\",\"quantity\":0.0,\"order_type\":\"Market\",\"timestamp\":\"1970-01-01T00:00:00Z\",\"expiry\":\"1970-01-01T00:00:00Z\",\"order_state\":\"Open\",\"order_reason\":\"Manual\",\"stable\":false},\"filled_with\":{\"order_id\":\"00000000-0000-0000-0000-000000000000\",\"expiry_timestamp\":[1970,1,0,0,0,0,0,0,0],\"oracle_pk\":\"cc8a4bc64d897bddc5fbc2f670f7a8ba0b386779106cf1223c6fc5d7cd6fc115\",\"matches\":[]},\"settle_offer\":{\"channelId\":\"0000000000000000000000000000000000000000000000000000000000000000\",\"counterPayout\":0,\"nextPerUpdatePoint\":\"02d5aa8fce495f6301b466594af056a46104dcdc6d735ec4793aa43108854cbd4a\",\"timestamp\":0,\"referenceId\":null}}}}", +) diff --git a/crates/xxi-node/src/tests/dlc_channel.rs b/crates/xxi-node/src/tests/dlc_channel.rs index cfacbe949..d102b1e35 100644 --- a/crates/xxi-node/src/tests/dlc_channel.rs +++ b/crates/xxi-node/src/tests/dlc_channel.rs @@ -8,6 +8,8 @@ use crate::on_chain_wallet; use crate::storage::TenTenOneInMemoryStorage; use crate::tests::bitcoind::mine; use crate::tests::dummy_contract_input; +use crate::tests::dummy_filled_with; +use crate::tests::dummy_order; use crate::tests::init_tracing; use crate::tests::new_reference_id; use crate::tests::wait_until; @@ -37,6 +39,7 @@ async fn can_open_and_settle_offchain() { coordinator .propose_dlc_channel_update( + Some(dummy_filled_with()), &coordinator_signed_channel.channel_id, contract_input, new_reference_id(), @@ -455,7 +458,12 @@ async fn open_channel_and_position_and_settle_position( ); coordinator - .propose_dlc_channel(contract_input, app.info.pubkey, new_reference_id()) + .propose_dlc_channel( + dummy_filled_with(), + contract_input, + app.info.pubkey, + new_reference_id(), + ) .await .unwrap(); @@ -575,6 +583,8 @@ async fn open_channel_and_position_and_settle_position( coordinator .propose_dlc_channel_collaborative_settlement( + dummy_order(), + dummy_filled_with(), &coordinator_signed_channel.channel_id, coordinator_dlc_collateral.to_sat() / 2, new_reference_id(), diff --git a/crates/xxi-node/src/tests/mod.rs b/crates/xxi-node/src/tests/mod.rs index b048cfdc7..f815e78e9 100644 --- a/crates/xxi-node/src/tests/mod.rs +++ b/crates/xxi-node/src/tests/mod.rs @@ -1,5 +1,6 @@ use crate::bitcoin_conversion::to_secp_pk_29; use crate::bitcoin_conversion::to_xonly_pk_29; +use crate::commons; use crate::node::dlc_channel::send_dlc_message; use crate::node::event::NodeEvent; use crate::node::event::NodeEventHandler; @@ -35,6 +36,7 @@ use rand::distributions::Alphanumeric; use rand::thread_rng; use rand::Rng; use rand::RngCore; +use secp256k1::PublicKey; use std::collections::HashMap; use std::env::temp_dir; use std::net::TcpListener; @@ -447,3 +449,36 @@ pub fn new_reference_id() -> ReferenceId { array } + +pub fn dummy_order() -> commons::Order { + commons::Order { + id: Default::default(), + price: Default::default(), + leverage: 0.0, + contract_symbol: commons::ContractSymbol::BtcUsd, + trader_id: PublicKey::from_str( + "02d5aa8fce495f6301b466594af056a46104dcdc6d735ec4793aa43108854cbd4a", + ) + .unwrap(), + direction: commons::Direction::Long, + quantity: Default::default(), + order_type: commons::OrderType::Market, + timestamp: OffsetDateTime::now_utc(), + expiry: OffsetDateTime::now_utc(), + order_state: commons::OrderState::Open, + order_reason: commons::OrderReason::Manual, + stable: false, + } +} + +pub fn dummy_filled_with() -> commons::FilledWith { + commons::FilledWith { + order_id: Default::default(), + expiry_timestamp: OffsetDateTime::now_utc(), + oracle_pk: XOnlyPublicKey::from_str( + "cc8a4bc64d897bddc5fbc2f670f7a8ba0b386779106cf1223c6fc5d7cd6fc115", + ) + .unwrap(), + matches: vec![], + } +} diff --git a/mobile/native/src/db/custom_types.rs b/mobile/native/src/db/custom_types.rs index a6a1d2d48..98a8348d9 100644 --- a/mobile/native/src/db/custom_types.rs +++ b/mobile/native/src/db/custom_types.rs @@ -255,6 +255,7 @@ impl ToSql for MessageType { MessageType::SettleConfirm => "SettleConfirm", MessageType::SettleFinalize => "SettleFinalize", MessageType::RenewOffer => "RenewOffer", + MessageType::RolloverOffer => "RolloverOffer", MessageType::RenewAccept => "RenewAccept", MessageType::RenewConfirm => "RenewConfirm", MessageType::RenewFinalize => "RenewFinalize", diff --git a/mobile/native/src/db/dlc_messages.rs b/mobile/native/src/db/dlc_messages.rs index 8e687ff9f..f3bb1fa0a 100644 --- a/mobile/native/src/db/dlc_messages.rs +++ b/mobile/native/src/db/dlc_messages.rs @@ -39,6 +39,7 @@ pub enum MessageType { SettleConfirm, SettleFinalize, RenewOffer, + RolloverOffer, RenewAccept, RenewConfirm, RenewFinalize, @@ -97,6 +98,7 @@ impl From for MessageType { xxi_node::dlc_message::DlcMessageType::SettleConfirm => Self::SettleConfirm, xxi_node::dlc_message::DlcMessageType::SettleFinalize => Self::SettleFinalize, xxi_node::dlc_message::DlcMessageType::RenewOffer => Self::RenewOffer, + xxi_node::dlc_message::DlcMessageType::RolloverOffer => Self::RolloverOffer, xxi_node::dlc_message::DlcMessageType::RenewAccept => Self::RenewAccept, xxi_node::dlc_message::DlcMessageType::RenewConfirm => Self::RenewConfirm, xxi_node::dlc_message::DlcMessageType::RenewFinalize => Self::RenewFinalize, @@ -136,6 +138,7 @@ impl From for xxi_node::dlc_message::DlcMessageType { MessageType::SettleConfirm => xxi_node::dlc_message::DlcMessageType::SettleConfirm, MessageType::SettleFinalize => xxi_node::dlc_message::DlcMessageType::SettleFinalize, MessageType::RenewOffer => xxi_node::dlc_message::DlcMessageType::RenewOffer, + MessageType::RolloverOffer => xxi_node::dlc_message::DlcMessageType::RolloverOffer, MessageType::RenewAccept => xxi_node::dlc_message::DlcMessageType::RenewAccept, MessageType::RenewConfirm => xxi_node::dlc_message::DlcMessageType::RenewConfirm, MessageType::RenewFinalize => xxi_node::dlc_message::DlcMessageType::RenewFinalize, diff --git a/mobile/native/src/dlc/node.rs b/mobile/native/src/dlc/node.rs index e57e068ae..659770264 100644 --- a/mobile/native/src/dlc/node.rs +++ b/mobile/native/src/dlc/node.rs @@ -25,12 +25,15 @@ use lightning::chain::transaction::OutPoint; use lightning::sign::DelayedPaymentOutputDescriptor; use lightning::sign::SpendableOutputDescriptor; use lightning::sign::StaticPaymentOutputDescriptor; +use rust_decimal::prelude::ToPrimitive; use std::collections::HashSet; use std::sync::Arc; use std::time::Duration; use time::OffsetDateTime; use tracing::instrument; use xxi_node::bitcoin_conversion::to_secp_pk_30; +use xxi_node::commons; +use xxi_node::commons::OrderReason; use xxi_node::dlc_message::DlcMessage; use xxi_node::dlc_message::SerializedDlcMessage; use xxi_node::message_handler::TenTenOneAcceptChannel; @@ -40,6 +43,7 @@ use xxi_node::message_handler::TenTenOneOfferChannel; use xxi_node::message_handler::TenTenOneReject; use xxi_node::message_handler::TenTenOneRenewAccept; use xxi_node::message_handler::TenTenOneRenewOffer; +use xxi_node::message_handler::TenTenOneRolloverOffer; use xxi_node::message_handler::TenTenOneSettleOffer; use xxi_node::node; use xxi_node::node::event::NodeEvent; @@ -216,6 +220,7 @@ impl Node { reference_id, .. }, + .. }) | TenTenOneMessage::SettleOffer(TenTenOneSettleOffer { settle_offer: @@ -224,6 +229,7 @@ impl Node { reference_id, .. }, + .. }) | TenTenOneMessage::RenewOffer(TenTenOneRenewOffer { renew_offer: @@ -232,6 +238,7 @@ impl Node { reference_id, .. }, + .. }) => { if let Err(e) = self.force_reject_offer(node_id, *channel_id, *reference_id) { @@ -267,14 +274,14 @@ impl Node { channel_id = hex::encode(offer.offer_channel.temporary_channel_id), "Automatically accepting dlc channel offer" ); - self.process_dlc_channel_offer(&offer.offer_channel.temporary_channel_id)?; + self.process_dlc_channel_offer(&offer)?; } TenTenOneMessage::SettleOffer(offer) => { tracing::info!( channel_id = hex::encode(offer.settle_offer.channel_id), "Automatically accepting settle offer" ); - self.process_settle_offer(&offer.settle_offer.channel_id)?; + self.process_settle_offer(&offer)?; } TenTenOneMessage::RenewOffer(offer) => { tracing::info!( @@ -282,10 +289,15 @@ impl Node { "Automatically accepting renew offer" ); - let expiry_timestamp = OffsetDateTime::from_unix_timestamp( - offer.renew_offer.contract_info.get_closest_maturity_date() as i64, - )?; - self.process_renew_offer(&offer.renew_offer.channel_id, expiry_timestamp)?; + self.process_renew_offer(&offer)?; + } + TenTenOneMessage::RolloverOffer(offer) => { + tracing::info!( + channel_id = hex::encode(offer.renew_offer.channel_id), + "Automatically accepting rollover offer" + ); + + self.process_rollover_offer(&offer)?; } TenTenOneMessage::RenewRevoke(revoke) => { let channel_id_hex = hex::encode(revoke.renew_revoke.channel_id); @@ -475,14 +487,16 @@ impl Node { ) } - #[instrument(fields(channel_id = hex::encode(channel_id)),skip_all, err(Debug))] - pub fn process_dlc_channel_offer(&self, channel_id: &DlcChannelId) -> Result<()> { + #[instrument(fields(channel_id = hex::encode(offer.offer_channel.temporary_channel_id)),skip_all, err(Debug))] + pub fn process_dlc_channel_offer(&self, offer: &TenTenOneOfferChannel) -> Result<()> { // TODO(holzeis): We should check if the offered amounts are expected. + self.set_order_to_filling(offer.filled_with.clone())?; + let channel_id = offer.offer_channel.temporary_channel_id; match self .inner .dlc_manager - .accept_channel(channel_id) + .accept_channel(&channel_id) .map_err(anyhow::Error::new) { Ok((accept_channel, _, _, node_id)) => { @@ -493,7 +507,7 @@ impl Node { } Err(e) => { tracing::error!("Failed to accept DLC channel offer: {e:#}"); - self.reject_dlc_channel_offer(channel_id)?; + self.reject_dlc_channel_offer(&channel_id)?; } } @@ -517,21 +531,64 @@ impl Node { ) } - #[instrument(fields(channel_id = hex::encode(channel_id)),skip_all, err(Debug))] - pub fn process_settle_offer(&self, channel_id: &DlcChannelId) -> Result<()> { + #[instrument(fields(channel_id = hex::encode(offer.settle_offer.channel_id)),skip_all, err(Debug))] + pub fn process_settle_offer(&self, offer: &TenTenOneSettleOffer) -> Result<()> { // TODO(holzeis): We should check if the offered amounts are expected. + let order_reason = offer.order.clone().order_reason; + let order_id = offer.order.id; + + match order_reason { + OrderReason::Expired + | OrderReason::CoordinatorLiquidated + | OrderReason::TraderLiquidated => { + tracing::info!( + %order_id, + "Received an async match from orderbook. Reason: {order_reason:?}" + ); + event::publish(&EventInternal::BackgroundNotification( + BackgroundTask::AsyncTrade(order_reason.into()), + )); + + order::handler::async_order_filling(&offer.order, &offer.filled_with) + .with_context(|| + format!("Failed to process async match update from orderbook. order_id {order_id}"))?; + } + // Received a regular settle offer after a manual order. + // + // TODO(holzeis): Eventually this should as well start the trade dialog. At the moment + // we automatically show the trade dialog since we expect a synchronous response from + // the orderbook. + OrderReason::Manual => self.set_order_to_filling(offer.filled_with.clone())?, + } + + let channel_id = offer.settle_offer.channel_id; if let Err(e) = self .inner - .accept_dlc_channel_collaborative_settlement(channel_id) + .accept_dlc_channel_collaborative_settlement(&channel_id) { tracing::error!("Failed to accept dlc channel collaborative settlement offer. {e:#}"); - self.reject_settle_offer(channel_id)?; + self.reject_settle_offer(&channel_id)?; } Ok(()) } + fn set_order_to_filling(&self, filled_with: commons::FilledWith) -> Result<()> { + let order_id = filled_with.order_id; + tracing::info!(%order_id, "Received match from orderbook"); + let execution_price = filled_with + .average_execution_price() + .to_f32() + .expect("to fit into f32"); + + let matching_fee = filled_with.order_matching_fee(); + + order::handler::order_filling(order_id, execution_price, matching_fee).with_context(|| { + format!("Failed to process match update from orderbook. order_id = {order_id}") + }) + } + #[instrument(fields(channel_id = hex::encode(channel_id)),skip_all, err(Debug))] pub fn reject_renew_offer(&self, channel_id: &DlcChannelId) -> Result<()> { tracing::warn!("Rejecting dlc channel renew offer!"); @@ -550,15 +607,17 @@ impl Node { ) } - #[instrument(fields(channel_id = hex::encode(channel_id)),skip_all, err(Debug))] - pub fn process_renew_offer( - &self, - channel_id: &DlcChannelId, - expiry_timestamp: OffsetDateTime, - ) -> Result<()> { + #[instrument(fields(channel_id = hex::encode(offer.renew_offer.channel_id)),skip_all, err(Debug))] + pub fn process_renew_offer(&self, offer: &TenTenOneRenewOffer) -> Result<()> { // TODO(holzeis): We should check if the offered amounts are expected. + let expiry_timestamp = OffsetDateTime::from_unix_timestamp( + offer.renew_offer.contract_info.get_closest_maturity_date() as i64, + )?; + + self.set_order_to_filling(offer.filled_with.clone())?; - match self.inner.dlc_manager.accept_renew_offer(channel_id) { + let channel_id = offer.renew_offer.channel_id; + match self.inner.dlc_manager.accept_renew_offer(&channel_id) { Ok((renew_accept, node_id)) => { position::handler::handle_channel_renewal_offer(expiry_timestamp)?; @@ -570,7 +629,38 @@ impl Node { Err(e) => { tracing::error!("Failed to accept dlc channel renew offer. {e:#}"); - self.reject_renew_offer(channel_id)?; + self.reject_renew_offer(&channel_id)?; + } + }; + + Ok(()) + } + + #[instrument(fields(channel_id = hex::encode(offer.renew_offer.channel_id)),skip_all, err(Debug))] + pub fn process_rollover_offer(&self, offer: &TenTenOneRolloverOffer) -> Result<()> { + tracing::info!("Received a rollover notification from orderbook."); + event::publish(&EventInternal::BackgroundNotification( + BackgroundTask::Rollover(TaskStatus::Pending), + )); + + let expiry_timestamp = OffsetDateTime::from_unix_timestamp( + offer.renew_offer.contract_info.get_closest_maturity_date() as i64, + )?; + + let channel_id = offer.renew_offer.channel_id; + match self.inner.dlc_manager.accept_renew_offer(&channel_id) { + Ok((renew_accept, node_id)) => { + position::handler::handle_channel_renewal_offer(expiry_timestamp)?; + + self.send_dlc_message( + to_secp_pk_30(node_id), + TenTenOneMessage::RenewAccept(TenTenOneRenewAccept { renew_accept }), + )?; + } + Err(e) => { + tracing::error!("Failed to accept dlc channel rollover offer. {e:#}"); + + self.reject_renew_offer(&channel_id)?; } }; diff --git a/mobile/native/src/orderbook.rs b/mobile/native/src/orderbook.rs index 0ccac1125..09358deb6 100644 --- a/mobile/native/src/orderbook.rs +++ b/mobile/native/src/orderbook.rs @@ -16,7 +16,6 @@ use bitcoin::secp256k1::SECP256K1; use futures::SinkExt; use futures::TryStreamExt; use parking_lot::Mutex; -use rust_decimal::prelude::ToPrimitive; use rust_decimal::Decimal; use std::collections::HashMap; use std::sync::Arc; @@ -36,11 +35,6 @@ use xxi_node::commons::OrderState; use xxi_node::commons::OrderbookRequest; use xxi_node::commons::Signature; -/// FIXME(holzeis): There is an edge case where the app is still open while we move into the -/// rollover window. If the coordinator restarts while the app remains open in that scenario, the -/// rollover will fail. However the rollover will succeed on the next restart. -/// This could be fixed by only sending the rollover message once the channel is usable with the -/// trader. const WS_RECONNECT_TIMEOUT: Duration = Duration::from_millis(200); pub fn subscribe( @@ -178,44 +172,6 @@ async fn handle_orderbook_message( state::set_tentenone_config(config.clone()); event::publish(&EventInternal::Authenticated(config)); } - Message::Rollover(_) => { - tracing::info!("Received a rollover notification from orderbook."); - event::publish(&EventInternal::BackgroundNotification( - BackgroundTask::Rollover(TaskStatus::Pending), - )); - } - Message::AsyncMatch { order, filled_with } => { - let order_id = order.id; - let order_reason = order.clone().order_reason.into(); - - tracing::info!( - %order_id, - "Received an async match from orderbook. Reason: {order_reason:?}" - ); - - event::publish(&EventInternal::BackgroundNotification( - BackgroundTask::AsyncTrade(order_reason), - )); - - order::handler::async_order_filling(order, filled_with).with_context(|| { - format!("Failed to process async match update from orderbook. order_id {order_id}") - })?; - } - Message::Match(filled) => { - let order_id = filled.order_id; - - tracing::info!(%order_id, "Received match from orderbook"); - let execution_price = filled - .average_execution_price() - .to_f32() - .expect("to fit into f32"); - - let matching_fee = filled.order_matching_fee(); - - order::handler::order_filling(order_id, execution_price, matching_fee).with_context( - || format!("Failed to process match update from orderbook. order_id = {order_id}"), - )?; - } Message::AllOrders(initial_orders) => { let mut orders = orders.lock(); if !orders.is_empty() { diff --git a/mobile/native/src/trade/order/handler.rs b/mobile/native/src/trade/order/handler.rs index 95e26c538..729b6c7a4 100644 --- a/mobile/native/src/trade/order/handler.rs +++ b/mobile/native/src/trade/order/handler.rs @@ -27,9 +27,9 @@ use rust_decimal::prelude::ToPrimitive; use time::Duration; use time::OffsetDateTime; use uuid::Uuid; +use xxi_node::commons; use xxi_node::commons::ChannelOpeningParams; use xxi_node::commons::Direction; -use xxi_node::commons::FilledWith; use xxi_node::node::signed_channel_state_name; const ORDER_OUTDATED_AFTER: Duration = Duration::minutes(5); @@ -186,12 +186,12 @@ fn check_channel_state() -> Result<(), SubmitOrderError> { } pub(crate) fn async_order_filling( - order: xxi_node::commons::Order, - filled_with: FilledWith, + order: &commons::Order, + filled_with: &commons::FilledWith, ) -> Result<()> { let order_type = match order.order_type { - xxi_node::commons::OrderType::Market => OrderType::Market, - xxi_node::commons::OrderType::Limit => OrderType::Limit { + commons::OrderType::Market => OrderType::Market, + commons::OrderType::Limit => OrderType::Limit { price: order.price.to_f32().expect("to fit into f32"), }, }; @@ -218,7 +218,7 @@ pub(crate) fn async_order_filling( }, creation_timestamp: order.timestamp, order_expiry_timestamp: order.expiry, - reason: order.order_reason.into(), + reason: order.order_reason.clone().into(), stable: order.stable, failure_reason: None, }; From 9abef5c0d48352df6a9c34e47b0cb35ee74aa210 Mon Sep 17 00:00:00 2001 From: Richard Holzeis Date: Wed, 1 May 2024 21:54:55 +0200 Subject: [PATCH 3/3] refactor: Split up propose dlc channel update into propose reopen or resize and rollover --- coordinator/src/node/rollover.rs | 2 +- coordinator/src/trade/mod.rs | 12 ++--- crates/xxi-node/src/node/dlc_channel.rs | 65 ++++++++++++++++++++---- crates/xxi-node/src/tests/dlc_channel.rs | 4 +- 4 files changed, 63 insertions(+), 20 deletions(-) diff --git a/coordinator/src/node/rollover.rs b/coordinator/src/node/rollover.rs index 57806db5e..608f31c99 100644 --- a/coordinator/src/node/rollover.rs +++ b/coordinator/src/node/rollover.rs @@ -255,7 +255,7 @@ impl Node { let contract_id = self .inner - .propose_dlc_channel_update(None, dlc_channel_id, contract_input, protocol_id.into()) + .propose_rollover(dlc_channel_id, contract_input, protocol_id.into()) .await?; let protocol_executor = dlc_protocol::DlcProtocolExecutor::new(self.pool.clone()); diff --git a/coordinator/src/trade/mod.rs b/coordinator/src/trade/mod.rs index 89f2d256a..2397a00b9 100644 --- a/coordinator/src/trade/mod.rs +++ b/coordinator/src/trade/mod.rs @@ -544,14 +544,14 @@ impl TradeExecutor { let temporary_contract_id = self .node .inner - .propose_dlc_channel_update( - Some(trade_params.filled_with.clone()), + .propose_reopen_or_resize( + trade_params.filled_with.clone(), &dlc_channel_id, contract_input, protocol_id.into(), ) .await - .context("Could not propose DLC channel update")?; + .context("Could not propose reopen DLC channel update")?; let protocol_executor = dlc_protocol::DlcProtocolExecutor::new(self.node.pool.clone()); protocol_executor.start_dlc_protocol( @@ -717,14 +717,14 @@ impl TradeExecutor { let temporary_contract_id = self .node .inner - .propose_dlc_channel_update( - Some(trade_params.filled_with.clone()), + .propose_reopen_or_resize( + trade_params.filled_with.clone(), &dlc_channel_id, contract_input, protocol_id.into(), ) .await - .context("Could not propose DLC channel update")?; + .context("Could not propose resize DLC channel update")?; let protocol_executor = dlc_protocol::DlcProtocolExecutor::new(self.node.pool.clone()); protocol_executor.start_dlc_protocol( diff --git a/crates/xxi-node/src/node/dlc_channel.rs b/crates/xxi-node/src/node/dlc_channel.rs index 884999307..d8ca4c55e 100644 --- a/crates/xxi-node/src/node/dlc_channel.rs +++ b/crates/xxi-node/src/node/dlc_channel.rs @@ -291,15 +291,15 @@ impl, + filled_with: commons::FilledWith, dlc_channel_id: &DlcChannelId, contract_input: ContractInput, protocol_id: ReferenceId, ) -> Result { - tracing::info!(channel_id = %hex::encode(dlc_channel_id), "Proposing a DLC channel update"); + tracing::info!(channel_id = %hex::encode(dlc_channel_id), "Proposing a DLC channel reopen or resize"); spawn_blocking({ let dlc_manager = self.dlc_manager.clone(); let dlc_channel_id = *dlc_channel_id; @@ -315,18 +315,61 @@ impl TenTenOneMessage::RenewOffer(TenTenOneRenewOffer { + event_handler.publish(NodeEvent::StoreDlcMessage { + msg: TenTenOneMessage::RenewOffer(TenTenOneRenewOffer { renew_offer, filled_with, }), - // if no filled with is provided we are rolling over. - None => TenTenOneMessage::RolloverOffer(TenTenOneRolloverOffer { renew_offer }), - }; + peer: to_secp_pk_30(counterparty_pubkey), + }); + + let offered_contracts = dlc_manager.get_store().get_contract_offers()?; + + // We assume that the first `OfferedContract` we find here is the one we just + // proposed when renewing the DLC channel. + // + // TODO: Change `renew_offer` API to return the `temporary_contract_id`, like + // `offer_channel` does. + let offered_contract = offered_contracts + .iter() + .find(|contract| contract.counter_party == counterparty_pubkey) + .context( + "Could not find offered contract after proposing DLC channel update", + )?; + + Ok(offered_contract.id) + } + }) + .await + .map_err(|e| anyhow!("{e:#}"))? + } + + /// Propose an update to the DLC channel based on the provided [`ContractInput`]. A + /// [`TenTenOneRolloverOffer`] is sent to the counterparty, kickstarting the dlc renew protocol. + pub async fn propose_rollover( + &self, + dlc_channel_id: &DlcChannelId, + contract_input: ContractInput, + protocol_id: ReferenceId, + ) -> Result { + tracing::info!(channel_id = %hex::encode(dlc_channel_id), "Proposing a DLC channel rollover"); + spawn_blocking({ + let dlc_manager = self.dlc_manager.clone(); + let dlc_channel_id = *dlc_channel_id; + let event_handler = self.event_handler.clone(); + move || { + // Not actually needed. See https://github.com/p2pderivatives/rust-dlc/issues/149. + let counter_payout = 0; + + let (renew_offer, counterparty_pubkey) = dlc_manager.renew_offer( + &dlc_channel_id, + counter_payout, + &contract_input, + Some(protocol_id), + )?; event_handler.publish(NodeEvent::StoreDlcMessage { - msg, + msg: TenTenOneMessage::RolloverOffer(TenTenOneRolloverOffer { renew_offer }), peer: to_secp_pk_30(counterparty_pubkey), }); diff --git a/crates/xxi-node/src/tests/dlc_channel.rs b/crates/xxi-node/src/tests/dlc_channel.rs index d102b1e35..2f5b86c5c 100644 --- a/crates/xxi-node/src/tests/dlc_channel.rs +++ b/crates/xxi-node/src/tests/dlc_channel.rs @@ -38,8 +38,8 @@ async fn can_open_and_settle_offchain() { let contract_input = dummy_contract_input(15_000, 5_000, oracle_pk, None); coordinator - .propose_dlc_channel_update( - Some(dummy_filled_with()), + .propose_reopen_or_resize( + dummy_filled_with(), &coordinator_signed_channel.channel_id, contract_input, new_reference_id(),