Skip to content

Commit

Permalink
Fix large sharding
Browse files Browse the repository at this point in the history
  • Loading branch information
rxdn committed Sep 28, 2024
1 parent 6f703dc commit 895ef67
Showing 1 changed file with 6 additions and 14 deletions.
20 changes: 6 additions & 14 deletions sharder/src/manager/public_shard_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::GatewayError;
use deadpool_redis::Pool;
use std::time::Duration;
use tokio::fs::File;
use tokio::sync::{broadcast, mpsc, oneshot};
use tokio::sync::{broadcast, mpsc};
use tokio::time::{sleep, timeout};

pub struct PublicShardManager<T: EventForwarder> {
Expand Down Expand Up @@ -49,7 +49,7 @@ impl<T: EventForwarder> PublicShardManager<T> {
}
}

fn build_shard(&self, shard_id: u16, ready_tx: Option<oneshot::Sender<()>>) -> Shard<T> {
fn build_shard(&self, shard_id: u16) -> Shard<T> {
let shard_info = ShardInfo::new(shard_id, self.options.shard_count.total);

let identify = Identify::new(
Expand All @@ -67,21 +67,20 @@ impl<T: EventForwarder> PublicShardManager<T> {
Arc::clone(&self.redis),
self.options.user_id,
Arc::clone(&self.event_forwarder),
ready_tx,
None,
self.shutdown_tx.subscribe(),
#[cfg(feature = "whitelabel")]
command_rx,
)
}

#[tracing::instrument(skip(self, resume_data, ready_tx))]
#[tracing::instrument(skip(self, resume_data))]
async fn start_shard(
&self,
shard_id: u16,
resume_data: Option<SessionData>,
ready_tx: Option<oneshot::Sender<()>>,
) -> (bool, Option<SessionData>) {
let shard = self.build_shard(shard_id, ready_tx);
let shard = self.build_shard(shard_id);

// TODO: Skip ready_rx await on error
match shard.connect(resume_data.clone()).await {
Expand Down Expand Up @@ -111,7 +110,6 @@ impl<T: EventForwarder> ShardManager for PublicShardManager<T> {
#[tracing::instrument(skip(self))]
async fn connect(self: Arc<Self>) {
for shard_id in self.options.shard_count.lowest..self.options.shard_count.highest {
let (ready_tx, ready_rx) = oneshot::channel::<()>();
let sm = Arc::clone(&self);

debug!("Fetching resume data");
Expand All @@ -124,13 +122,12 @@ impl<T: EventForwarder> ShardManager for PublicShardManager<T> {
};

tokio::spawn(async move {
let mut ready_tx = Some(ready_tx);
let mut resume_data = resume_data;

loop {
let (reconnect, new_resume_data) = sm
.as_ref()
.start_shard(shard_id, resume_data.clone(), ready_tx.take())
.start_shard(shard_id, resume_data.clone())
.await;

if !reconnect {
Expand All @@ -143,11 +140,6 @@ impl<T: EventForwarder> ShardManager for PublicShardManager<T> {
sleep(Duration::from_millis(500)).await;
}
});

match ready_rx.await {
Ok(_) => info!(shard_id = %shard_id, "Loaded guilds"),
Err(e) => error!(shard_id = %shard_id, error = %e, "Error reading ready rx"),
}
}

File::create("/tmp/ready").await.unwrap(); // panic if can't create
Expand Down

0 comments on commit 895ef67

Please sign in to comment.