Skip to content

Commit

Permalink
Merge pull request #10237 from MinaProtocol/fix/super-catchup-async-loop
Browse files Browse the repository at this point in the history
Fix Super Catchup Async Loop
  • Loading branch information
nholland94 authored Feb 14, 2022
2 parents 3674703 + e3701be commit 60fabf0
Showing 1 changed file with 16 additions and 15 deletions.
31 changes: 16 additions & 15 deletions src/lib/downloader/downloader.ml
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,7 @@ end = struct
, unit )
Strict_pipe.Writer.t
(* buffer of length 0 *)
; jobs_added_bvar : (unit, read_write) Bvar.t
; get : Peer.t -> Key.t list -> Result.t list Deferred.Or_error.t
; max_batch_size : int
(* A peer is useful if there is a job in the pending queue which has not
Expand All @@ -618,6 +619,8 @@ end = struct
; stop : unit Deferred.t
}

let jobs_added t = Bvar.broadcast t.jobs_added_bvar ()

let total_jobs (t : t) = Q.length t.pending + Hashtbl.length t.downloading

(* Checks disjointness *)
Expand Down Expand Up @@ -673,7 +676,12 @@ end = struct
kill_job t j ;
Useful_peers.update t.useful_peers (Job_cancelled h)

let enqueue t e = Q.enqueue t.pending e
let enqueue t e =
match Q.enqueue t.pending e with
| `Ok ->
jobs_added t ; `Ok
| `Key_already_present ->
`Key_already_present

let enqueue_exn t e =
assert ([%equal: [ `Ok | `Key_already_present ]] (enqueue t e) `Ok)
Expand Down Expand Up @@ -703,6 +711,7 @@ end = struct
; get = _
; got_new_peers_w
; flush_r = _
; jobs_added_bvar = _
; useful_peers
; got_new_peers_r = _
; pending
Expand Down Expand Up @@ -756,6 +765,7 @@ end = struct
in
List.iter xs ~f:(fun x ->
Hashtbl.set t.downloading ~key:x.key ~data:(peer, x, Time.now ())) ;
jobs_added t ;
Useful_peers.update t.useful_peers (Download_starting peer) ;
let download_deferred = t.get peer keys in
upon download_deferred (fun res ->
Expand Down Expand Up @@ -944,6 +954,7 @@ end = struct
; next_flush = None
; flush_r
; flush_w
; jobs_added_bvar = Bvar.create ()
; got_new_peers_r
; got_new_peers_w
; useful_peers = Useful_peers.create ~all_peers ~preferred
Expand All @@ -965,27 +976,17 @@ end = struct
with Broadcast_pipe.Already_closed _ -> Deferred.unit) ;
r
in
let deferred_ok = Deferred.return `Ok in
let deferred_finished = Deferred.return `Finished in
let const_eof _ = `Eof in
let rec jobs_to_download stop =
if total_jobs t <> 0 then deferred_ok
if total_jobs t <> 0 then return `Ok
else
match%bind
Deferred.choose
[ choice stop const_eof
; choice
(Pipe.values_available
(Strict_pipe.Reader.to_linear_pipe t.flush_r).pipe)
Fn.id
; choice
(Pipe.values_available
(Strict_pipe.Reader.to_linear_pipe t.useful_peers.r).pipe)
Fn.id
[ choice stop (Fn.const `Eof)
; choice (Bvar.wait t.jobs_added_bvar) (Fn.const `Ok)
]
with
| `Eof ->
deferred_finished
return `Finished
| `Ok ->
jobs_to_download stop
in
Expand Down

0 comments on commit 60fabf0

Please sign in to comment.