Skip to content

Commit

Permalink
Handle gRPC connection loss (#288)
Browse files Browse the repository at this point in the history
  • Loading branch information
moubctez authored Sep 11, 2024
1 parent 57e4505 commit d8e3aed
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 10 deletions.
7 changes: 3 additions & 4 deletions src-tauri/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ pub mod windows;

use std::{
net::{IpAddr, Ipv4Addr, SocketAddr},
ops::Add,
pin::Pin,
time::{Duration, UNIX_EPOCH},
};
Expand Down Expand Up @@ -221,7 +220,7 @@ impl DesktopDaemonService for DaemonService {
debug!("Sending stats update for interface {ifname}");
match wgapi.read_interface_data() {
Ok(host) => {
if let Err(err) = tx.send(Result::<_, Status>::Ok(host.into())).await {
if let Err(err) = tx.send(Ok(host.into())).await {
error!(
"Failed to send stats update for interface {ifname}. Error: {err}"
);
Expand Down Expand Up @@ -325,12 +324,12 @@ impl From<proto::Peer> for Peer {
}),
last_handshake: peer
.last_handshake
.map(|timestamp| UNIX_EPOCH.add(Duration::from_secs(timestamp))),
.map(|timestamp| UNIX_EPOCH + Duration::from_secs(timestamp)),
tx_bytes: peer.tx_bytes,
rx_bytes: peer.rx_bytes,
persistent_keepalive_interval: peer
.persistent_keepalive_interval
.map(|interval| interval as u16),
.and_then(|interval| u16::try_from(interval).ok()),
allowed_ips: peer
.allowed_ips
.into_iter()
Expand Down
19 changes: 13 additions & 6 deletions src-tauri/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use defguard_wireguard_rs::{host::Peer, key::Key, net::IpAddrMask, InterfaceConf
use local_ip_address::local_ip;
use sqlx::query;
use tauri::{AppHandle, Manager};
use tonic::{codegen::tokio_stream::StreamExt, transport::Channel};
use tonic::transport::Channel;
use tracing::Level;

use crate::{
Expand Down Expand Up @@ -65,7 +65,7 @@ pub async fn setup_interface(
}

debug!("Parsing location allowed ips: {}", location.allowed_ips);
let allowed_ips: Vec<String> = if location.route_all_traffic {
let allowed_ips = if location.route_all_traffic {
debug!("Using all traffic routing: {DEFAULT_ROUTE_IPV4} {DEFAULT_ROUTE_IPV6}");
vec![DEFAULT_ROUTE_IPV4.into(), DEFAULT_ROUTE_IPV6.into()]
} else {
Expand Down Expand Up @@ -213,9 +213,9 @@ pub fn spawn_stats_thread(
.expect("Failed to connect to interface stats stream")
.into_inner();

while let Some(item) = stream.next().await {
match item {
Ok(interface_data) => {
loop {
match stream.message().await {
Ok(Some(interface_data)) => {
debug!("Received interface data update: {interface_data:?}");
let peers: Vec<Peer> =
interface_data.peers.into_iter().map(Into::into).collect();
Expand Down Expand Up @@ -245,8 +245,15 @@ pub fn spawn_stats_thread(
}
}
}
Ok(None) => {
info!(
"gRPC stream to the defguard-service managing connections has been closed"
);
break;
}
Err(err) => {
error!("Failed to receive interface data update: {err}");
error!("gRPC stream to the defguard-service managing connections error: {err}");
break;
}
}
}
Expand Down

0 comments on commit d8e3aed

Please sign in to comment.