Skip to content

Commit

Permalink
Expose isReady to check if sync will block (#123 #22) (#124)
Browse files Browse the repository at this point in the history
* Expose isReady to check if sync will block (#123 #22)

* update README

* Add test

* The CI is very bad at precise sleep
  • Loading branch information
mratsim authored May 6, 2020
1 parent 6dbf0e5 commit 652399c
Show file tree
Hide file tree
Showing 8 changed files with 67 additions and 15 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ exit(Weave)
- `spawn fnCall(args)` which spawns a function that may run on another thread and gives you an awaitable Flowvar handle.
- `newPledge`, `fulfill` and `spawnDelayed` (experimental) to delay a task until some dependencies are met. This allows expressing precise data dependencies and producer-consumer relationships.
- `sync(Flowvar)` will await a Flowvar and block until you receive a result.
- `isReady(Flowvar)` will check if `sync` will actually block or return the result immediately.
- `syncRoot(Weave)` is a global barrier for the main thread on the main task.
Using syncRoot in a proc means that the can only be called from the main thread.
`syncRoot(Weave)` is implicitly called by `exit(Weave)`
Expand Down
4 changes: 4 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

### v0.x.x - Unreleased

#### Features

- Added `isReady(Flowvar)` which will return true is `sync` would block on that Flowvar or if the result is actually immediately available.

### v0.4.0 - April 2020 - "Bespoke"

#### Compatibility
Expand Down
2 changes: 1 addition & 1 deletion weave.nim
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ export
parallelFor, parallelForStrided,
init, exit,
loadBalance,
isSpawned,
isSpawned, isReady,
getThreadId, getNumThreads,
# Experimental threadlocal prologue/epilogue
parallelForStaged, parallelForStagedStrided,
Expand Down
21 changes: 19 additions & 2 deletions weave/datatypes/flowvars.nim
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,16 @@ EagerFV:
let resultSent {.used.} = fv.chan[].trySend(childResult)
postCondition: resultSent

template isFutReady*(fv: Flowvar): bool =
template tryComplete*[T](fv: Flowvar, parentResult: var T): bool =
fv.chan[].tryRecv(parentResult)

func isReady*[T](fv: Flowvar[T]): bool {.inline.} =
## Returns true if the result of a Flowvar is ready.
## In that case `sync` will not block.
## Otherwise the current will block to help on all the pending tasks
## until the Flowvar is ready.
not fv.chan[].isEmpty()

LazyFV:
proc recycleChannel*(fv: Flowvar) {.inline.} =
recycle(fv.lfv.lazy.chan)
Expand All @@ -108,13 +115,23 @@ LazyFV:
ascertain: not fv.lfv.lazy.chan.isNil
discard fv.lfv.lazy.chan[].trySend(childResult)

template isFutReady*(fv: Flowvar): bool =
template tryComplete*[T](fv: Flowvar, parentResult: var T): bool =
if fv.lfv.hasChannel:
ascertain: not fv.lfv.lazy.chan.isNil
fv.lfv.lazy.chan[].tryRecv(parentResult)
else:
fv.lfv.isReady

func isReady*[T](fv: Flowvar[T]): bool {.inline.} =
## Returns true if the result of a Flowvar is ready.
## In that case `sync` will not block.
## Otherwise the current will block to help on all the pending tasks
## until the Flowvar is ready.
if not fv.lfv.hasChannel:
fv.lfv.isReady
else:
not fv.lfv.lazy.chan[].isEmpty()

import sync_types

proc convertLazyFlowvar*(task: Task) {.inline.}=
Expand Down
32 changes: 31 additions & 1 deletion weave/parallel_tasks.nim
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ macro spawnDelayed*(pledges: varargs[typed], fnCall: typed): untyped =
# --------------------------------------------------------

when isMainModule:
import ./runtime, ./state_machines/[sync, sync_root], os
import ./runtime, ./state_machines/[sync, sync_root], os, std/[times, monotimes]

block: # Async without result

Expand Down Expand Up @@ -244,6 +244,36 @@ when isMainModule:

main2()

block: # isReady
proc sleepingLion(ms: int): int =
sleep(ms)
echo "--> Slept for ", ms, " ms"
return ms

proc main2() =
echo "Sanity check 3: isReady"
const target = 123

init(Weave)
echo "Spawning sleeping thread for ", target, " ms"
let start = getMonoTime()
let f = spawn sleepingLion(123)
while not f.isReady():
cpuRelax()
let stopReady = getMonoTime()
let res = sync(f)
let stopSync = getMonoTime()
exit(Weave)

let readyTime = inMilliseconds(stopReady-start)
let syncTime = inMilliseconds(stopSync-stopReady)

echo "Retrieved: ", res, " (isReady: ", readyTime, " ms, sync: ", syncTime, " ms)"
doAssert syncTime <= 1, "sync should be non-blocking"
# doAssert readyTime in {target-1 .. target+1}, "asking to sleep for " & $target & " ms but slept for " & $readyTime

main2()

block: # Delayed computation

proc echoA(pA: Pledge) =
Expand Down
20 changes: 10 additions & 10 deletions weave/state_machines/sync.dot
Original file line number Diff line number Diff line change
@@ -1,27 +1,27 @@
digraph awaitFSA{
splines=ortho;
node [shape = doublecircle]; InitialState AW_Exit;
node [shape = circle, fontcolor=white, fillcolor=darkslategrey, style="filled"]; AW_Steal AW_SuccessfulTheft AW_CheckTask AW_OutOfChildTasks;
node [shape = circle, fontcolor=white, fillcolor=darkslategrey, style="filled"]; AW_Steal AW_SuccessfulTheft AW_CheckTask AW_OutOfDirectChildTasks;
InitialState -> AW_CheckTask [color="black:invis:black", xlabel="entry point"];
node [shape = octagon, fontcolor=black, fillcolor=lightsteelblue, style="rounded,filled"]; AW_Steal_AWE_ReceivedTask AW_CheckTask_AWE_HasChildTask ;
node [shape = diamond, fontcolor=black, fillcolor=coral, style="rounded,filled"]; AW_Steal_AWE_FutureReady AW_CheckTask_AWE_FutureReady AW_OutOfChildTasks_AWE_FutureReady ;
node [shape = diamond, fontcolor=black, fillcolor=coral, style="rounded,filled"]; AW_Steal_AWE_FutureReady AW_CheckTask_AWE_FutureReady AW_OutOfDirectChildTasks_AWE_FutureReady ;
AW_Steal_AWE_ReceivedTask [label="AWE_ReceivedTask\nlootedTask"];
AW_CheckTask_AWE_HasChildTask [label="AWE_HasChildTask\nnot task.isNil"];
AW_Steal_AWE_FutureReady [label="AWE_FutureReady\nisFutReady(fv)"];
AW_CheckTask_AWE_FutureReady [label="AWE_FutureReady\nisFutReady(fv)"];
AW_OutOfChildTasks_AWE_FutureReady [label="AWE_FutureReady\nisFutReady(fv)"];
AW_Steal_AWE_FutureReady [label="AWE_FutureReady\ntryComplete(fv, parentResult)"];
AW_CheckTask_AWE_FutureReady [label="AWE_FutureReady\ntryComplete(fv, parentResult)"];
AW_OutOfDirectChildTasks_AWE_FutureReady [label="AWE_FutureReady\ntryComplete(fv, parentResult)"];
AW_Steal -> AW_Steal_AWE_FutureReady[style=bold, xlabel="always"];
AW_Steal_AWE_FutureReady -> AW_Exit [color="coral", fontcolor="coral", xlabel="interrupted"];
AW_Steal_AWE_FutureReady -> AW_Steal_AWE_ReceivedTask[xlabel="normal flow"];
AW_Steal_AWE_ReceivedTask -> AW_SuccessfulTheft [style=dashed, xlabel="true"];
AW_Steal_AWE_ReceivedTask -> AW_Steal [xlabel="default"];
AW_SuccessfulTheft -> AW_OutOfChildTasks [xlabel="default"];
AW_SuccessfulTheft -> AW_OutOfDirectChildTasks [xlabel="default"];
AW_CheckTask -> AW_CheckTask_AWE_FutureReady[style=bold, xlabel="always"];
AW_CheckTask_AWE_FutureReady -> AW_Exit [color="coral", fontcolor="coral", xlabel="interrupted"];
AW_CheckTask_AWE_FutureReady -> AW_CheckTask_AWE_HasChildTask[xlabel="normal flow"];
AW_CheckTask_AWE_HasChildTask -> AW_CheckTask [style=dashed, xlabel="true"];
AW_CheckTask_AWE_HasChildTask -> AW_OutOfChildTasks [xlabel="default"];
AW_OutOfChildTasks -> AW_OutOfChildTasks_AWE_FutureReady[style=bold, xlabel="always"];
AW_OutOfChildTasks_AWE_FutureReady -> AW_Exit [color="coral", fontcolor="coral", xlabel="interrupted"];
AW_OutOfChildTasks_AWE_FutureReady -> AW_Steal [xlabel="default"];
AW_CheckTask_AWE_HasChildTask -> AW_OutOfDirectChildTasks [xlabel="default"];
AW_OutOfDirectChildTasks -> AW_OutOfDirectChildTasks_AWE_FutureReady[style=bold, xlabel="always"];
AW_OutOfDirectChildTasks_AWE_FutureReady -> AW_Exit [color="coral", fontcolor="coral", xlabel="interrupted"];
AW_OutOfDirectChildTasks_AWE_FutureReady -> AW_Steal [xlabel="default"];
}
2 changes: 1 addition & 1 deletion weave/state_machines/sync.nim
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ setTerminalState(awaitFSA, AW_Exit)
# -------------------------------------------

implEvent(awaitFSA, AWE_FutureReady):
isFutReady(fv)
tryComplete(fv, parentResult)

behavior(awaitFSA):
# In AW_Steal we might recv tasks and steal requests which get stuck in our queues
Expand Down
Binary file modified weave/state_machines/sync.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.

0 comments on commit 652399c

Please sign in to comment.