Skip to content

Commit

Permalink
add receive messages reconnect effect (#155)
Browse files Browse the repository at this point in the history
  • Loading branch information
Xavrax authored Jun 7, 2023
1 parent a15caea commit 205f0b9
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/dx/subscribe/event_engine/effect_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ impl EffectHandler<SubscribeEffectInvocation, SubscribeEffect> for SubscribeEffe
cursor: *cursor,
attempts: *attempts,
reason: reason.clone(),
executor: self.receive,
}),
SubscribeEffectInvocation::EmitStatus(status) => {
// TODO: Provide emit status effect
Expand Down
21 changes: 21 additions & 0 deletions src/dx/subscribe/event_engine/effects/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use super::{HandshakeFunction, ReceiveFunction};
mod handshake;
mod handshake_reconnection;
mod receive;
mod receive_reconnection;

/// Subscription state machine effects.
#[allow(dead_code)]
Expand Down Expand Up @@ -114,6 +115,11 @@ pub(crate) enum SubscribeEffect {

/// Receive updates attempt failure reason.
reason: PubNubError,

/// Executor function.
///
/// Function which will be used to execute receive updates.
executor: ReceiveFunction,
},

/// Status change notification effect invocation.
Expand Down Expand Up @@ -167,6 +173,21 @@ impl Effect for SubscribeEffect {
cursor,
executor,
} => receive::execute(channels, channel_groups, cursor, *executor),
SubscribeEffect::ReceiveReconnect {
channels,
channel_groups,
cursor,
attempts,
reason,
executor,
} => receive_reconnection::execute(
channels,
channel_groups,
cursor,
*attempts,
reason.clone(), // TODO: Does run function need to borrow self? Or we can consume it?
*executor,
),
_ => {
/* TODO: Implement other effects */
None
Expand Down
119 changes: 119 additions & 0 deletions src/dx/subscribe/event_engine/effects/receive_reconnection.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
use crate::lib::alloc::{string::String, vec, vec::Vec};
use crate::{
core::PubNubError,
dx::subscribe::{
event_engine::{ReceiveFunction, SubscribeEvent},
SubscribeCursor,
},
};

pub(crate) fn execute(
channels: &Option<Vec<String>>,
channel_groups: &Option<Vec<String>>,
cursor: &SubscribeCursor,
attempt: u8,
reason: PubNubError,
executor: ReceiveFunction,
) -> Option<Vec<SubscribeEvent>> {
Some(
executor(channels, channel_groups, cursor, attempt, Some(reason))
.unwrap_or_else(|err| vec![SubscribeEvent::ReceiveReconnectFailure { reason: err }]),
)
}

#[cfg(test)]
mod should {
use super::*;
use crate::{core::PubNubError, dx::subscribe::SubscribeCursor};

#[test]
fn receive_messages() {
fn mock_receive_function(
channels: &Option<Vec<String>>,
channel_groups: &Option<Vec<String>>,
cursor: &SubscribeCursor,
attempt: u8,
reason: Option<PubNubError>,
) -> Result<Vec<SubscribeEvent>, PubNubError> {
assert_eq!(channels, &Some(vec!["ch1".to_string()]));
assert_eq!(channel_groups, &Some(vec!["cg1".to_string()]));
assert_eq!(attempt, 10);
assert_eq!(
reason,
Some(PubNubError::Transport {
details: "test".into(),
})
);
assert_eq!(
cursor,
&SubscribeCursor {
timetoken: 0,
region: 0
}
);

Ok(vec![SubscribeEvent::ReceiveSuccess {
cursor: SubscribeCursor {
timetoken: 0,
region: 0,
},
messages: vec![],
}])
}

let result = execute(
&Some(vec!["ch1".to_string()]),
&Some(vec!["cg1".to_string()]),
&SubscribeCursor {
timetoken: 0,
region: 0,
},
10,
PubNubError::Transport {
details: "test".into(),
},
mock_receive_function,
);

assert!(matches!(
result.unwrap().first().unwrap(),
&SubscribeEvent::ReceiveSuccess { .. }
))
}

#[test]
fn return_handskahe_failure_event_on_err() {
fn mock_receive_function(
_channels: &Option<Vec<String>>,
_channel_groups: &Option<Vec<String>>,
_cursor: &SubscribeCursor,
_attempt: u8,
_reason: Option<PubNubError>,
) -> Result<Vec<SubscribeEvent>, PubNubError> {
Err(PubNubError::Transport {
details: "test".into(),
})
}

let binding = execute(
&Some(vec!["ch1".to_string()]),
&Some(vec!["cg1".to_string()]),
&SubscribeCursor {
timetoken: 0,
region: 0,
},
10,
PubNubError::Transport {
details: "test".into(),
},
mock_receive_function,
)
.unwrap();
let result = &binding[0];

assert!(matches!(
result,
&SubscribeEvent::ReceiveReconnectFailure { .. }
));
}
}

0 comments on commit 205f0b9

Please sign in to comment.