Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix memory leak #269

Merged
merged 9 commits into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crux_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ serde-reflection = { version = "0.4.0", optional = true }
serde_json = "1.0.128"
slab = "0.4.9"
thiserror = "1.0.63"
uuid = { version = "1.10.0", features = ["v4", "serde"] }

[dev-dependencies]
assert_fs = "1.0.13"
Expand All @@ -41,4 +42,3 @@ serde = { version = "1.0.210", features = ["derive"] }
static_assertions = "1.1"
rand = "0.8"
url = "2.5.2"
uuid = { version = "1.10.0", features = ["v4", "serde"] }
143 changes: 112 additions & 31 deletions crux_core/src/capability/executor.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::{
collections::HashMap,
sync::{Arc, Mutex},
task::Context,
};
Expand All @@ -9,46 +10,76 @@ use futures::{
task::{waker_ref, ArcWake},
Future, FutureExt,
};
use uuid::Uuid;

// used in docs/internals/runtime.md
// ANCHOR: executor
pub(crate) struct QueuingExecutor {
ready_queue: Receiver<Arc<Task>>,
task_queue: Receiver<Task>,
charypar marked this conversation as resolved.
Show resolved Hide resolved
ready_queue: Receiver<Uuid>,
tasks: Mutex<HashMap<Uuid, Task>>,
charypar marked this conversation as resolved.
Show resolved Hide resolved
}
// ANCHOR_END: executor

// used in docs/internals/runtime.md
// ANCHOR: spawner
#[derive(Clone)]
pub struct Spawner {
task_sender: Sender<Arc<Task>>,
task_sender: Sender<Task>,
ready_sender: Sender<Uuid>,
}
// ANCHOR_END: spawner

// used in docs/internals/runtime.md
// ANCHOR: task
struct Task {
future: Mutex<Option<future::BoxFuture<'static, ()>>>,
id: Uuid,
charypar marked this conversation as resolved.
Show resolved Hide resolved
future: future::BoxFuture<'static, ()>,
ready_sender: Sender<Uuid>,
}

impl Task {
fn id(&self) -> Uuid {
self.id
}

task_sender: Sender<Arc<Task>>,
fn notify(&self) -> NotifyTask {
NotifyTask {
task_id: self.id,
sender: self.ready_sender.clone(),
}
}
charypar marked this conversation as resolved.
Show resolved Hide resolved
}

// ANCHOR_END: task

pub(crate) fn executor_and_spawner() -> (QueuingExecutor, Spawner) {
let (task_sender, ready_queue) = crossbeam_channel::unbounded();
let (task_sender, task_queue) = crossbeam_channel::unbounded();
let (ready_sender, ready_queue) = crossbeam_channel::unbounded();

(QueuingExecutor { ready_queue }, Spawner { task_sender })
(
QueuingExecutor {
ready_queue,
task_queue,
tasks: Mutex::new(HashMap::new()),
},
Spawner {
task_sender,
ready_sender,
},
)
}

