From e3701bebd98df4fc2b6a285cce2e4e78cb889299 Mon Sep 17 00:00:00 2001 From: Nathan Holland Date: Mon, 14 Feb 2022 12:35:50 -0400 Subject: [PATCH] Fix downloader async loop --- src/lib/downloader/downloader.ml | 31 ++++++++++++++++--------------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/src/lib/downloader/downloader.ml b/src/lib/downloader/downloader.ml index d0773885d38..feb67693e95 100644 --- a/src/lib/downloader/downloader.ml +++ b/src/lib/downloader/downloader.ml @@ -602,6 +602,7 @@ end = struct , unit ) Strict_pipe.Writer.t (* buffer of length 0 *) + ; jobs_added_bvar : (unit, read_write) Bvar.t ; get : Peer.t -> Key.t list -> Result.t list Deferred.Or_error.t ; max_batch_size : int (* A peer is useful if there is a job in the pending queue which has not @@ -618,6 +619,8 @@ end = struct ; stop : unit Deferred.t } + let jobs_added t = Bvar.broadcast t.jobs_added_bvar () + let total_jobs (t : t) = Q.length t.pending + Hashtbl.length t.downloading (* Checks disjointness *) @@ -673,7 +676,12 @@ end = struct kill_job t j ; Useful_peers.update t.useful_peers (Job_cancelled h) - let enqueue t e = Q.enqueue t.pending e + let enqueue t e = + match Q.enqueue t.pending e with + | `Ok -> + jobs_added t ; `Ok + | `Key_already_present -> + `Key_already_present let enqueue_exn t e = assert ([%equal: [ `Ok | `Key_already_present ]] (enqueue t e) `Ok) @@ -703,6 +711,7 @@ end = struct ; get = _ ; got_new_peers_w ; flush_r = _ + ; jobs_added_bvar = _ ; useful_peers ; got_new_peers_r = _ ; pending @@ -756,6 +765,7 @@ end = struct in List.iter xs ~f:(fun x -> Hashtbl.set t.downloading ~key:x.key ~data:(peer, x, Time.now ())) ; + jobs_added t ; Useful_peers.update t.useful_peers (Download_starting peer) ; let download_deferred = t.get peer keys in upon download_deferred (fun res -> @@ -944,6 +954,7 @@ end = struct ; next_flush = None ; flush_r ; flush_w + ; jobs_added_bvar = Bvar.create () ; got_new_peers_r ; got_new_peers_w ; useful_peers = Useful_peers.create ~all_peers ~preferred @@ -965,27 +976,17 @@ end = struct with Broadcast_pipe.Already_closed _ -> Deferred.unit) ; r in - let deferred_ok = Deferred.return `Ok in - let deferred_finished = Deferred.return `Finished in - let const_eof _ = `Eof in let rec jobs_to_download stop = - if total_jobs t <> 0 then deferred_ok + if total_jobs t <> 0 then return `Ok else match%bind Deferred.choose - [ choice stop const_eof - ; choice - (Pipe.values_available - (Strict_pipe.Reader.to_linear_pipe t.flush_r).pipe) - Fn.id - ; choice - (Pipe.values_available - (Strict_pipe.Reader.to_linear_pipe t.useful_peers.r).pipe) - Fn.id + [ choice stop (Fn.const `Eof) + ; choice (Bvar.wait t.jobs_added_bvar) (Fn.const `Ok) ] with | `Eof -> - deferred_finished + return `Finished | `Ok -> jobs_to_download stop in