From 895ef672b8af10456edd93e973007ca9d4b5f516 Mon Sep 17 00:00:00 2001 From: rxdn <29165304+rxdn@users.noreply.github.com> Date: Sat, 28 Sep 2024 17:54:55 +0100 Subject: [PATCH] Fix large sharding --- sharder/src/manager/public_shard_manager.rs | 20 ++++++-------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/sharder/src/manager/public_shard_manager.rs b/sharder/src/manager/public_shard_manager.rs index 1c026cc..2d0d126 100644 --- a/sharder/src/manager/public_shard_manager.rs +++ b/sharder/src/manager/public_shard_manager.rs @@ -17,7 +17,7 @@ use crate::GatewayError; use deadpool_redis::Pool; use std::time::Duration; use tokio::fs::File; -use tokio::sync::{broadcast, mpsc, oneshot}; +use tokio::sync::{broadcast, mpsc}; use tokio::time::{sleep, timeout}; pub struct PublicShardManager { @@ -49,7 +49,7 @@ impl PublicShardManager { } } - fn build_shard(&self, shard_id: u16, ready_tx: Option>) -> Shard { + fn build_shard(&self, shard_id: u16) -> Shard { let shard_info = ShardInfo::new(shard_id, self.options.shard_count.total); let identify = Identify::new( @@ -67,21 +67,20 @@ impl PublicShardManager { Arc::clone(&self.redis), self.options.user_id, Arc::clone(&self.event_forwarder), - ready_tx, + None, self.shutdown_tx.subscribe(), #[cfg(feature = "whitelabel")] command_rx, ) } - #[tracing::instrument(skip(self, resume_data, ready_tx))] + #[tracing::instrument(skip(self, resume_data))] async fn start_shard( &self, shard_id: u16, resume_data: Option, - ready_tx: Option>, ) -> (bool, Option) { - let shard = self.build_shard(shard_id, ready_tx); + let shard = self.build_shard(shard_id); // TODO: Skip ready_rx await on error match shard.connect(resume_data.clone()).await { @@ -111,7 +110,6 @@ impl ShardManager for PublicShardManager { #[tracing::instrument(skip(self))] async fn connect(self: Arc) { for shard_id in self.options.shard_count.lowest..self.options.shard_count.highest { - let (ready_tx, ready_rx) = oneshot::channel::<()>(); let sm = Arc::clone(&self); debug!("Fetching resume data"); @@ -124,13 +122,12 @@ impl ShardManager for PublicShardManager { }; tokio::spawn(async move { - let mut ready_tx = Some(ready_tx); let mut resume_data = resume_data; loop { let (reconnect, new_resume_data) = sm .as_ref() - .start_shard(shard_id, resume_data.clone(), ready_tx.take()) + .start_shard(shard_id, resume_data.clone()) .await; if !reconnect { @@ -143,11 +140,6 @@ impl ShardManager for PublicShardManager { sleep(Duration::from_millis(500)).await; } }); - - match ready_rx.await { - Ok(_) => info!(shard_id = %shard_id, "Loaded guilds"), - Err(e) => error!(shard_id = %shard_id, error = %e, "Error reading ready rx"), - } } File::create("/tmp/ready").await.unwrap(); // panic if can't create