// used in docs/internals/runtime.md
// ANCHOR: spawning
impl Spawner {
pub fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
let future = future.boxed();
let task = Arc::new(Task {
future: Mutex::new(Some(future)),
task_sender: self.task_sender.clone(),
});
let task = Task {
id: Uuid::new_v4(),
future,
ready_sender: self.ready_sender.clone(),
};

self.task_sender
.send(task)
Expand All @@ -59,38 +90,88 @@ impl Spawner {

// used in docs/internals/runtime.md
// ANCHOR: arc_wake
impl ArcWake for Task {
impl ArcWake for NotifyTask {
fn wake_by_ref(arc_self: &Arc<Self>) {
let cloned = arc_self.clone();
arc_self
.task_sender
.send(cloned)
.expect("unable to wake an async task, task sender channel is disconnected.")
let _ = arc_self.sender.send(arc_self.task_id);
// TODO should we report an error if send fails?
}
}
// ANCHOR_END: arc_wake

struct NotifyTask {
task_id: Uuid,
sender: Sender<Uuid>,
}

// used in docs/internals/runtime.md
// ANCHOR: run_all
impl QueuingExecutor {
pub fn run_all(&self) {
// While there are tasks to be processed
while let Ok(task) = self.ready_queue.try_recv() {
// Unlock the future in the Task
let mut future_slot = task.future.lock().unwrap();

// Take it, replace with None, ...
if let Some(mut future) = future_slot.take() {
let waker = waker_ref(&task);
let context = &mut Context::from_waker(&waker);

// ...and poll it
if future.as_mut().poll(context).is_pending() {
// If it's still pending, put it back
*future_slot = Some(future)
}
// we read off both queues and execute the tasks we receive.
// Since either queue can generate work for the other queue,
// we read from them in a loop until we are sure both queues
// are exhausted
let mut did_some_work = true;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be simpler to automatically consider received tasks ready and enqueue them as they get an ID? Here we would then go through all new tasks first, then run the ready queue to exhaustion?

Unless I'm missing something, which is not unlikely :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, what I'm probably missing is that ready tasks may spawn further tasks, which we need to take in. So the result is the same tik-tok loop...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah it's an ugly loop but necessary, I think

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a chance this might alter observable behaviour, since the running order of the tasks has potentially changed. Of course we shouldn't be relying on the order that effects arrive, but in practice at least in tests it is hard to avoid.

That could be avoided by having everything arrive on one queue (using an enum), not very pretty though.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think effectively random order of execution is a good thing - nothing should be relying on the ordering (beyond the causal one).


while did_some_work {
did_some_work = false;
// While there are tasks to be processed
while let Ok(task) = self.task_queue.try_recv() {
let task_id = task.id();
self.tasks.lock().unwrap().insert(task_id, task);
self.run_task(task_id);
did_some_work = true;
}
while let Ok(task_id) = self.ready_queue.try_recv() {
self.run_task(task_id);
did_some_work = true;
}
}
}

fn run_task(&self, task_id: Uuid) {
let mut tasks = self.tasks.lock().unwrap();
charypar marked this conversation as resolved.
Show resolved Hide resolved
let mut task = tasks.remove(&task_id).unwrap();
drop(tasks);

let notify = Arc::new(task.notify());
let waker = waker_ref(&notify);
let context = &mut Context::from_waker(&waker);
charypar marked this conversation as resolved.
Show resolved Hide resolved

// ...and poll it
if task.future.as_mut().poll(context).is_pending() {
// If it's still pending, put it back
self.tasks.lock().unwrap().insert(task.id, task);
}
}
}
// ANCHOR_END: run_all

#[cfg(test)]
mod tests {
use crate::capability::shell_request::ShellRequest;

use super::*;

#[test]
fn test_task_does_not_leak() {
let counter: Arc<()> = Arc::new(());
assert_eq!(Arc::strong_count(&counter), 1);

let (executor, spawner) = executor_and_spawner();

let future = {
let counter = counter.clone();
async move {
assert_eq!(Arc::strong_count(&counter), 2);
ShellRequest::<()>::new().await;
}
};

spawner.spawn(future);
executor.run_all();
drop(executor);
drop(spawner);
assert_eq!(Arc::strong_count(&counter), 1);
}
}
16 changes: 15 additions & 1 deletion crux_core/src/capability/shell_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,19 @@ pub struct ShellRequest<T> {
shared_state: Arc<Mutex<SharedState<T>>>,
}

#[cfg(test)]
impl ShellRequest<()> {
pub(crate) fn new() -> Self {
Self {
shared_state: Arc::new(Mutex::new(SharedState {
result: None,
waker: None,
send_request: None,
})),
}
}
}

struct SharedState<T> {
result: Option<T>,
waker: Option<Waker>,
Expand All @@ -38,7 +51,8 @@ impl<T> Future for ShellRequest<T> {
match shared_state.result.take() {
Some(result) => Poll::Ready(result),
None => {
shared_state.waker = Some(cx.waker().clone());
let cloned_waker = cx.waker().clone();
shared_state.waker = Some(cloned_waker);
Poll::Pending
}
}
Expand Down
Loading